#!/usr/bin/env python # # Public Domain 2014-present MongoDB, Inc. # Public Domain 2008-2014 WiredTiger, Inc. # # This is free and unencumbered software released into the public domain. # # Anyone is free to copy, modify, publish, use, compile, sell, or # distribute this software, either in source code form or as a compiled # binary, for any purpose, commercial or non-commercial, and by any # means. # # In jurisdictions that recognize copyright laws, the author or authors # of this software dedicate any and all copyright interest in the # software to the public domain. We make this dedication for the benefit # of the public at large and to the detriment of our heirs and # successors. We intend this dedication to be an overt act of # relinquishment in perpetuity of all present and future rights to this # software under copyright law. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. # IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR # OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, # ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # OTHER DEALINGS IN THE SOFTWARE. # # [TEST_TAGS] # ignored_file # [END_TAGS] # # WiredTigerTestCase # parent class for all test cases # from __future__ import print_function # If unittest2 is available, use it in preference to (the old) unittest try: import unittest2 as unittest except ImportError: import unittest from contextlib import contextmanager import errno, glob, os, re, shutil, sys, time, traceback import wiredtiger, wtscenario, wthooks def shortenWithEllipsis(s, maxlen): if len(s) > maxlen: s = s[0:maxlen-3] + '...' return s class CapturedFd(object): """ CapturedFd encapsulates a file descriptor (e.g. 1 or 2) that is diverted to a file. We use this to capture and check the C stdout/stderr. Meanwhile we reset Python's sys.stdout, sys.stderr, using duped copies of the original 1, 2 fds. The end result is that Python's sys.stdout sys.stderr behave normally (e.g. go to the tty), while the C stdout/stderr ends up in a file that we can verify. """ def __init__(self, filename, desc): self.filename = filename self.desc = desc self.expectpos = 0 self.file = None def readFileFrom(self, filename, pos, maxchars): """ Read a file starting at a given position, returning the beginning of its contents """ with open(filename, 'r') as f: f.seek(pos) return shortenWithEllipsis(f.read(maxchars+1), maxchars) def capture(self): """ Start capturing the file descriptor. Note that the original targetFd is closed, we expect that the caller has duped it and passed the dup to us in the constructor. """ self.file = open(self.filename, 'w') return self.file def release(self): """ Stop capturing. """ self.file.close() self.file = None def hasUnexpectedOutput(self, testcase): """ Check to see that there is no unexpected output in the captured output file. """ if WiredTigerTestCase._ignoreStdout: return if self.file != None: self.file.flush() return self.expectpos < os.path.getsize(self.filename) def check(self, testcase): """ Check to see that there is no unexpected output in the captured output file. If there is, raise it as a test failure. This is generally called after 'release' is called. """ if self.hasUnexpectedOutput(testcase): contents = self.readFileFrom(self.filename, self.expectpos, 10000) WiredTigerTestCase.prout('ERROR: ' + self.filename + ' unexpected ' + self.desc + ', contains:\n"' + contents + '"') testcase.fail('unexpected ' + self.desc + ', contains: "' + contents + '"') self.expectpos = os.path.getsize(self.filename) def ignorePreviousOutput(self): """ Ignore any output up to this point. """ if self.file != None: self.file.flush() self.expectpos = os.path.getsize(self.filename) def checkAdditional(self, testcase, expect): """ Check to see that an additional string has been added to the output file. If it has not, raise it as a test failure. In any case, reset the expected pos to account for the new output. """ if self.file != None: self.file.flush() gotstr = self.readFileFrom(self.filename, self.expectpos, 1000) testcase.assertEqual(gotstr, expect, 'in ' + self.desc + ', expected "' + expect + '", but got "' + gotstr + '"') self.expectpos = os.path.getsize(self.filename) def checkAdditionalPattern(self, testcase, pat, re_flags = 0): """ Check to see that an additional string has been added to the output file. If it has not, raise it as a test failure. In any case, reset the expected pos to account for the new output. """ if self.file != None: self.file.flush() gotstr = self.readFileFrom(self.filename, self.expectpos, 1500) if re.search(pat, gotstr, re_flags) == None: testcase.fail('in ' + self.desc + ', expected pattern "' + pat + '", but got "' + gotstr + '"') self.expectpos = os.path.getsize(self.filename) class TestSuiteConnection(object): def __init__(self, conn, connlist): connlist.append(conn) self._conn = conn self._connlist = connlist def close(self, config=''): self._connlist.remove(self._conn) return self._conn.close(config) # Proxy everything except what we explicitly define to the # wrapped connection def __getattr__(self, attr): if attr in self.__dict__: return getattr(self, attr) else: return getattr(self._conn, attr) # Just like a list of strings, but with a convenience function class ExtensionList(list): skipIfMissing = False def extension(self, dirname, name, extarg=None): if name and name != 'none': ext = '' if extarg == None else '=' + extarg self.append(dirname + '/' + name + ext) class WiredTigerTestCase(unittest.TestCase): _globalSetup = False _printOnceSeen = {} _ttyDescriptor = None # set this early, to allow tty() to be called any time. # conn_config can be overridden to add to basic connection configuration. # Can be a string or a callable function or lambda expression. conn_config = '' # session_config can be overridden to add to basic session configuration. # Can be a string or a callable function or lambda expression. session_config = '' # conn_extensions can be overridden to add a list of extensions to load. # Each entry is a string (directory and extension name) and optional config. # Example: # conn_extensions = ('extractors/csv_extractor', # 'test/fail_fs={allow_writes=100}') conn_extensions = () @staticmethod def globalSetup(preserveFiles = False, removeAtStart = True, useTimestamp = False, gdbSub = False, lldbSub = False, verbose = 1, builddir = None, dirarg = None, longtest = False, ignoreStdout = False, seedw = 0, seedz = 0, hookmgr = None): WiredTigerTestCase._preserveFiles = preserveFiles d = 'WT_TEST' if dirarg == None else dirarg if useTimestamp: d += '.' + time.strftime('%Y%m%d-%H%M%S', time.localtime()) if removeAtStart: shutil.rmtree(d, ignore_errors=True) os.makedirs(d) wtscenario.set_long_run(longtest) WiredTigerTestCase._parentTestdir = d WiredTigerTestCase._builddir = builddir WiredTigerTestCase._origcwd = os.getcwd() WiredTigerTestCase._resultfile = open(os.path.join(d, 'results.txt'), "w", 1) # line buffered WiredTigerTestCase._gdbSubprocess = gdbSub WiredTigerTestCase._lldbSubprocess = lldbSub WiredTigerTestCase._longtest = longtest WiredTigerTestCase._verbose = verbose WiredTigerTestCase._ignoreStdout = ignoreStdout WiredTigerTestCase._dupout = os.dup(sys.stdout.fileno()) WiredTigerTestCase._stdout = sys.stdout WiredTigerTestCase._stderr = sys.stderr WiredTigerTestCase._concurrent = False WiredTigerTestCase._globalSetup = True WiredTigerTestCase._seeds = [521288629, 362436069] WiredTigerTestCase._randomseed = False if hookmgr == None: hookmgr = wthooks.WiredTigerHookManager() WiredTigerTestCase._hookmgr = hookmgr if seedw != 0 and seedz != 0: WiredTigerTestCase._randomseed = True WiredTigerTestCase._seeds = [seedw, seedz] def fdSetUp(self): self.captureout = CapturedFd('stdout.txt', 'standard output') self.captureerr = CapturedFd('stderr.txt', 'error output') sys.stdout = self.captureout.capture() sys.stderr = self.captureerr.capture() def fdTearDown(self): # restore stderr/stdout self.captureout.release() self.captureerr.release() sys.stdout = WiredTigerTestCase._stdout sys.stderr = WiredTigerTestCase._stderr def __init__(self, *args, **kwargs): if hasattr(self, 'scenarios'): assert(len(self.scenarios) == len(dict(self.scenarios))) unittest.TestCase.__init__(self, *args, **kwargs) self.skipped = False if not self._globalSetup: WiredTigerTestCase.globalSetup() def __str__(self): # when running with scenarios, if the number_scenarios() method # is used, then each scenario is given a number, which can # help distinguish tests. scen = '' if hasattr(self, 'scenario_number') and hasattr(self, 'scenario_name'): scen = ' -s ' + str(self.scenario_number) + \ ' (' + self.scenario_name + ')' return self.simpleName() + scen def shortDesc(self): ret_str = '' if hasattr(self, 'scenario_number'): ret_str = ' -s ' + str(self.scenario_number) return self.simpleName() + ret_str def simpleName(self): return "%s.%s.%s" % (self.__module__, self.className(), self._testMethodName) def buildDirectory(self): return self._builddir def skipTest(self, reason): self.skipped = True super(WiredTigerTestCase, self).skipTest(reason) # Return the wiredtiger_open extension argument for # any needed shared library. def extensionsConfig(self): exts = self.conn_extensions if hasattr(exts, '__call__'): exts = ExtensionList() self.conn_extensions(exts) result = '' extfiles = {} skipIfMissing = False earlyLoading = '' if hasattr(exts, 'skip_if_missing'): skipIfMissing = exts.skip_if_missing if hasattr(exts, 'early_load_ext') and exts.early_load_ext == True: earlyLoading = '=(early_load=true)' for ext in exts: extconf = '' if '=' in ext: splits = ext.split('=', 1) ext = splits[0] extconf = '=' + splits[1] splits = ext.split('/') if len(splits) != 2: raise Exception(self.shortid() + ": " + ext + ": extension is not named /") libname = splits[1] dirname = splits[0] pat = os.path.join(WiredTigerTestCase._builddir, 'ext', dirname, libname, '.libs', 'libwiredtiger_*.so') filenames = glob.glob(pat) if len(filenames) == 0: if skipIfMissing: self.skipTest('extension "' + ext + '" not built') continue else: raise Exception(self.shortid() + ": " + ext + ": no extensions library found matching: " + pat) elif len(filenames) > 1: raise Exception(self.shortid() + ": " + ext + ": multiple extensions libraries found matching: " + pat) complete = '"' + filenames[0] + '"' + extconf if ext in extfiles: if extfiles[ext] != complete: raise Exception(self.shortid() + ": non-matching extension arguments in " + str(exts)) else: extfiles[ext] = complete if len(extfiles) != 0: result = ',extensions=[' + ','.join(list(extfiles.values())) + earlyLoading + ']' return result # Can be overridden, but first consider setting self.conn_config # or self.conn_extensions def setUpConnectionOpen(self, home): self.home = home config = self.conn_config if hasattr(config, '__call__'): config = self.conn_config() config += self.extensionsConfig() # In case the open starts additional threads, flush first to # avoid confusion. sys.stdout.flush() conn_param = 'create,error_prefix="%s",%s' % (self.shortid(), config) try: conn = self.wiredtiger_open(home, conn_param) except wiredtiger.WiredTigerError as e: print("Failed wiredtiger_open: dir '%s', config '%s'" % \ (home, conn_param)) raise e return conn # Replacement for wiredtiger.wiredtiger_open that returns # a proxied connection that knows to close it itself at the # end of the run, unless it was already closed. def wiredtiger_open(self, home=None, config=''): conn = wiredtiger.wiredtiger_open(home, config) return TestSuiteConnection(conn, self._connections) # Can be overridden, but first consider setting self.session_config def setUpSessionOpen(self, conn): config = self.session_config if hasattr(config, '__call__'): config = self.session_config() return conn.open_session(config) # Can be overridden def close_conn(self, config=''): """ Close the connection if already open. """ if self.conn != None: self.conn.close(config) self.conn = None def open_conn(self, directory=".", config=None): """ Open the connection if already closed. """ if self.conn == None: if config != None: self._old_config = self.conn_config self.conn_config = config self.conn = self.setUpConnectionOpen(directory) if config != None: self.conn_config = self._old_config self.session = self.setUpSessionOpen(self.conn) def reopen_conn(self, directory=".", config=None): """ Reopen the connection. """ self.close_conn() self.open_conn(directory, config) def setUp(self): if not hasattr(self.__class__, 'wt_ntests'): self.__class__.wt_ntests = 0 if WiredTigerTestCase._concurrent: self.testsubdir = self.shortid() + '.' + str(self.__class__.wt_ntests) else: self.testsubdir = self.className() + '.' + str(self.__class__.wt_ntests) self.testdir = os.path.join(WiredTigerTestCase._parentTestdir, self.testsubdir) self.__class__.wt_ntests += 1 self.starttime = time.time() if WiredTigerTestCase._verbose > 2: self.prhead('started in ' + self.testdir, True) # tearDown needs connections list, set it here in case the open fails. self._connections = [] self.origcwd = os.getcwd() shutil.rmtree(self.testdir, ignore_errors=True) if os.path.exists(self.testdir): raise Exception(self.testdir + ": cannot remove directory") os.makedirs(self.testdir) os.chdir(self.testdir) with open('testname.txt', 'w+') as namefile: namefile.write(str(self) + '\n') self.fdSetUp() # tearDown needs a conn field, set it here in case the open fails. self.conn = None try: self.conn = self.setUpConnectionOpen(".") self.session = self.setUpSessionOpen(self.conn) except: self.tearDown() raise # Used as part of tearDown determining if there is an error. def list2reason(self, result, fieldname): exc_list = getattr(result, fieldname, None) if exc_list and exc_list[-1][0] is self: return exc_list[-1][1] def cleanStderr(self): self.captureerr.ignorePreviousOutput() def cleanStdout(self): self.captureout.ignorePreviousOutput() def checkStderr(self): self.captureerr.check(self) def checkStdout(self): self.captureout.check(self) def tearDown(self): # This approach works for all our support Python versions and # is suggested by one of the answers in: # https://stackoverflow.com/questions/4414234/getting-pythons-unittest-results-in-a-teardown-method # In addition, check to make sure exc_info is "clean", because # the ConcurrencyTestSuite in Python2 indicates failures using that. if hasattr(self, '_outcome'): # Python 3.4+ result = self.defaultTestResult() # these 2 methods have no side effects self._feedErrorsToResult(result, self._outcome.errors) else: # Python 3.2 - 3.3 or 3.0 - 3.1 and 2.7 result = getattr(self, '_outcomeForDoCleanups', self._resultForDoCleanups) error = self.list2reason(result, 'errors') failure = self.list2reason(result, 'failures') exc_failure = (sys.exc_info() != (None, None, None)) passed = not error and not failure and not exc_failure self.pr('finishing') # Close all connections that weren't explicitly closed. # Connections left open (as a result of a test failure) # can result in cascading errors. We also make sure # self.conn is on the list of active connections. if not self.conn in self._connections: self._connections.append(self.conn) for conn in self._connections: try: conn.close() except: pass self._connections = [] try: self.fdTearDown() self.captureout.check(self) self.captureerr.check(self) finally: # always get back to original directory os.chdir(self.origcwd) # Make sure no read-only files or directories were left behind os.chmod(self.testdir, 0o777) for root, dirs, files in os.walk(self.testdir): for d in dirs: os.chmod(os.path.join(root, d), 0o777) for f in files: os.chmod(os.path.join(root, f), 0o666) self.pr('passed=' + str(passed)) self.pr('skipped=' + str(self.skipped)) # Clean up unless there's a failure if (passed and (not WiredTigerTestCase._preserveFiles)) or self.skipped: shutil.rmtree(self.testdir, ignore_errors=True) else: self.pr('preserving directory ' + self.testdir) elapsed = time.time() - self.starttime if elapsed > 0.001 and WiredTigerTestCase._verbose >= 2: print("%s: %.2f seconds" % (str(self), elapsed)) if (not passed) and (not self.skipped): print("ERROR in " + str(self)) self.pr('FAIL') self.pr('preserving directory ' + self.testdir) if WiredTigerTestCase._verbose > 2: self.prhead('TEST COMPLETED') def backup(self, backup_dir, session=None): if session is None: session = self.session shutil.rmtree(backup_dir, ignore_errors=True) os.mkdir(backup_dir) bkp_cursor = session.open_cursor('backup:', None, None) while True: ret = bkp_cursor.next() if ret != 0: break shutil.copy(bkp_cursor.get_key(), backup_dir) self.assertEqual(ret, wiredtiger.WT_NOTFOUND) bkp_cursor.close() @contextmanager def expectedStdout(self, expect): self.captureout.check(self) yield self.captureout.checkAdditional(self, expect) @contextmanager def expectedStderr(self, expect): self.captureerr.check(self) yield self.captureerr.checkAdditional(self, expect) @contextmanager def expectedStdoutPattern(self, pat, re_flags=0): self.captureout.check(self) yield self.captureout.checkAdditionalPattern(self, pat, re_flags) @contextmanager def expectedStderrPattern(self, pat, re_flags=0): self.captureerr.check(self) yield self.captureerr.checkAdditionalPattern(self, pat, re_flags) def ignoreStdoutPatternIfExists(self, pat, re_flags=0): if self.captureout.hasUnexpectedOutput(self): self.captureout.checkAdditionalPattern(self, pat, re_flags) def ignoreStderrPatternIfExists(self, pat, re_flags=0): if self.captureerr.hasUnexpectedOutput(self): self.captureerr.checkAdditionalPattern(self, pat, re_flags) def assertRaisesWithMessage(self, exceptionType, expr, message): """ Like TestCase.assertRaises(), but also checks to see that a message is printed on stderr. If message starts and ends with a slash, it is considered a pattern that must appear in stderr (it need not encompass the entire error output). Otherwise, the message must match verbatim, including any trailing newlines. """ if len(message) > 2 and message[0] == '/' and message[-1] == '/': with self.expectedStderrPattern(message[1:-1]): self.assertRaises(exceptionType, expr) else: with self.expectedStderr(message): self.assertRaises(exceptionType, expr) def assertRaisesException(self, exceptionType, expr, exceptionString=None, optional=False): """ Like TestCase.assertRaises(), with some additional options. If the exceptionString argument is used, the exception's string must match it, or its pattern if the string starts and ends with a slash. If optional is set, then no assertion occurs if the exception doesn't occur. Returns true if the assertion is raised. """ raised = False try: expr() except BaseException as err: self.pr('Exception raised shown as string: "' + \ str(err) + '"') if not isinstance(err, exceptionType): self.fail('Exception of incorrect type raised, got type: ' + \ str(type(err))) if exceptionString != None: # Match either a pattern or an exact string. fail = False self.pr('Expecting string msg: ' + exceptionString) if len(exceptionString) > 2 and \ exceptionString[0] == '/' and exceptionString[-1] == '/' : if re.search(exceptionString[1:-1], str(err)) == None: fail = True elif exceptionString != str(err): fail = True if fail: self.fail('Exception with incorrect string raised, got: "' + \ str(err) + '" Expected: ' + exceptionString) raised = True if not raised and not optional: self.fail('no assertion raised') return raised def raisesBusy(self, expr): """ Execute the expression, returning true if a 'Resource busy' exception is raised, returning false if no exception is raised. The actual exception message can be different across platforms and therefore we rely on os libraries to give use the correct exception message. Any other exception raises a test suite failure. """ return self.assertRaisesException(wiredtiger.WiredTigerError, \ expr, os.strerror(errno.EBUSY), optional=True) def assertTimestampsEqual(self, ts1, ts2): """ TestCase.assertEqual() for timestamps """ self.assertEqual(int(ts1, 16), int(ts2, 16)) def exceptionToStderr(self, expr): """ Used by assertRaisesHavingMessage to convert an expression that throws an error to an expression that throws the same error but also has the exception string on stderr. """ try: expr() except BaseException as err: sys.stderr.write('Exception: ' + str(err)) raise def assertRaisesHavingMessage(self, exceptionType, expr, message): """ Like TestCase.assertRaises(), but also checks to see that the assert exception, when string-ified, includes a message. If message starts and ends with a slash, it is considered a pattern that must appear (it need not encompass the entire message). Otherwise, the message must match verbatim. """ self.assertRaisesWithMessage( exceptionType, lambda: self.exceptionToStderr(expr), message) @staticmethod def printOnce(msg): # There's a race condition with multiple threads, # but we won't worry about it. We err on the side # of printing the message too many times. if not msg in WiredTigerTestCase._printOnceSeen: WiredTigerTestCase._printOnceSeen[msg] = msg WiredTigerTestCase.prout(msg) def KNOWN_FAILURE(self, name): myname = self.simpleName() msg = '**** ' + myname + ' HAS A KNOWN FAILURE: ' + name + ' ****' self.printOnce(msg) self.skipTest('KNOWN FAILURE: ' + name) def KNOWN_LIMITATION(self, name): myname = self.simpleName() msg = '**** ' + myname + ' HAS A KNOWN LIMITATION: ' + name + ' ****' self.printOnce(msg) def databaseCorrupted(self, directory = None): """ Mark this test as having a corrupted database by creating a DATABASE_CORRUPTED file in the home directory. """ if directory == None: directory = self.home open(os.path.join(directory, "DATABASE_CORRUPTED"), "a").close() @staticmethod def printVerbose(level, message): if level <= WiredTigerTestCase._verbose: WiredTigerTestCase.prout(message) def verbose(self, level, message): WiredTigerTestCase.printVerbose(level, message) def prout(self, s): WiredTigerTestCase.prout(s) @staticmethod def prout(s): os.write(WiredTigerTestCase._dupout, str.encode(s + '\n')) def pr(self, s): """ print a progress line for testing """ msg = ' ' + self.shortid() + ': ' + s WiredTigerTestCase._resultfile.write(msg + '\n') def prhead(self, s, *beginning): """ print a header line for testing, something important """ msg = '' if len(beginning) > 0: msg += '\n' msg += ' ' + self.shortid() + ': ' + s self.prout(msg) WiredTigerTestCase._resultfile.write(msg + '\n') def prexception(self, excinfo): WiredTigerTestCase._resultfile.write('\n') traceback.print_exception(excinfo[0], excinfo[1], excinfo[2], None, WiredTigerTestCase._resultfile) WiredTigerTestCase._resultfile.write('\n') def recno(self, i): """ return a recno key """ return i # print directly to tty, useful for debugging def tty(self, message): WiredTigerTestCase.tty(message) @staticmethod def tty(message): if WiredTigerTestCase._ttyDescriptor == None: WiredTigerTestCase._ttyDescriptor = open('/dev/tty', 'w') WiredTigerTestCase._ttyDescriptor.write(message + '\n') def ttyVerbose(self, level, message): WiredTigerTestCase.ttyVerbose(level, message) @staticmethod def ttyVerbose(level, message): if level <= WiredTigerTestCase._verbose: WiredTigerTestCase.tty(message) def shortid(self): return self.id().replace("__main__.","") def className(self): return self.__class__.__name__ def longtest(description): """ Used as a function decorator, for example, @wttest.longtest("description"). The decorator indicates that this test function should only be included when running the test suite with the --long option. """ def runit_decorator(func): return func if not WiredTigerTestCase._longtest: return unittest.skip(description + ' (enable with --long)') else: return runit_decorator def islongtest(): return WiredTigerTestCase._longtest def getseed(): return WiredTigerTestCase._seeds def runsuite(suite, parallel): suite_to_run = suite if parallel > 1: from concurrencytest import ConcurrentTestSuite, fork_for_tests if not WiredTigerTestCase._globalSetup: WiredTigerTestCase.globalSetup() WiredTigerTestCase._concurrent = True suite_to_run = ConcurrentTestSuite(suite, fork_for_tests(parallel)) try: if WiredTigerTestCase._randomseed: WiredTigerTestCase.prout("Starting test suite with seedw={0} and seedz={1}. Rerun this test with -seed {0}.{1} to get the same randomness" .format(str(WiredTigerTestCase._seeds[0]), str(WiredTigerTestCase._seeds[1]))) return unittest.TextTestRunner( verbosity=WiredTigerTestCase._verbose).run(suite_to_run) except BaseException as e: # This should not happen for regular test errors, unittest should catch everything print('ERROR: running test: ', e) raise e def run(name='__main__'): result = runsuite(unittest.TestLoader().loadTestsFromName(name), False) sys.exit(0 if result.wasSuccessful() else 1)