# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from __future__ import absolute_import import time from proton.reactor import Container, ApplicationEvent, EventInjector, Selector, Backoff from proton.handlers import Handshaker, MessagingHandler from proton import Handler, Url, symbol from .common import Test, SkipTest, TestServer, free_tcp_port, free_tcp_ports, ensureCanTestExtendedSASL class Barf(Exception): pass class BarfOnInit: def on_reactor_init(self, event): raise Barf() def on_connection_init(self, event): raise Barf() def on_session_init(self, event): raise Barf() def on_link_init(self, event): raise Barf() class BarfOnTask: def on_timer_task(self, event): raise Barf() class BarfOnFinal: init = False def on_reactor_init(self, event): self.init = True def on_reactor_final(self, event): raise Barf() class BarfOnFinalDerived(Handshaker): init = False def on_reactor_init(self, event): self.init = True def on_reactor_final(self, event): raise Barf() class ExceptionTest(Test): def setUp(self): self.container = Container() def test_reactor_final(self): self.container.global_handler = BarfOnFinal() try: self.container.run() assert False, "expected to barf" except Barf: pass def test_global_set(self): self.container.global_handler = BarfOnInit() try: self.container.run() assert False, "expected to barf" except Barf: pass def test_global_add(self): self.container.global_handler.add(BarfOnInit()) try: self.container.run() assert False, "expected to barf" except Barf: pass def test_reactor_set(self): self.container.handler = BarfOnInit() try: self.container.run() assert False, "expected to barf" except Barf: pass def test_reactor_add(self): self.container.handler.add(BarfOnInit()) try: self.container.run() assert False, "expected to barf" except Barf: pass def test_connection(self): self.container.connection(BarfOnInit()) try: self.container.run() assert False, "expected to barf" except Barf: pass def test_connection_set(self): c = self.container.connection() c.handler = BarfOnInit() try: self.container.run() assert False, "expected to barf" except Barf: pass def test_connection_add(self): c = self.container.connection() c.handler = object() c.handler.add(BarfOnInit()) try: self.container.run() assert False, "expected to barf" except Barf: pass def test_session_set(self): c = self.container.connection() s = c.session() s.handler = BarfOnInit() try: self.container.run() assert False, "expected to barf" except Barf: pass def test_session_add(self): c = self.container.connection() s = c.session() s.handler = object() s.handler.add(BarfOnInit()) try: self.container.run() assert False, "expected to barf" except Barf: pass def test_link_set(self): c = self.container.connection() s = c.session() l = s.sender("xxx") l.handler = BarfOnInit() try: self.container.run() assert False, "expected to barf" except Barf: pass def test_link_add(self): c = self.container.connection() s = c.session() l = s.sender("xxx") l.handler = object() l.handler.add(BarfOnInit()) try: self.container.run() assert False, "expected to barf" except Barf: pass def test_schedule(self): self.container.schedule(0, BarfOnTask()) try: self.container.run() assert False, "expected to barf" except Barf: pass def test_schedule_event(self): class Nothing: def __init__(self, p): self.parent = p results = [] def on_timer_task(self, event): self.parent.triggered = True assert event.context == self.parent.task assert event.container == self.parent.container self.task = self.container.schedule(0, Nothing(self)) self.container.run() assert self.triggered def test_schedule_many_nothings(self): class Nothing: results = [] def on_timer_task(self, event): self.results.append(None) num = 12345 for a in range(num): self.container.schedule(0, Nothing()) self.container.run() assert len(Nothing.results) == num def test_schedule_many_nothing_refs(self): class Nothing: results = [] def on_timer_task(self, event): self.results.append(None) num = 12345 tasks = [] for a in range(num): tasks.append(self.container.schedule(0, Nothing())) self.container.run() assert len(Nothing.results) == num def test_schedule_many_nothing_refs_cancel_before_run(self): class Nothing: results = [] def on_timer_task(self, event): self.results.append(None) num = 12345 tasks = [] for a in range(num): tasks.append(self.container.schedule(0, Nothing())) for task in tasks: task.cancel() self.container.run() assert len(Nothing.results) == 0 def test_schedule_cancel(self): barf = self.container.schedule(10, BarfOnTask()) class CancelBarf: def __init__(self, barf): self.barf = barf def on_timer_task(self, event): self.barf.cancel() pass self.container.schedule(0, CancelBarf(barf)) start = time.time() try: self.container.run() elapsed = time.time() - start assert elapsed < 10, "should have cancelled immediately, took %ss" % elapsed except Barf: assert False, "expected barf to be cancelled" def test_schedule_cancel_many(self): num = 12345 barfs = set() for a in range(num): barf = self.container.schedule(10 * (a + 1), BarfOnTask()) class CancelBarf: def __init__(self, barf): self.barf = barf def on_timer_task(self, event): self.barf.cancel() barfs.discard(self.barf) pass self.container.schedule(0, CancelBarf(barf)) barfs.add(barf) start = time.time() try: self.container.run() elapsed = time.time() - start assert elapsed < num, "expected cancelled task to not delay the reactor by %s" % elapsed assert not barfs, "expected all barfs to be discarded" except Barf: assert False, "expected barf to be cancelled" class ApplicationEventTest(Test): """Test application defined events and handlers.""" class MyTestServer(TestServer): def __init__(self): super(ApplicationEventTest.MyTestServer, self).__init__() class MyHandler(Handler): def __init__(self, test): super(ApplicationEventTest.MyHandler, self).__init__() self._test = test def on_hello(self, event): # verify PROTON-1056 self._test.hello_rcvd = str(event) def on_goodbye(self, event): self._test.goodbye_rcvd = str(event) def setUp(self): import os if not hasattr(os, 'pipe'): # KAG: seems like Jython doesn't have an os.pipe() method raise SkipTest() if os.name == "nt": # Correct implementation on Windows is complicated raise SkipTest("PROTON-1071") self.server = ApplicationEventTest.MyTestServer() self.server.reactor.handler.add(ApplicationEventTest.MyHandler(self)) self.event_injector = EventInjector() self.hello_event = ApplicationEvent("hello") self.goodbye_event = ApplicationEvent("goodbye") self.server.reactor.selectable(self.event_injector) self.hello_rcvd = None self.goodbye_rcvd = None self.server.start() def tearDown(self): self.server.stop() def _wait_for(self, predicate, timeout=10.0): deadline = time.time() + timeout while time.time() < deadline: if predicate(): break time.sleep(0.1) assert predicate() def test_application_events(self): self.event_injector.trigger(self.hello_event) self._wait_for(lambda: self.hello_rcvd is not None) self.event_injector.trigger(self.goodbye_event) self._wait_for(lambda: self.goodbye_rcvd is not None) class AuthenticationTestHandler(MessagingHandler): def __init__(self): super(AuthenticationTestHandler, self).__init__() port = free_tcp_port() self.url = "localhost:%i" % port self.verified = False def on_start(self, event): self.listener = event.container.listen(self.url) def on_connection_opened(self, event): event.connection.close() def on_connection_opening(self, event): assert event.connection.transport.user == "user@proton" self.verified = True def on_connection_closed(self, event): event.connection.close() self.listener.close() def on_connection_error(self, event): event.connection.close() self.listener.close() class ContainerTest(Test): """Test container subclass of reactor.""" def test_event_has_container_attribute(self): ensureCanTestExtendedSASL() class TestHandler(MessagingHandler): def __init__(self): super(TestHandler, self).__init__() port = free_tcp_port() self.url = "localhost:%i" % port def on_start(self, event): self.listener = event.container.listen(self.url) def on_connection_closing(self, event): event.connection.close() self.listener.close() test_handler = TestHandler() container = Container(test_handler) class ConnectionHandler(MessagingHandler): def __init__(self): super(ConnectionHandler, self).__init__() def on_connection_opened(self, event): event.connection.close() assert event.container is event.reactor assert event.container is container container.connect(test_handler.url, handler=ConnectionHandler()) container.run() def test_authentication_via_url(self): ensureCanTestExtendedSASL() test_handler = AuthenticationTestHandler() container = Container(test_handler) container.connect("%s:password@%s" % ("user%40proton", test_handler.url), reconnect=False) container.run() assert test_handler.verified def test_authentication_via_container_attributes(self): ensureCanTestExtendedSASL() test_handler = AuthenticationTestHandler() container = Container(test_handler) container.user = "user@proton" container.password = "password" container.connect(test_handler.url, reconnect=False) container.run() assert test_handler.verified def test_authentication_via_kwargs(self): ensureCanTestExtendedSASL() test_handler = AuthenticationTestHandler() container = Container(test_handler) container.connect(test_handler.url, user="user@proton", password="password", reconnect=False) container.run() assert test_handler.verified class _ServerHandler(MessagingHandler): def __init__(self, host): super(ContainerTest._ServerHandler, self).__init__() self.host = host self.port = free_tcp_port() self.client_addr = None self.peer_hostname = None def on_start(self, event): self.listener = event.container.listen("%s:%s" % (self.host, self.port)) def on_connection_opened(self, event): self.client_addr = event.connected_address self.peer_hostname = event.connection.remote_hostname def on_connection_closing(self, event): event.connection.close() self.listener.close() class _ClientHandler(MessagingHandler): def __init__(self): super(ContainerTest._ClientHandler, self).__init__() self.server_addr = None self.errors = 0 def on_connection_opened(self, event): self.server_addr = event.connected_address event.connection.close() def on_transport_error(self, event): self.errors += 1 def test_numeric_hostname(self): ensureCanTestExtendedSASL() server_handler = ContainerTest._ServerHandler("127.0.0.1") client_handler = ContainerTest._ClientHandler() container = Container(server_handler) container.connect(url="127.0.0.1:%s" % (server_handler.port), handler=client_handler) container.run() assert server_handler.client_addr assert client_handler.server_addr assert server_handler.peer_hostname == "127.0.0.1", server_handler.peer_hostname assert client_handler.server_addr.rsplit(':', 1)[1] == str(server_handler.port) def test_non_numeric_hostname(self): ensureCanTestExtendedSASL() server_handler = ContainerTest._ServerHandler("localhost") client_handler = ContainerTest._ClientHandler() container = Container(server_handler) container.connect(url="localhost:%s" % (server_handler.port), handler=client_handler) container.run() assert server_handler.client_addr assert client_handler.server_addr assert server_handler.peer_hostname == "localhost", server_handler.peer_hostname assert client_handler.server_addr.rsplit(':', 1)[1] == str(server_handler.port) def test_virtual_host(self): ensureCanTestExtendedSASL() server_handler = ContainerTest._ServerHandler("localhost") container = Container(server_handler) conn = container.connect(url="localhost:%s" % (server_handler.port), handler=ContainerTest._ClientHandler(), virtual_host="a.b.c.org") container.run() assert server_handler.peer_hostname == "a.b.c.org", server_handler.peer_hostname def test_no_virtual_host(self): # explicitly setting an empty virtual host should prevent the hostname # field from being sent in the Open performative when using the # Python Container. server_handler = ContainerTest._ServerHandler("localhost") container = Container(server_handler) conn = container.connect(url="localhost:%s" % (server_handler.port), handler=ContainerTest._ClientHandler(), virtual_host="") container.run() assert server_handler.peer_hostname is None, server_handler.peer_hostname class _ReconnectServerHandler(MessagingHandler): def __init__(self, host, listen_on_error=True): super(ContainerTest._ReconnectServerHandler, self).__init__() self.host = host self.port = free_tcp_port() self.client_addr = None self.peer_hostname = None self.listen_on_error = listen_on_error def on_connection_opened(self, event): self.client_addr = event.connected_address self.peer_hostname = event.connection.remote_hostname self.listener.close() def on_connection_closing(self, event): event.connection.close() def listen(self, container): if self.listen_on_error: self.listener = container.listen("%s:%s" % (self.host, self.port)) class _ReconnectClientHandler(MessagingHandler): def __init__(self, server_handler): super(ContainerTest._ReconnectClientHandler, self).__init__() self.connect_failed = False self.server_addr = None self.server_handler = server_handler def on_connection_opened(self, event): self.server_addr = event.connected_address event.connection.close() def on_transport_error(self, event): assert self.connect_failed == False self.connect_failed = True self.server_handler.listen(event.container) def test_failover(self): server_handler = ContainerTest._ServerHandler("localhost") client_handler = ContainerTest._ClientHandler() free_ports = free_tcp_ports(2) container = Container(server_handler) container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1]), "localhost:%s" % (server_handler.port)], handler=client_handler) container.run() assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname assert client_handler.server_addr == Url(host='localhost', port=server_handler.port), client_handler.server_addr def test_failover_fail(self): client_handler = ContainerTest._ClientHandler() free_ports = free_tcp_ports(2) container = Container(client_handler) start = time.time() container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])], reconnect=Backoff(max_tries=5), handler=client_handler) container.run() end = time.time() assert client_handler.errors == 10 # Total time for failure should be greater than but close to 3s # would like to have an upper bound of about 3.2 too - but loaded CI machines can take a loooong time! assert 3.0 < end - start, end - start assert client_handler.server_addr is None, client_handler.server_addr def test_failover_fail_custom_reconnect(self): client_handler = ContainerTest._ClientHandler() free_ports = free_tcp_ports(2) container = Container(client_handler) start = time.time() container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])], reconnect=[0, 0.5, 1], handler=client_handler) container.run() end = time.time() assert client_handler.errors == 6 # Total time for failure should be greater than but close to 3s # would like to have an upper bound of about 3.2 too - but loaded CI machines can take a loooong time! assert 3.0 < end - start, end - start assert client_handler.server_addr is None, client_handler.server_addr def test_reconnect(self): server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=True) client_handler = ContainerTest._ReconnectClientHandler(server_handler) container = Container(server_handler) container.connect(url="localhost:%s" % (server_handler.port), handler=client_handler) container.run() assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname assert client_handler.connect_failed assert client_handler.server_addr == Url(host='localhost', port=server_handler.port), client_handler.server_addr def test_not_reconnecting(self): server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=False) client_handler = ContainerTest._ReconnectClientHandler(server_handler) container = Container(server_handler) container.connect(url="localhost:%s" % (server_handler.port), handler=client_handler, reconnect=False) container.run() assert server_handler.peer_hostname is None, server_handler.peer_hostname assert client_handler.connect_failed assert client_handler.server_addr is None, client_handler.server_addr class SelectorTest(Test): """Test the Selector""" def test_unicode_selector(self): assert Selector(u"Hello").filter_set[symbol('selector')].value == u"Hello" def test_non_unicode_selector(self): assert Selector(b"Hello").filter_set[symbol('selector')].value == u"Hello"