#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License
#

# Run the C examples and verify that they behave as expected.
# Example executables must be in PATH

import unittest

from test_subprocess import Popen, Server, TestProcessError, check_output

# Default number of messages each example sends or receives in one run.
MESSAGES = 10


def receive_expect_messages(n=MESSAGES):
    """Return the per-message lines expected from a receiver for sequences 1..n."""
    return ''.join('{"sequence"=%s}\n' % i for i in range(1, n + 1))


def receive_expect_total(n=MESSAGES):
    """Return the summary line expected from a receiver after n messages."""
    return "%s messages received\n" % n


def receive_expect(n=MESSAGES):
    """Return the full expected receiver output: n message lines plus the summary."""
    return receive_expect_messages(n) + receive_expect_total(n)


def send_expect(n=MESSAGES):
    """Return the output expected from a sender after n acknowledged messages."""
    return "%s messages sent and acknowledged\n" % n


def send_abort_expect(n=MESSAGES):
    """Return the output expected from `send-abort` after n aborted messages."""
    return "%s messages started and aborted\n" % n


class Broker(Server):
    """The `broker` example executable, run as a `Server` with `kill_me=True`.

    The command line is `broker "" 0`.
    NOTE(review): the arguments look like (host, port) with port 0 requesting
    a dynamically assigned port that `Server` exposes as `.port` -- confirm
    against `test_subprocess.Server`.
    """

    def __init__(self):
        super(Broker, self).__init__(["broker", "", "0"], kill_me=True)


class ExampleTest(unittest.TestCase):
    """Run the example executables against each other and check their output."""

    def runex(self, name, port, messages=MESSAGES):
        """Run an example with standard arguments, return output"""
        return check_output([name, "", port, "xtest", str(messages)])

    def startex(self, name, port, messages=MESSAGES):
        """Start an example sub-process with standard arguments"""
        return Popen([name, "", port, "xtest", str(messages)])

    def test_send_receive(self):
        """Send first then receive"""
        with Broker() as b:
            self.assertEqual(send_expect(), self.runex("send", b.port))
            self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))

    def test_receive_send(self):
        """Start receiving first, then send."""
        with Broker() as b:
            r = self.startex("receive", b.port)
            self.assertEqual(send_expect(), self.runex("send", b.port))
            self.assertMultiLineEqual(receive_expect(), r.communicate()[0])

    def test_send_direct(self):
        """Send to direct server"""
        d = Server(["direct", "", "0"])
        self.assertEqual(send_expect(), self.runex("send", d.port))
        self.assertMultiLineEqual(receive_expect(), d.communicate()[0])

    def test_receive_direct(self):
        """Receive from direct server"""
        d = Server(["direct", "", "0"])
        self.assertMultiLineEqual(receive_expect(), self.runex("receive", d.port))
        # Use the shared helper so the expectation follows MESSAGES.
        self.assertEqual(send_expect(), d.communicate()[0])

    def test_send_abort_broker(self):
        """Sending aborted messages to a broker"""
        with Broker() as b:
            self.assertEqual(send_expect(), self.runex("send", b.port))
            self.assertEqual(send_abort_expect(), self.runex("send-abort", b.port))
            # The broker is expected to report every aborted message on its stdout.
            for _ in range(MESSAGES):
                self.assertEqual("Message aborted\n", b.stdout.readline())
            self.assertEqual(send_expect(), self.runex("send", b.port))
            # Two complete sends were accepted; the aborted ones must not be delivered.
            total = 2 * MESSAGES
            expect = (receive_expect_messages(MESSAGES) +
                      receive_expect_messages(MESSAGES) +
                      receive_expect_total(total))
            self.assertMultiLineEqual(expect, self.runex("receive", b.port, total))

    def test_send_abort_direct(self):
        """Send aborted messages to the direct server"""
        # The direct server is told to expect both complete sends.
        total = 2 * MESSAGES
        d = Server(["direct", "", "0", "examples", str(total)])
        self.assertEqual(send_expect(), self.runex("send", d.port))
        self.assertEqual(send_abort_expect(), self.runex("send-abort", d.port))
        self.assertEqual(send_expect(), self.runex("send", d.port))
        expect = (receive_expect_messages() +
                  "Message aborted\n" * MESSAGES +
                  receive_expect_messages() +
                  receive_expect_total(total))
        self.maxDiff = None  # show the complete diff if the long output differs
        self.assertMultiLineEqual(expect, d.communicate()[0])

    def test_send_ssl_receive(self):
        """Send with SSL, then receive"""
        try:
            with Broker() as b:
                got = self.runex("send-ssl", b.port)
                self.assertIn("secure connection:", got)
                self.assertIn(send_expect(), got)
                self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))
        except TestProcessError as e:
            # The captured output may be bytes or text depending on how the
            # sub-process was opened; normalize so the prefix check cannot
            # raise TypeError and hide the original failure.
            output = e.output
            if isinstance(output, bytes):
                output = output.decode("utf-8", "replace")
            if output and output.startswith("error initializing SSL"):
                # Report a real skip rather than printing and passing silently.
                self.skipTest("SSL not available")
            else:
                raise


if __name__ == "__main__":
    unittest.main()