from __future__ import absolute_import # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from __future__ import absolute_import import os from .common import Test, Skipped, free_tcp_ports, \ MessengerReceiverC, MessengerSenderC, \ ReactorReceiverC, ReactorSenderC, \ isSSLPresent # # Tests that run the apps # class AppTests(Test): def __init__(self, *args): Test.__init__(self, *args) self.is_valgrind = False def default(self, name, value, **kwargs): if self.is_valgrind: default = kwargs.get("valgrind", value) else: default = value return Test.default(self, name, default, **kwargs) @property def iterations(self): return int(self.default("iterations", 2, fast=1, valgrind=2)) @property def send_count(self): return int(self.default("send_count", 17, fast=1, valgrind=2)) @property def target_count(self): return int(self.default("target_count", 5, fast=1, valgrind=2)) @property def send_batch(self): return int(self.default("send_batch", 7, fast=1, valgrind=2)) @property def forward_count(self): return int(self.default("forward_count", 5, fast=1, valgrind=2)) @property def port_count(self): return int(self.default("port_count", 3, fast=1, valgrind=2)) @property def sender_count(self): return int(self.default("sender_count", 3, fast=1, valgrind=2)) def valgrind_test(self): self.is_valgrind = True def setUp(self): self.senders = [] self.receivers = [] def tearDown(self): pass def _do_test(self, iterations=1): verbose = self.verbose for R in self.receivers: R.start(verbose) for j in range(iterations): for S in self.senders: S.start(verbose) for S in self.senders: S.wait() #print("SENDER OUTPUT:") #print( S.stdout() ) assert S.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'" % (str(S.cmdline()), S.status(), S.stdout(), S.stderr())) for R in self.receivers: R.wait() #print("RECEIVER OUTPUT") #print( R.stdout() ) assert R.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'" % (str(R.cmdline()), R.status(), R.stdout(), R.stderr())) # # Traffic passing tests based on the Messenger apps # class MessengerTests(AppTests): _timeout = 60 def _ssl_check(self): if not isSSLPresent(): raise Skipped("No SSL libraries found.") if os.name == "nt": raise Skipped("Windows SChannel lacks anonymous cipher support.") def __init__(self, *args): AppTests.__init__(self, *args) def _do_oneway_test(self, receiver, sender, domain="amqp"): """ Send N messages to a receiver. Parameters: iterations - repeat the senders this many times target_count = # of targets to send to. send_count = # messages sent to each target """ iterations = self.iterations send_count = self.send_count target_count = self.target_count send_total = send_count * target_count receive_total = send_total * iterations port = free_tcp_ports()[0] receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)] receiver.receive_count = receive_total receiver.timeout = MessengerTests._timeout self.receivers.append(receiver) sender.targets = ["%s://0.0.0.0:%s/X%d" % (domain, port, j) for j in range(target_count)] sender.send_count = send_total sender.timeout = MessengerTests._timeout self.senders.append(sender) self._do_test(iterations) def _do_echo_test(self, receiver, sender, domain="amqp"): """ Send N messages to a receiver, which responds to each. Parameters: iterations - repeat the senders this many times target_count - # targets to send to send_count = # messages sent to each target send_batch - wait for replies after this many messages sent """ iterations = self.iterations send_count = self.send_count target_count = self.target_count send_batch = self.send_batch send_total = send_count * target_count receive_total = send_total * iterations port = free_tcp_ports()[0] receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)] receiver.receive_count = receive_total receiver.send_reply = True receiver.timeout = MessengerTests._timeout self.receivers.append(receiver) sender.targets = ["%s://0.0.0.0:%s/%dY" % (domain, port, j) for j in range(target_count)] sender.send_count = send_total sender.get_reply = True sender.send_batch = send_batch sender.timeout = MessengerTests._timeout self.senders.append(sender) self._do_test(iterations) # Removed messenger "relay" tests. The test start-up is faulty: # msgr-recv prints it's -X ready message when it starts to open a # connection but it does not wait for the remote open. The relay # tests depends on mapping a container name from an incoming # connection. They can fail under if the sender starts before the # connection is complete (esp. valgrind with SSL connections) We # could fix the tests but since messenger is deprecated it does # not seem worthwhile. def _do_star_topology_test(self, r_factory, s_factory, domain="amqp"): """ A star-like topology, with a central receiver at the hub, and senders at the spokes. Each sender will connect to each of the ports the receiver is listening on. Each sender will then create N links per each connection. Each sender will send X messages per link, waiting for a response. Parameters: iterations - repeat the senders this many times port_count - # of ports the receiver will listen on. Each sender connects to all ports. sender_count - # of senders target_count - # of targets per connection send_count - # of messages sent to each target send_batch - # of messages to send before waiting for response """ iterations = self.iterations port_count = self.port_count sender_count = self.sender_count target_count = self.target_count send_count = self.send_count send_batch = self.send_batch send_total = port_count * target_count * send_count receive_total = send_total * sender_count * iterations ports = free_tcp_ports(port_count) receiver = r_factory() receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port) for port in ports] receiver.receive_count = receive_total receiver.send_reply = True receiver.timeout = MessengerTests._timeout self.receivers.append(receiver) for i in range(sender_count): sender = s_factory() sender.targets = ["%s://0.0.0.0:%s/%d" % (domain, port, j) for port in ports for j in range(target_count)] sender.send_count = send_total sender.send_batch = send_batch sender.get_reply = True sender.timeout = MessengerTests._timeout self.senders.append(sender) self._do_test(iterations) def test_oneway_C(self): self._do_oneway_test(MessengerReceiverC(), MessengerSenderC()) def test_oneway_C_SSL(self): self._ssl_check() self._do_oneway_test(MessengerReceiverC(), MessengerSenderC(), "amqps") def test_echo_C(self): self._do_echo_test(MessengerReceiverC(), MessengerSenderC()) def test_echo_C_SSL(self): self._ssl_check() self._do_echo_test(MessengerReceiverC(), MessengerSenderC(), "amqps") def test_star_topology_C(self): self._do_star_topology_test(MessengerReceiverC, MessengerSenderC) def test_star_topology_C_SSL(self): self._ssl_check() self._do_star_topology_test(MessengerReceiverC, MessengerSenderC, "amqps") def test_oneway_reactor(self): self._do_oneway_test(ReactorReceiverC(), ReactorSenderC())