# Copyright (c) 2010-2011 testtools developers. See LICENSE for details. """Tests for the DeferredRunTest single test execution logic.""" import os import signal from extras import try_import from testtools import ( skipIf, TestCase, TestResult, ) from testtools.content import ( text_content, ) from testtools.matchers import ( Equals, KeysEqual, MatchesException, Raises, ) from testtools.runtest import RunTest from testtools.testresult.doubles import ExtendedTestResult from testtools.tests.test_spinner import NeedsTwistedTestCase assert_fails_with = try_import('testtools.deferredruntest.assert_fails_with') AsynchronousDeferredRunTest = try_import( 'testtools.deferredruntest.AsynchronousDeferredRunTest') flush_logged_errors = try_import( 'testtools.deferredruntest.flush_logged_errors') SynchronousDeferredRunTest = try_import( 'testtools.deferredruntest.SynchronousDeferredRunTest') defer = try_import('twisted.internet.defer') failure = try_import('twisted.python.failure') log = try_import('twisted.python.log') DelayedCall = try_import('twisted.internet.base.DelayedCall') class X(object): """Tests that we run as part of our tests, nested to avoid discovery.""" class Base(TestCase): def setUp(self): super(X.Base, self).setUp() self.calls = ['setUp'] self.addCleanup(self.calls.append, 'clean-up') def test_something(self): self.calls.append('test') def tearDown(self): self.calls.append('tearDown') super(X.Base, self).tearDown() class ErrorInSetup(Base): expected_calls = ['setUp', 'clean-up'] expected_results = [('addError', RuntimeError)] def setUp(self): super(X.ErrorInSetup, self).setUp() raise RuntimeError("Error in setUp") class ErrorInTest(Base): expected_calls = ['setUp', 'tearDown', 'clean-up'] expected_results = [('addError', RuntimeError)] def test_something(self): raise RuntimeError("Error in test") class FailureInTest(Base): expected_calls = ['setUp', 'tearDown', 'clean-up'] expected_results = [('addFailure', AssertionError)] def test_something(self): self.fail("test failed") class ErrorInTearDown(Base): expected_calls = ['setUp', 'test', 'clean-up'] expected_results = [('addError', RuntimeError)] def tearDown(self): raise RuntimeError("Error in tearDown") class ErrorInCleanup(Base): expected_calls = ['setUp', 'test', 'tearDown', 'clean-up'] expected_results = [('addError', ZeroDivisionError)] def test_something(self): self.calls.append('test') self.addCleanup(lambda: 1/0) class TestIntegration(NeedsTwistedTestCase): def assertResultsMatch(self, test, result): events = list(result._events) self.assertEqual(('startTest', test), events.pop(0)) for expected_result in test.expected_results: result = events.pop(0) if len(expected_result) == 1: self.assertEqual((expected_result[0], test), result) else: self.assertEqual((expected_result[0], test), result[:2]) error_type = expected_result[1] self.assertIn(error_type.__name__, str(result[2])) self.assertEqual([('stopTest', test)], events) def test_runner(self): result = ExtendedTestResult() test = self.test_factory('test_something', runTest=self.runner) test.run(result) self.assertEqual(test.calls, self.test_factory.expected_calls) self.assertResultsMatch(test, result) def make_integration_tests(): from unittest import TestSuite from testtools import clone_test_with_new_id runners = [ ('RunTest', RunTest), ('SynchronousDeferredRunTest', SynchronousDeferredRunTest), ('AsynchronousDeferredRunTest', AsynchronousDeferredRunTest), ] tests = [ X.ErrorInSetup, X.ErrorInTest, X.ErrorInTearDown, X.FailureInTest, X.ErrorInCleanup, ] base_test = X.TestIntegration('test_runner') integration_tests = [] for runner_name, runner in runners: for test in tests: new_test = clone_test_with_new_id( base_test, '%s(%s, %s)' % ( base_test.id(), runner_name, test.__name__)) new_test.test_factory = test new_test.runner = runner integration_tests.append(new_test) return TestSuite(integration_tests) class TestSynchronousDeferredRunTest(NeedsTwistedTestCase): def make_result(self): return ExtendedTestResult() def make_runner(self, test): return SynchronousDeferredRunTest(test, test.exception_handlers) def test_success(self): class SomeCase(TestCase): def test_success(self): return defer.succeed(None) test = SomeCase('test_success') runner = self.make_runner(test) result = self.make_result() runner.run(result) self.assertThat( result._events, Equals([ ('startTest', test), ('addSuccess', test), ('stopTest', test)])) def test_failure(self): class SomeCase(TestCase): def test_failure(self): return defer.maybeDeferred(self.fail, "Egads!") test = SomeCase('test_failure') runner = self.make_runner(test) result = self.make_result() runner.run(result) self.assertThat( [event[:2] for event in result._events], Equals([ ('startTest', test), ('addFailure', test), ('stopTest', test)])) def test_setUp_followed_by_test(self): class SomeCase(TestCase): def setUp(self): super(SomeCase, self).setUp() return defer.succeed(None) def test_failure(self): return defer.maybeDeferred(self.fail, "Egads!") test = SomeCase('test_failure') runner = self.make_runner(test) result = self.make_result() runner.run(result) self.assertThat( [event[:2] for event in result._events], Equals([ ('startTest', test), ('addFailure', test), ('stopTest', test)])) class TestAsynchronousDeferredRunTest(NeedsTwistedTestCase): def make_reactor(self): from twisted.internet import reactor return reactor def make_result(self): return ExtendedTestResult() def make_runner(self, test, timeout=None): if timeout is None: timeout = self.make_timeout() return AsynchronousDeferredRunTest( test, test.exception_handlers, timeout=timeout) def make_timeout(self): return 0.005 def test_setUp_returns_deferred_that_fires_later(self): # setUp can return a Deferred that might fire at any time. # AsynchronousDeferredRunTest will not go on to running the test until # the Deferred returned by setUp actually fires. call_log = [] marker = object() d = defer.Deferred().addCallback(call_log.append) class SomeCase(TestCase): def setUp(self): super(SomeCase, self).setUp() call_log.append('setUp') return d def test_something(self): call_log.append('test') def fire_deferred(): self.assertThat(call_log, Equals(['setUp'])) d.callback(marker) test = SomeCase('test_something') timeout = self.make_timeout() runner = self.make_runner(test, timeout=timeout) result = self.make_result() reactor = self.make_reactor() reactor.callLater(timeout, fire_deferred) runner.run(result) self.assertThat(call_log, Equals(['setUp', marker, 'test'])) def test_calls_setUp_test_tearDown_in_sequence(self): # setUp, the test method and tearDown can all return # Deferreds. AsynchronousDeferredRunTest will make sure that each of # these are run in turn, only going on to the next stage once the # Deferred from the previous stage has fired. call_log = [] a = defer.Deferred() a.addCallback(lambda x: call_log.append('a')) b = defer.Deferred() b.addCallback(lambda x: call_log.append('b')) c = defer.Deferred() c.addCallback(lambda x: call_log.append('c')) class SomeCase(TestCase): def setUp(self): super(SomeCase, self).setUp() call_log.append('setUp') return a def test_success(self): call_log.append('test') return b def tearDown(self): super(SomeCase, self).tearDown() call_log.append('tearDown') return c test = SomeCase('test_success') timeout = self.make_timeout() runner = self.make_runner(test, timeout) result = self.make_result() reactor = self.make_reactor() def fire_a(): self.assertThat(call_log, Equals(['setUp'])) a.callback(None) def fire_b(): self.assertThat(call_log, Equals(['setUp', 'a', 'test'])) b.callback(None) def fire_c(): self.assertThat( call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown'])) c.callback(None) reactor.callLater(timeout * 0.25, fire_a) reactor.callLater(timeout * 0.5, fire_b) reactor.callLater(timeout * 0.75, fire_c) runner.run(result) self.assertThat( call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown', 'c'])) def test_async_cleanups(self): # Cleanups added with addCleanup can return # Deferreds. AsynchronousDeferredRunTest will run each of them in # turn. class SomeCase(TestCase): def test_whatever(self): pass test = SomeCase('test_whatever') call_log = [] a = defer.Deferred().addCallback(lambda x: call_log.append('a')) b = defer.Deferred().addCallback(lambda x: call_log.append('b')) c = defer.Deferred().addCallback(lambda x: call_log.append('c')) test.addCleanup(lambda: a) test.addCleanup(lambda: b) test.addCleanup(lambda: c) def fire_a(): self.assertThat(call_log, Equals([])) a.callback(None) def fire_b(): self.assertThat(call_log, Equals(['a'])) b.callback(None) def fire_c(): self.assertThat(call_log, Equals(['a', 'b'])) c.callback(None) timeout = self.make_timeout() reactor = self.make_reactor() reactor.callLater(timeout * 0.25, fire_a) reactor.callLater(timeout * 0.5, fire_b) reactor.callLater(timeout * 0.75, fire_c) runner = self.make_runner(test, timeout) result = self.make_result() runner.run(result) self.assertThat(call_log, Equals(['a', 'b', 'c'])) def test_clean_reactor(self): # If there's cruft left over in the reactor, the test fails. reactor = self.make_reactor() timeout = self.make_timeout() class SomeCase(TestCase): def test_cruft(self): reactor.callLater(timeout * 10.0, lambda: None) test = SomeCase('test_cruft') runner = self.make_runner(test, timeout) result = self.make_result() runner.run(result) self.assertThat( [event[:2] for event in result._events], Equals( [('startTest', test), ('addError', test), ('stopTest', test)])) error = result._events[1][2] self.assertThat(error, KeysEqual('traceback', 'twisted-log')) def test_exports_reactor(self): # The reactor is set as an attribute on the test case. reactor = self.make_reactor() timeout = self.make_timeout() class SomeCase(TestCase): def test_cruft(self): self.assertIs(reactor, self.reactor) test = SomeCase('test_cruft') runner = self.make_runner(test, timeout) result = TestResult() runner.run(result) self.assertEqual([], result.errors) self.assertEqual([], result.failures) def test_unhandled_error_from_deferred(self): # If there's a Deferred with an unhandled error, the test fails. Each # unhandled error is reported with a separate traceback. class SomeCase(TestCase): def test_cruft(self): # Note we aren't returning the Deferred so that the error will # be unhandled. defer.maybeDeferred(lambda: 1/0) defer.maybeDeferred(lambda: 2/0) test = SomeCase('test_cruft') runner = self.make_runner(test) result = self.make_result() runner.run(result) error = result._events[1][2] result._events[1] = ('addError', test, None) self.assertThat(result._events, Equals( [('startTest', test), ('addError', test, None), ('stopTest', test)])) self.assertThat( error, KeysEqual( 'twisted-log', 'unhandled-error-in-deferred', 'unhandled-error-in-deferred-1', )) def test_unhandled_error_from_deferred_combined_with_error(self): # If there's a Deferred with an unhandled error, the test fails. Each # unhandled error is reported with a separate traceback, and the error # is still reported. class SomeCase(TestCase): def test_cruft(self): # Note we aren't returning the Deferred so that the error will # be unhandled. defer.maybeDeferred(lambda: 1/0) 2 / 0 test = SomeCase('test_cruft') runner = self.make_runner(test) result = self.make_result() runner.run(result) error = result._events[1][2] result._events[1] = ('addError', test, None) self.assertThat(result._events, Equals( [('startTest', test), ('addError', test, None), ('stopTest', test)])) self.assertThat( error, KeysEqual( 'traceback', 'twisted-log', 'unhandled-error-in-deferred', )) @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only") def test_keyboard_interrupt_stops_test_run(self): # If we get a SIGINT during a test run, the test stops and no more # tests run. SIGINT = getattr(signal, 'SIGINT', None) if not SIGINT: raise self.skipTest("SIGINT unavailable") class SomeCase(TestCase): def test_pause(self): return defer.Deferred() test = SomeCase('test_pause') reactor = self.make_reactor() timeout = self.make_timeout() runner = self.make_runner(test, timeout * 5) result = self.make_result() reactor.callLater(timeout, os.kill, os.getpid(), SIGINT) self.assertThat(lambda:runner.run(result), Raises(MatchesException(KeyboardInterrupt))) @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only") def test_fast_keyboard_interrupt_stops_test_run(self): # If we get a SIGINT during a test run, the test stops and no more # tests run. SIGINT = getattr(signal, 'SIGINT', None) if not SIGINT: raise self.skipTest("SIGINT unavailable") class SomeCase(TestCase): def test_pause(self): return defer.Deferred() test = SomeCase('test_pause') reactor = self.make_reactor() timeout = self.make_timeout() runner = self.make_runner(test, timeout * 5) result = self.make_result() reactor.callWhenRunning(os.kill, os.getpid(), SIGINT) self.assertThat(lambda:runner.run(result), Raises(MatchesException(KeyboardInterrupt))) def test_timeout_causes_test_error(self): # If a test times out, it reports itself as having failed with a # TimeoutError. class SomeCase(TestCase): def test_pause(self): return defer.Deferred() test = SomeCase('test_pause') runner = self.make_runner(test) result = self.make_result() runner.run(result) error = result._events[1][2] self.assertThat( [event[:2] for event in result._events], Equals( [('startTest', test), ('addError', test), ('stopTest', test)])) self.assertIn('TimeoutError', str(error['traceback'])) def test_convenient_construction(self): # As a convenience method, AsynchronousDeferredRunTest has a # classmethod that returns an AsynchronousDeferredRunTest # factory. This factory has the same API as the RunTest constructor. reactor = object() timeout = object() handler = object() factory = AsynchronousDeferredRunTest.make_factory(reactor, timeout) runner = factory(self, [handler]) self.assertIs(reactor, runner._reactor) self.assertIs(timeout, runner._timeout) self.assertIs(self, runner.case) self.assertEqual([handler], runner.handlers) def test_use_convenient_factory(self): # Make sure that the factory can actually be used. factory = AsynchronousDeferredRunTest.make_factory() class SomeCase(TestCase): run_tests_with = factory def test_something(self): pass case = SomeCase('test_something') case.run() def test_convenient_construction_default_reactor(self): # As a convenience method, AsynchronousDeferredRunTest has a # classmethod that returns an AsynchronousDeferredRunTest # factory. This factory has the same API as the RunTest constructor. reactor = object() handler = object() factory = AsynchronousDeferredRunTest.make_factory(reactor=reactor) runner = factory(self, [handler]) self.assertIs(reactor, runner._reactor) self.assertIs(self, runner.case) self.assertEqual([handler], runner.handlers) def test_convenient_construction_default_timeout(self): # As a convenience method, AsynchronousDeferredRunTest has a # classmethod that returns an AsynchronousDeferredRunTest # factory. This factory has the same API as the RunTest constructor. timeout = object() handler = object() factory = AsynchronousDeferredRunTest.make_factory(timeout=timeout) runner = factory(self, [handler]) self.assertIs(timeout, runner._timeout) self.assertIs(self, runner.case) self.assertEqual([handler], runner.handlers) def test_convenient_construction_default_debugging(self): # As a convenience method, AsynchronousDeferredRunTest has a # classmethod that returns an AsynchronousDeferredRunTest # factory. This factory has the same API as the RunTest constructor. handler = object() factory = AsynchronousDeferredRunTest.make_factory(debug=True) runner = factory(self, [handler]) self.assertIs(self, runner.case) self.assertEqual([handler], runner.handlers) self.assertEqual(True, runner._debug) def test_deferred_error(self): class SomeTest(TestCase): def test_something(self): return defer.maybeDeferred(lambda: 1/0) test = SomeTest('test_something') runner = self.make_runner(test) result = self.make_result() runner.run(result) self.assertThat( [event[:2] for event in result._events], Equals([ ('startTest', test), ('addError', test), ('stopTest', test)])) error = result._events[1][2] self.assertThat(error, KeysEqual('traceback', 'twisted-log')) def test_only_addError_once(self): # Even if the reactor is unclean and the test raises an error and the # cleanups raise errors, we only called addError once per test. reactor = self.make_reactor() class WhenItRains(TestCase): def it_pours(self): # Add a dirty cleanup. self.addCleanup(lambda: 3 / 0) # Dirty the reactor. from twisted.internet.protocol import ServerFactory reactor.listenTCP(0, ServerFactory(), interface='127.0.0.1') # Unhandled error. defer.maybeDeferred(lambda: 2 / 0) # Actual error. raise RuntimeError("Excess precipitation") test = WhenItRains('it_pours') runner = self.make_runner(test) result = self.make_result() runner.run(result) self.assertThat( [event[:2] for event in result._events], Equals([ ('startTest', test), ('addError', test), ('stopTest', test)])) error = result._events[1][2] self.assertThat( error, KeysEqual( 'traceback', 'traceback-1', 'traceback-2', 'twisted-log', 'unhandled-error-in-deferred', )) def test_log_err_is_error(self): # An error logged during the test run is recorded as an error in the # tests. class LogAnError(TestCase): def test_something(self): try: 1/0 except ZeroDivisionError: f = failure.Failure() log.err(f) test = LogAnError('test_something') runner = self.make_runner(test) result = self.make_result() runner.run(result) self.assertThat( [event[:2] for event in result._events], Equals([ ('startTest', test), ('addError', test), ('stopTest', test)])) error = result._events[1][2] self.assertThat(error, KeysEqual('logged-error', 'twisted-log')) def test_log_err_flushed_is_success(self): # An error logged during the test run is recorded as an error in the # tests. class LogAnError(TestCase): def test_something(self): try: 1/0 except ZeroDivisionError: f = failure.Failure() log.err(f) flush_logged_errors(ZeroDivisionError) test = LogAnError('test_something') runner = self.make_runner(test) result = self.make_result() runner.run(result) self.assertThat( result._events, Equals([ ('startTest', test), ('addSuccess', test, {'twisted-log': text_content('')}), ('stopTest', test)])) def test_log_in_details(self): class LogAnError(TestCase): def test_something(self): log.msg("foo") 1/0 test = LogAnError('test_something') runner = self.make_runner(test) result = self.make_result() runner.run(result) self.assertThat( [event[:2] for event in result._events], Equals([ ('startTest', test), ('addError', test), ('stopTest', test)])) error = result._events[1][2] self.assertThat(error, KeysEqual('traceback', 'twisted-log')) def test_debugging_unchanged_during_test_by_default(self): debugging = [(defer.Deferred.debug, DelayedCall.debug)] class SomeCase(TestCase): def test_debugging_enabled(self): debugging.append((defer.Deferred.debug, DelayedCall.debug)) test = SomeCase('test_debugging_enabled') runner = AsynchronousDeferredRunTest( test, handlers=test.exception_handlers, reactor=self.make_reactor(), timeout=self.make_timeout()) runner.run(self.make_result()) self.assertEqual(debugging[0], debugging[1]) def test_debugging_enabled_during_test_with_debug_flag(self): self.patch(defer.Deferred, 'debug', False) self.patch(DelayedCall, 'debug', False) debugging = [] class SomeCase(TestCase): def test_debugging_enabled(self): debugging.append((defer.Deferred.debug, DelayedCall.debug)) test = SomeCase('test_debugging_enabled') runner = AsynchronousDeferredRunTest( test, handlers=test.exception_handlers, reactor=self.make_reactor(), timeout=self.make_timeout(), debug=True) runner.run(self.make_result()) self.assertEqual([(True, True)], debugging) self.assertEqual(False, defer.Deferred.debug) self.assertEqual(False, defer.Deferred.debug) class TestAssertFailsWith(NeedsTwistedTestCase): """Tests for `assert_fails_with`.""" if SynchronousDeferredRunTest is not None: run_tests_with = SynchronousDeferredRunTest def test_assert_fails_with_success(self): # assert_fails_with fails the test if it's given a Deferred that # succeeds. marker = object() d = assert_fails_with(defer.succeed(marker), RuntimeError) def check_result(failure): failure.trap(self.failureException) self.assertThat( str(failure.value), Equals("RuntimeError not raised (%r returned)" % (marker,))) d.addCallbacks( lambda x: self.fail("Should not have succeeded"), check_result) return d def test_assert_fails_with_success_multiple_types(self): # assert_fails_with fails the test if it's given a Deferred that # succeeds. marker = object() d = assert_fails_with( defer.succeed(marker), RuntimeError, ZeroDivisionError) def check_result(failure): failure.trap(self.failureException) self.assertThat( str(failure.value), Equals("RuntimeError, ZeroDivisionError not raised " "(%r returned)" % (marker,))) d.addCallbacks( lambda x: self.fail("Should not have succeeded"), check_result) return d def test_assert_fails_with_wrong_exception(self): # assert_fails_with fails the test if it's given a Deferred that # succeeds. d = assert_fails_with( defer.maybeDeferred(lambda: 1/0), RuntimeError, KeyboardInterrupt) def check_result(failure): failure.trap(self.failureException) lines = str(failure.value).splitlines() self.assertThat( lines[:2], Equals([ ("ZeroDivisionError raised instead of RuntimeError, " "KeyboardInterrupt:"), " Traceback (most recent call last):", ])) d.addCallbacks( lambda x: self.fail("Should not have succeeded"), check_result) return d def test_assert_fails_with_expected_exception(self): # assert_fails_with calls back with the value of the failure if it's # one of the expected types of failures. try: 1/0 except ZeroDivisionError: f = failure.Failure() d = assert_fails_with(defer.fail(f), ZeroDivisionError) return d.addCallback(self.assertThat, Equals(f.value)) def test_custom_failure_exception(self): # If assert_fails_with is passed a 'failureException' keyword # argument, then it will raise that instead of `AssertionError`. class CustomException(Exception): pass marker = object() d = assert_fails_with( defer.succeed(marker), RuntimeError, failureException=CustomException) def check_result(failure): failure.trap(CustomException) self.assertThat( str(failure.value), Equals("RuntimeError not raised (%r returned)" % (marker,))) return d.addCallbacks( lambda x: self.fail("Should not have succeeded"), check_result) class TestRunWithLogObservers(NeedsTwistedTestCase): def test_restores_observers(self): from testtools.deferredruntest import run_with_log_observers from twisted.python import log # Make sure there's at least one observer. This reproduces bug # #926189. log.addObserver(lambda *args: None) observers = list(log.theLogPublisher.observers) run_with_log_observers([], lambda: None) self.assertEqual(observers, log.theLogPublisher.observers) def test_suite(): from unittest import TestLoader, TestSuite return TestSuite( [TestLoader().loadTestsFromName(__name__), make_integration_tests()])