# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from __future__ import absolute_import import sys from proton import * from . import common class Test(common.Test): pass class ClientTransportTest(Test): def setUp(self): self.transport = Transport() self.peer = Transport() self.conn = Connection() self.peer.bind(self.conn) def tearDown(self): self.transport = None self.peer = None self.conn = None def drain(self): while True: p = self.transport.pending() if p < 0: return elif p > 0: data = self.transport.peek(p) self.peer.push(data) self.transport.pop(len(data)) else: assert False def assert_error(self, name): assert self.conn.remote_container is None, self.conn.remote_container self.drain() # verify that we received an open frame assert self.conn.remote_container is not None, self.conn.remote_container # verify that we received a close frame assert self.conn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_CLOSED, self.conn.state # verify that a framing error was reported assert self.conn.remote_condition.name == name, self.conn.remote_condition def testEOS(self): self.transport.push(b"") # should be a noop self.transport.close_tail() # should result in framing error self.assert_error(u'amqp:connection:framing-error') def testPartial(self): self.transport.push(b"AMQ") # partial header self.transport.close_tail() # should result in framing error self.assert_error(u'amqp:connection:framing-error') def testGarbage(self, garbage=b"GARBAGE_"): self.transport.push(garbage) self.assert_error(u'amqp:connection:framing-error') assert self.transport.pending() < 0 self.transport.close_tail() assert self.transport.pending() < 0 def testSmallGarbage(self): self.testGarbage(b"XXX") def testBigGarbage(self): self.testGarbage(b"GARBAGE_XXX") def testHeader(self): self.transport.push(b"AMQP\x00\x01\x00\x00") self.transport.close_tail() self.assert_error(u'amqp:connection:framing-error') def testHeaderBadDOFF1(self): """Verify doff > size error""" self.testGarbage(b"AMQP\x00\x01\x00\x00\x00\x00\x00\x08\x08\x00\x00\x00") def testHeaderBadDOFF2(self): """Verify doff < 2 error""" self.testGarbage(b"AMQP\x00\x01\x00\x00\x00\x00\x00\x08\x01\x00\x00\x00") def testHeaderBadSize(self): """Verify size > max_frame_size error""" self.transport.max_frame_size = 512 self.testGarbage(b"AMQP\x00\x01\x00\x00\x00\x00\x02\x01\x02\x00\x00\x00") def testProtocolNotSupported(self): self.transport.push(b"AMQP\x01\x01\x0a\x00") p = self.transport.pending() assert p >= 8, p bytes = self.transport.peek(p) assert bytes[:8] == b"AMQP\x00\x01\x00\x00" self.transport.pop(p) self.drain() assert self.transport.closed def testPeek(self): out = self.transport.peek(1024) assert out is not None def testBindAfterOpen(self): conn = Connection() ssn = conn.session() conn.open() ssn.open() conn.container = "test-container" conn.hostname = "test-hostname" trn = Transport() trn.bind(conn) out = trn.peek(1024) assert b"test-container" in out, repr(out) assert b"test-hostname" in out, repr(out) self.transport.push(out) c = Connection() assert c.remote_container is None assert c.remote_hostname is None assert c.session_head(0) is None self.transport.bind(c) assert c.remote_container == "test-container" assert c.remote_hostname == "test-hostname" assert c.session_head(0) is not None def testCloseHead(self): n = self.transport.pending() assert n > 0, n try: self.transport.close_head() except TransportException: e = sys.exc_info()[1] assert "aborted" in str(e), str(e) n = self.transport.pending() assert n < 0, n def testCloseTail(self): n = self.transport.capacity() assert n > 0, n try: self.transport.close_tail() except TransportException: e = sys.exc_info()[1] assert "aborted" in str(e), str(e) n = self.transport.capacity() assert n < 0, n def testUnpairedPop(self): conn = Connection() self.transport.bind(conn) conn.hostname = "hostname" conn.open() dat1 = self.transport.peek(1024) ssn = conn.session() ssn.open() dat2 = self.transport.peek(1024) assert dat2[:len(dat1)] == dat1 snd = ssn.sender("sender") snd.open() self.transport.pop(len(dat1)) self.transport.pop(len(dat2) - len(dat1)) dat3 = self.transport.peek(1024) self.transport.pop(len(dat3)) assert self.transport.peek(1024) == b"" self.peer.push(dat1) self.peer.push(dat2[len(dat1):]) self.peer.push(dat3) class ServerTransportTest(Test): def setUp(self): self.transport = Transport(Transport.SERVER) self.peer = Transport() self.conn = Connection() self.peer.bind(self.conn) def tearDOwn(self): self.transport = None self.peer = None self.conn = None def drain(self): while True: p = self.transport.pending() if p < 0: return elif p > 0: bytes = self.transport.peek(p) self.peer.push(bytes) self.transport.pop(len(bytes)) else: assert False def assert_error(self, name): assert self.conn.remote_container is None, self.conn.remote_container self.drain() # verify that we received an open frame assert self.conn.remote_container is not None, self.conn.remote_container # verify that we received a close frame assert self.conn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_CLOSED, self.conn.state # verify that a framing error was reported assert self.conn.remote_condition.name == name, self.conn.remote_condition # TODO: This may no longer be testing anything def testEOS(self): self.transport.push(b"") # should be a noop self.transport.close_tail() p = self.transport.pending() self.drain() assert self.transport.closed def testPartial(self): self.transport.push(b"AMQ") # partial header self.transport.close_tail() p = self.transport.pending() assert p >= 8, p bytes = self.transport.peek(p) assert bytes[:8] == b"AMQP\x00\x01\x00\x00" self.transport.pop(p) self.drain() assert self.transport.closed def testGarbage(self, garbage=b"GARBAGE_"): self.transport.push(garbage) p = self.transport.pending() assert p >= 8, p bytes = self.transport.peek(p) assert bytes[:8] == b"AMQP\x00\x01\x00\x00" self.transport.pop(p) self.drain() assert self.transport.closed def testSmallGarbage(self): self.testGarbage(b"XXX") def testBigGarbage(self): self.testGarbage(b"GARBAGE_XXX") def testHeader(self): self.transport.push(b"AMQP\x00\x01\x00\x00") self.transport.close_tail() self.assert_error(u'amqp:connection:framing-error') def testProtocolNotSupported(self): self.transport.push(b"AMQP\x01\x01\x0a\x00") p = self.transport.pending() assert p >= 8, p bytes = self.transport.peek(p) assert bytes[:8] == b"AMQP\x00\x01\x00\x00" self.transport.pop(p) self.drain() assert self.transport.closed def testPeek(self): out = self.transport.peek(1024) assert out is not None def testBindAfterOpen(self): conn = Connection() ssn = conn.session() conn.open() ssn.open() conn.container = "test-container" conn.hostname = "test-hostname" trn = Transport() trn.bind(conn) out = trn.peek(1024) assert b"test-container" in out, repr(out) assert b"test-hostname" in out, repr(out) self.transport.push(out) c = Connection() assert c.remote_container is None assert c.remote_hostname is None assert c.session_head(0) is None self.transport.bind(c) assert c.remote_container == "test-container" assert c.remote_hostname == "test-hostname" assert c.session_head(0) is not None def testCloseHead(self): n = self.transport.pending() assert n >= 0, n try: self.transport.close_head() except TransportException: e = sys.exc_info()[1] assert "aborted" in str(e), str(e) n = self.transport.pending() assert n < 0, n def testCloseTail(self): n = self.transport.capacity() assert n > 0, n try: self.transport.close_tail() except TransportException: e = sys.exc_info()[1] assert "aborted" in str(e), str(e) n = self.transport.capacity() assert n < 0, n def testUnpairedPop(self): conn = Connection() self.transport.bind(conn) conn.hostname = "hostname" conn.open() dat1 = self.transport.peek(1024) ssn = conn.session() ssn.open() dat2 = self.transport.peek(1024) assert dat2[:len(dat1)] == dat1 snd = ssn.sender("sender") snd.open() self.transport.pop(len(dat1)) self.transport.pop(len(dat2) - len(dat1)) dat3 = self.transport.peek(1024) self.transport.pop(len(dat3)) assert self.transport.peek(1024) == b"" self.peer.push(dat1) self.peer.push(dat2[len(dat1):]) self.peer.push(dat3) def testEOSAfterSASL(self): self.transport.sasl().allowed_mechs('ANONYMOUS') self.peer.sasl().allowed_mechs('ANONYMOUS') # this should send over the sasl header plus a sasl-init set up # for anonymous p = self.peer.pending() self.transport.push(self.peer.peek(p)) self.peer.pop(p) # now we send EOS self.transport.close_tail() # the server may send an error back p = self.transport.pending() while p > 0: self.peer.push(self.transport.peek(p)) self.transport.pop(p) p = self.transport.pending() # server closed assert self.transport.pending() < 0 class LogTest(Test): def testTracer(self): t = Transport() assert t.tracer is None messages = [] def tracer(transport, message): messages.append((transport, message)) t.tracer = tracer assert t.tracer is tracer t.log("one") t.log("two") t.log("three") assert messages == [(t, "TRACE: one"), (t, "TRACE: two"), (t, "TRACE: three")], messages