# Copyright (c) 2010 testtools developers. See LICENSE for details.

"""Tests for the evil Twisted reactor-spinning we do."""

import os
import signal

from extras import try_import

from testtools import (
    skipIf,
    TestCase,
    )
from testtools.matchers import (
    Equals,
    Is,
    MatchesException,
    Raises,
    )

# These are all optional: try_import is used so that this module can still
# be imported when the dependency is missing (tests are skipped instead).
_spinner = try_import('testtools._spinner')

defer = try_import('twisted.internet.defer')
Failure = try_import('twisted.python.failure.Failure')


class NeedsTwistedTestCase(TestCase):
    """Base class that skips every test when Twisted is not importable."""

    def setUp(self):
        super(NeedsTwistedTestCase, self).setUp()
        if defer is None or Failure is None:
            self.skipTest("Need Twisted to run")


class TestNotReentrant(NeedsTwistedTestCase):
    """Tests for the _spinner.not_reentrant decorator."""

    def test_not_reentrant(self):
        # A function decorated as not being re-entrant will raise a
        # _spinner.ReentryError if it is called while it is running.
        calls = []

        @_spinner.not_reentrant
        def log_something():
            calls.append(None)
            if len(calls) < 5:
                log_something()

        self.assertThat(
            log_something, Raises(MatchesException(_spinner.ReentryError)))
        self.assertEqual(1, len(calls))

    def test_deeper_stack(self):
        # Re-entry is also detected when it happens indirectly, through
        # another decorated function.
        calls = []

        @_spinner.not_reentrant
        def g():
            calls.append(None)
            if len(calls) < 5:
                f()

        @_spinner.not_reentrant
        def f():
            calls.append(None)
            if len(calls) < 5:
                g()

        self.assertThat(f, Raises(MatchesException(_spinner.ReentryError)))
        self.assertEqual(2, len(calls))


class TestExtractResult(NeedsTwistedTestCase):
    """Tests for _spinner.extract_result."""

    def test_not_fired(self):
        # _spinner.extract_result raises _spinner.DeferredNotFired if it's
        # given a Deferred that has not fired.
        self.assertThat(
            lambda: _spinner.extract_result(defer.Deferred()),
            Raises(MatchesException(_spinner.DeferredNotFired)))

    def test_success(self):
        # _spinner.extract_result returns the value of the Deferred if it has
        # fired successfully.
        marker = object()
        d = defer.succeed(marker)
        self.assertThat(_spinner.extract_result(d), Equals(marker))

    def test_failure(self):
        # _spinner.extract_result raises the failure's exception if it's given
        # a Deferred that is failing.
        try:
            1/0
        except ZeroDivisionError:
            f = Failure()
        d = defer.fail(f)
        self.assertThat(
            lambda: _spinner.extract_result(d),
            Raises(MatchesException(ZeroDivisionError)))


class TestTrapUnhandledErrors(NeedsTwistedTestCase):
    """Tests for _spinner.trap_unhandled_errors."""

    def test_no_deferreds(self):
        # With no Deferreds involved, the function's return value is passed
        # through and no errors are reported.
        marker = object()
        result, errors = _spinner.trap_unhandled_errors(lambda: marker)
        self.assertEqual([], errors)
        self.assertIs(marker, result)

    def test_unhandled_error(self):
        # A failed Deferred that nobody handles is reported in the errors
        # returned by trap_unhandled_errors.
        failures = []

        def make_deferred_but_dont_handle():
            try:
                1/0
            except ZeroDivisionError:
                f = Failure()
                failures.append(f)
                defer.fail(f)

        result, errors = _spinner.trap_unhandled_errors(
            make_deferred_but_dont_handle)
        self.assertIs(None, result)
        self.assertEqual(failures, [error.failResult for error in errors])


class TestRunInReactor(NeedsTwistedTestCase):
    """Tests for _spinner.Spinner: running functions in a reactor."""

    def make_reactor(self):
        # Imported lazily so that the module can be loaded without Twisted.
        from twisted.internet import reactor
        return reactor

    def make_spinner(self, reactor=None):
        if reactor is None:
            reactor = self.make_reactor()
        return _spinner.Spinner(reactor)

    def make_timeout(self):
        return 0.01

    def test_function_called(self):
        # run_in_reactor actually calls the function given to it.
        calls = []
        marker = object()
        self.make_spinner().run(self.make_timeout(), calls.append, marker)
        self.assertThat(calls, Equals([marker]))

    def test_return_value_returned(self):
        # run_in_reactor returns the value returned by the function given to
        # it.
        marker = object()
        result = self.make_spinner().run(self.make_timeout(), lambda: marker)
        self.assertThat(result, Is(marker))

    def test_exception_reraised(self):
        # If the given function raises an error, run_in_reactor re-raises that
        # error.
        self.assertThat(
            lambda: self.make_spinner().run(self.make_timeout(), lambda: 1/0),
            Raises(MatchesException(ZeroDivisionError)))

    def test_keyword_arguments(self):
        # run_in_reactor passes keyword arguments on.
        calls = []
        function = lambda *a, **kw: calls.extend([a, kw])
        self.make_spinner().run(self.make_timeout(), function, foo=42)
        self.assertThat(calls, Equals([(), {'foo': 42}]))

    def test_not_reentrant(self):
        # run_in_reactor raises an error if it is called inside another call
        # to run_in_reactor.
        spinner = self.make_spinner()
        self.assertThat(lambda: spinner.run(
            self.make_timeout(), spinner.run, self.make_timeout(),
            lambda: None), Raises(MatchesException(_spinner.ReentryError)))

    def test_deferred_value_returned(self):
        # If the given function returns a Deferred, run_in_reactor returns the
        # value in the Deferred at the end of the callback chain.
        marker = object()
        result = self.make_spinner().run(
            self.make_timeout(), lambda: defer.succeed(marker))
        self.assertThat(result, Is(marker))

    def test_preserve_signal_handler(self):
        # Signal handlers installed before a run are still installed after it.
        names = ['SIGINT', 'SIGTERM', 'SIGCHLD']
        # Build a real list: 'signals' is iterated several times below, and
        # on Python 3 a bare filter() object is exhausted after the first
        # pass. Signals missing on this platform are dropped.
        signals = [
            sig for sig in (getattr(signal, name, None) for name in names)
            if sig]
        for sig in signals:
            self.addCleanup(signal.signal, sig, signal.getsignal(sig))
        new_hdlrs = [lambda *a: None for _ in signals]
        for sig, hdlr in zip(signals, new_hdlrs):
            signal.signal(sig, hdlr)
        spinner = self.make_spinner()
        spinner.run(self.make_timeout(), lambda: None)
        # Compare list against list; a lazy map() object never equals a list.
        self.assertEqual(
            new_hdlrs, [signal.getsignal(sig) for sig in signals])

    def test_timeout(self):
        # If the function takes too long to run, we raise a
        # _spinner.TimeoutError.
        timeout = self.make_timeout()
        self.assertThat(
            lambda: self.make_spinner().run(
                timeout, lambda: defer.Deferred()),
            Raises(MatchesException(_spinner.TimeoutError)))

    def test_no_junk_by_default(self):
        # If the reactor hasn't spun yet, then there cannot be any junk.
        spinner = self.make_spinner()
        self.assertThat(spinner.get_junk(), Equals([]))

    def test_clean_do_nothing(self):
        # If there's nothing going on in the reactor, then clean does nothing
        # and returns an empty list.
        spinner = self.make_spinner()
        result = spinner._clean()
        self.assertThat(result, Equals([]))

    def test_clean_delayed_call(self):
        # If there's a delayed call in the reactor, then clean cancels it and
        # returns a list containing that call.
        reactor = self.make_reactor()
        spinner = self.make_spinner(reactor)
        call = reactor.callLater(10, lambda: None)
        results = spinner._clean()
        self.assertThat(results, Equals([call]))
        self.assertThat(call.active(), Equals(False))

    def test_clean_delayed_call_cancelled(self):
        # If there's a delayed call that's just been cancelled, then it's no
        # longer there.
        reactor = self.make_reactor()
        spinner = self.make_spinner(reactor)
        call = reactor.callLater(10, lambda: None)
        call.cancel()
        results = spinner._clean()
        self.assertThat(results, Equals([]))

    def test_clean_selectables(self):
        # If there's still a selectable (e.g. a listening socket), then
        # clean() removes it from the reactor's registry.
        #
        # Note that the socket is left open. This emulates a bug in trial.
        from twisted.internet.protocol import ServerFactory
        reactor = self.make_reactor()
        spinner = self.make_spinner(reactor)
        port = reactor.listenTCP(0, ServerFactory(), interface='127.0.0.1')
        spinner.run(self.make_timeout(), lambda: None)
        results = spinner.get_junk()
        self.assertThat(results, Equals([port]))

    def test_clean_running_threads(self):
        # After a run that started work in a reactor thread, no extra live
        # threads remain compared to before the run.
        import threading
        import time
        current_threads = list(threading.enumerate())
        reactor = self.make_reactor()
        timeout = self.make_timeout()
        spinner = self.make_spinner(reactor)
        spinner.run(timeout, reactor.callInThread, time.sleep, timeout / 2.0)
        # Only live threads are compared. Historically join() could return
        # before the joined thread had removed itself from enumerate(), see
        # http://bugs.python.org/issue1703448.
        #
        # is_alive() is used because the isAlive() spelling was removed in
        # Python 3.9.
        self.assertThat(
            [thread for thread in threading.enumerate() if thread.is_alive()],
            Equals(current_threads))

    def test_leftover_junk_available(self):
        # If 'run' is given a function that leaves the reactor dirty in some
        # way, 'run' will clean up the reactor and then store information
        # about the junk. This information can be got using get_junk.
        from twisted.internet.protocol import ServerFactory
        reactor = self.make_reactor()
        spinner = self.make_spinner(reactor)
        port = spinner.run(
            self.make_timeout(), reactor.listenTCP, 0, ServerFactory(),
            interface='127.0.0.1')
        self.assertThat(spinner.get_junk(), Equals([port]))

    def test_will_not_run_with_previous_junk(self):
        # If 'run' is called and there's still junk in the spinner's junk
        # list, then the spinner will refuse to run.
        from twisted.internet.protocol import ServerFactory
        reactor = self.make_reactor()
        spinner = self.make_spinner(reactor)
        timeout = self.make_timeout()
        spinner.run(
            timeout, reactor.listenTCP, 0, ServerFactory(),
            interface='127.0.0.1')
        self.assertThat(
            lambda: spinner.run(timeout, lambda: None),
            Raises(MatchesException(_spinner.StaleJunkError)))

    def test_clear_junk_clears_previous_junk(self):
        # clear_junk returns the junk left behind by a previous run and
        # empties the spinner's junk list.
        from twisted.internet.protocol import ServerFactory
        reactor = self.make_reactor()
        spinner = self.make_spinner(reactor)
        timeout = self.make_timeout()
        port = spinner.run(
            timeout, reactor.listenTCP, 0, ServerFactory(),
            interface='127.0.0.1')
        junk = spinner.clear_junk()
        self.assertThat(junk, Equals([port]))
        self.assertThat(spinner.get_junk(), Equals([]))

    @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
    def test_sigint_raises_no_result_error(self):
        # If we get a SIGINT during a run, we raise _spinner.NoResultError.
        SIGINT = getattr(signal, 'SIGINT', None)
        if not SIGINT:
            self.skipTest("SIGINT not available")
        reactor = self.make_reactor()
        spinner = self.make_spinner(reactor)
        timeout = self.make_timeout()
        # Deliver the signal to ourselves part-way through the run.
        reactor.callLater(timeout, os.kill, os.getpid(), SIGINT)
        self.assertThat(
            lambda: spinner.run(timeout * 5, defer.Deferred),
            Raises(MatchesException(_spinner.NoResultError)))
        self.assertEqual([], spinner._clean())

    @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
    def test_sigint_raises_no_result_error_second_time(self):
        # If we get a SIGINT during a run, we raise _spinner.NoResultError.
        # This test is exactly the same as test_sigint_raises_no_result_error,
        # and exists to make sure we haven't futzed with state.
        self.test_sigint_raises_no_result_error()

    @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
    def test_fast_sigint_raises_no_result_error(self):
        # If we get a SIGINT as soon as the reactor starts running, we raise
        # _spinner.NoResultError.
        SIGINT = getattr(signal, 'SIGINT', None)
        if not SIGINT:
            self.skipTest("SIGINT not available")
        reactor = self.make_reactor()
        spinner = self.make_spinner(reactor)
        timeout = self.make_timeout()
        reactor.callWhenRunning(os.kill, os.getpid(), SIGINT)
        self.assertThat(
            lambda: spinner.run(timeout * 5, defer.Deferred),
            Raises(MatchesException(_spinner.NoResultError)))
        self.assertEqual([], spinner._clean())

    @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
    def test_fast_sigint_raises_no_result_error_second_time(self):
        # Same as test_fast_sigint_raises_no_result_error, run again to check
        # that state left over from the first run does not change the result.
        self.test_fast_sigint_raises_no_result_error()


def test_suite():
    """Return a suite of every test defined in this module."""
    from unittest import TestLoader
    return TestLoader().loadTestsFromName(__name__)