# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from __future__ import absolute_import import os from proton import * from . import common from . import engine from .common import pump, Skipped def _sslCertpath(file): """ Return the full path to the certificate, keyfile, etc. """ if os.name == "nt": if file.find("private-key") != -1: # The private key is not in a separate store return None # Substitute pkcs#12 equivalent for the CA/key store if file.endswith(".pem"): file = file[:-4] + ".p12" return os.path.join(os.path.dirname(__file__), "ssl_db/%s" % file) def _testSaslMech(self, mech, clientUser='user@proton', authUser='user@proton', authzid=None, encrypted=False, authenticated=True): self.s1.allowed_mechs(mech) self.c1.open() self.c2.open() pump(self.t1, self.t2, 1024) if encrypted is not None: assert self.t2.encrypted == encrypted, encrypted assert self.t1.encrypted == encrypted, encrypted if authzid is None: authzid = authUser assert self.t2.authenticated == authenticated, authenticated assert self.t1.authenticated == authenticated, authenticated if authenticated: # Server assert self.t2.user == authUser assert self.s2.user == authUser assert self.s2.authorization == authzid, self.s2.authorization assert self.s2.mech == mech.strip() assert self.s2.outcome == SASL.OK, self.s2.outcome assert self.c2.state & Endpoint.LOCAL_ACTIVE and self.c2.state & Endpoint.REMOTE_ACTIVE,\ "local_active=%s, remote_active=%s" % ( self.c1.state & Endpoint.LOCAL_ACTIVE, self.c1.state & Endpoint.REMOTE_ACTIVE) # Client assert self.t1.user == clientUser assert self.s1.user == clientUser assert self.s1.mech == mech.strip() assert self.s1.outcome == SASL.OK, self.s1.outcome assert self.c1.state & Endpoint.LOCAL_ACTIVE and self.c1.state & Endpoint.REMOTE_ACTIVE,\ "local_active=%s, remote_active=%s" % ( self.c1.state & Endpoint.LOCAL_ACTIVE, self.c1.state & Endpoint.REMOTE_ACTIVE) else: # Server assert self.t2.user is None assert self.s2.user is None assert self.s2.authorization is None, self.s2.authorization assert self.s2.outcome != SASL.OK, self.s2.outcome # Client assert self.t1.user == clientUser assert self.s1.user == clientUser assert self.s1.outcome != SASL.OK, self.s1.outcome class Test(common.Test): pass def consumeAllOuput(t): stops = 0 while stops < 1: out = t.peek(1024) l = len(out) if out else 0 t.pop(l) if l <= 0: stops += 1 class SaslTest(Test): def setUp(self): self.t1 = Transport() self.s1 = SASL(self.t1) self.t2 = Transport(Transport.SERVER) self.t2.max_frame_size = 65536 self.s2 = SASL(self.t2) def pump(self): pump(self.t1, self.t2, 1024) # We have to generate the client frames manually because proton does not # generate pipelined SASL and AMQP frames together def testIllegalProtocolLayering(self): # Server self.s2.allowed_mechs('ANONYMOUS') c2 = Connection() self.t2.bind(c2) assert self.s2.outcome is None # Push client bytes into server self.t2.push( # SASL b'AMQP\x03\x01\x00\x00' # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"] b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia' # SASL (again illegally) b'AMQP\x03\x01\x00\x00' # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"] b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia' # AMQP b'AMQP\x00\x01\x00\x00' # @open(16) [container-id="", channel-max=1234] b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@' ) consumeAllOuput(self.t2) assert self.t2.condition assert self.t2.closed assert not c2.state & Endpoint.REMOTE_ACTIVE def testPipelinedClient(self): # Server self.s2.allowed_mechs('ANONYMOUS') c2 = Connection() self.t2.bind(c2) assert self.s2.outcome is None # Push client bytes into server self.t2.push( # SASL b'AMQP\x03\x01\x00\x00' # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"] b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia' # AMQP b'AMQP\x00\x01\x00\x00' # @open(16) [container-id="", channel-max=1234] b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@' ) consumeAllOuput(self.t2) assert not self.t2.condition assert self.s2.outcome == SASL.OK assert c2.state & Endpoint.REMOTE_ACTIVE def testPipelinedServer(self): # Client self.s1.allowed_mechs('ANONYMOUS') c1 = Connection() self.t1.bind(c1) assert self.s1.outcome is None # Push server bytes into client # Commented out lines in this test are where the client input processing doesn't # run after output processing even though there is input waiting self.t1.push( # SASL b'AMQP\x03\x01\x00\x00' # @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS]] b'\x00\x00\x00\x1c\x02\x01\x00\x00\x00S@\xc0\x0f\x01\xe0\x0c\x01\xa3\tANONYMOUS' # @sasl-outcome(68) [code=0] b'\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00' # AMQP b'AMQP\x00\x01\x00\x00' # @open(16) [container-id="", channel-max=1234] b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@' ) consumeAllOuput(self.t1) assert self.s1.outcome == SASL.OK assert c1.state & Endpoint.REMOTE_ACTIVE def testPipelined2(self): out1 = self.t1.peek(1024) self.t1.pop(len(out1)) self.t2.push(out1) self.s2.allowed_mechs('ANONYMOUS') c2 = Connection() c2.open() self.t2.bind(c2) out2 = self.t2.peek(1024) self.t2.pop(len(out2)) self.t1.push(out2) out1 = self.t1.peek(1024) assert len(out1) > 0 def testFracturedSASL(self): """ PROTON-235 """ assert self.s1.outcome is None # self.t1.trace(Transport.TRACE_FRM) out = self.t1.peek(1024) self.t1.pop(len(out)) self.t1.push(b"AMQP\x03\x01\x00\x00") out = self.t1.peek(1024) self.t1.pop(len(out)) self.t1.push(b"\x00\x00\x00") out = self.t1.peek(1024) self.t1.pop(len(out)) self.t1.push(b"6\x02\x01\x00\x00\x00S@\xc0\x29\x01\xe0\x26\x04\xa3\x05PLAIN\x0aDIGEST-MD5\x09ANONYMOUS\x08CRAM-MD5") out = self.t1.peek(1024) self.t1.pop(len(out)) self.t1.push(b"\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00") out = self.t1.peek(1024) self.t1.pop(len(out)) while out: out = self.t1.peek(1024) self.t1.pop(len(out)) assert self.s1.outcome == SASL.OK, self.s1.outcome def test_singleton(self): """Verify that only a single instance of SASL can exist per Transport""" transport = Transport() attr = object() sasl1 = SASL(transport) sasl1.my_attribute = attr sasl2 = transport.sasl() sasl3 = SASL(transport) assert sasl1 == sasl2 assert sasl1 == sasl3 assert sasl1.my_attribute == attr assert sasl2.my_attribute == attr assert sasl3.my_attribute == attr transport = Transport() sasl1 = transport.sasl() sasl1.my_attribute = attr sasl2 = SASL(transport) assert sasl1 == sasl2 assert sasl1.my_attribute == attr assert sasl2.my_attribute == attr def testSaslSkipped(self): """Verify that the server (with SASL) correctly handles a client without SASL""" self.t1 = Transport() self.t2.require_auth(False) self.pump() assert self.s2.outcome is None assert self.t2.condition is None assert self.t2.authenticated == False assert self.s1.outcome is None assert self.t1.condition is None assert self.t1.authenticated == False def testSaslSkippedFail(self): """Verify that the server (with SASL) correctly handles a client without SASL""" self.t1 = Transport() self.t2.require_auth(True) self.pump() assert self.s2.outcome is None assert self.t2.condition is not None assert self.s1.outcome is None assert self.t1.condition is not None def testMechNotFound(self): self.c1 = Connection() self.c1.open() self.t1.bind(self.c1) self.s1.allowed_mechs('IMPOSSIBLE') self.pump() assert self.t2.authenticated == False assert self.t1.authenticated == False assert self.s1.outcome != SASL.OK assert self.s2.outcome != SASL.OK class SASLMechTest(Test): def setUp(self): self.t1 = Transport() self.s1 = SASL(self.t1) self.t2 = Transport(Transport.SERVER) self.s2 = SASL(self.t2) self.c1 = Connection() self.c1.user = 'user@proton' self.c1.password = 'password' self.c1.hostname = 'localhost' self.c2 = Connection() def testANON(self): self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'ANONYMOUS', authUser='anonymous') def testCRAMMD5(self): common.ensureCanTestExtendedSASL() self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'CRAM-MD5') def testDIGESTMD5(self): common.ensureCanTestExtendedSASL() self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'DIGEST-MD5') # PLAIN shouldn't work without encryption without special setting def testPLAINfail(self): common.ensureCanTestExtendedSASL() self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'PLAIN', authenticated=False) # Client won't accept PLAIN even if offered by server without special setting def testPLAINClientFail(self): common.ensureCanTestExtendedSASL() self.s2.allow_insecure_mechs = True self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'PLAIN', authenticated=False) # PLAIN will only work if both ends are specially set up def testPLAIN(self): common.ensureCanTestExtendedSASL() self.s1.allow_insecure_mechs = True self.s2.allow_insecure_mechs = True self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'PLAIN') # SCRAM not supported before Cyrus SASL 2.1.26 # so not universal and hence need a test for support # to keep it in tests. # def testSCRAMSHA1(self): # common.ensureCanTestExtendedSASL() # # self.t1.bind(self.c1) # self.t2.bind(self.c2) # _testSaslMech(self, 'SCRAM-SHA-1') def _sslConnection(domain, transport, connection): transport.bind(connection) ssl = SSL(transport, domain, None) return connection class SSLSASLTest(Test): def setUp(self): if not common.isSSLPresent(): raise Skipped("No SSL libraries found.") self.server_domain = SSLDomain(SSLDomain.MODE_SERVER) self.client_domain = SSLDomain(SSLDomain.MODE_CLIENT) self.t1 = Transport() self.s1 = SASL(self.t1) self.t2 = Transport(Transport.SERVER) self.s2 = SASL(self.t2) self.c1 = Connection() self.c2 = Connection() def testSSLPlainSimple(self): if not SASL.extended(): raise Skipped("Simple SASL server does not support PLAIN") common.ensureCanTestExtendedSASL() clientUser = 'user@proton' mech = 'PLAIN' self.c1.user = clientUser self.c1.password = 'password' self.c1.hostname = 'localhost' ssl1 = _sslConnection(self.client_domain, self.t1, self.c1) ssl2 = _sslConnection(self.server_domain, self.t2, self.c2) _testSaslMech(self, mech, encrypted=True) def testSSLPlainAuthzid(self): if not SASL.extended(): raise Skipped("Simple SASL server does not support PLAIN") common.ensureCanTestExtendedSASL() clientUser = 'user@proton' authzid = 'user2' mech = 'PLAIN' self.c1.user = clientUser self.c1.authorization = authzid self.c1.password = 'password' self.c1.hostname = 'localhost' ssl1 = _sslConnection(self.client_domain, self.t1, self.c1) ssl2 = _sslConnection(self.server_domain, self.t2, self.c2) _testSaslMech(self, mech, authzid=authzid, encrypted=True) def testSSLPlainSimpleFail(self): if not SASL.extended(): raise Skipped("Simple SASL server does not support PLAIN") common.ensureCanTestExtendedSASL() clientUser = 'usr@proton' mech = 'PLAIN' self.c1.user = clientUser self.c1.password = 'password' self.c1.hostname = 'localhost' ssl1 = _sslConnection(self.client_domain, self.t1, self.c1) ssl2 = _sslConnection(self.server_domain, self.t2, self.c2) _testSaslMech(self, mech, clientUser='usr@proton', encrypted=True, authenticated=False) def testSSLExternalSimple(self): if os.name == "nt": extUser = 'O=Client, CN=127.0.0.1' else: extUser = 'O=Client,CN=127.0.0.1' mech = 'EXTERNAL' self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"), _sslCertpath("server-private-key.pem"), "server-password") self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem")) self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER, _sslCertpath("ca-certificate.pem")) self.client_domain.set_credentials(_sslCertpath("client-certificate.pem"), _sslCertpath("client-private-key.pem"), "client-password") self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem")) self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER) ssl1 = _sslConnection(self.client_domain, self.t1, self.c1) ssl2 = _sslConnection(self.server_domain, self.t2, self.c2) _testSaslMech(self, mech, clientUser=None, authUser=extUser, encrypted=True) def testSSLExternalAuthzid(self): if os.name == "nt": extUser = 'O=Client, CN=127.0.0.1' else: extUser = 'O=Client,CN=127.0.0.1' mech = 'EXTERNAL' authzid = 'user_foo' self.c1.authorization = authzid self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"), _sslCertpath("server-private-key.pem"), "server-password") self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem")) self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER, _sslCertpath("ca-certificate.pem")) self.client_domain.set_credentials(_sslCertpath("client-certificate.pem"), _sslCertpath("client-private-key.pem"), "client-password") self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem")) self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER) ssl1 = _sslConnection(self.client_domain, self.t1, self.c1) ssl2 = _sslConnection(self.server_domain, self.t2, self.c2) _testSaslMech(self, mech, clientUser=None, authUser=extUser, authzid=authzid, encrypted=True) def testSSLExternalSimpleFail(self): mech = 'EXTERNAL' self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"), _sslCertpath("server-private-key.pem"), "server-password") self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem")) self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER, _sslCertpath("ca-certificate.pem")) self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem")) self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER) ssl1 = _sslConnection(self.client_domain, self.t1, self.c1) ssl2 = _sslConnection(self.server_domain, self.t2, self.c2) _testSaslMech(self, mech, clientUser=None, authUser=None, encrypted=None, authenticated=False) class SASLEventTest(engine.CollectorTest): def setUp(self): engine.CollectorTest.setUp(self) self.t1 = Transport() self.s1 = SASL(self.t1) self.t2 = Transport(Transport.SERVER) self.s2 = SASL(self.t2) self.c1 = Connection() self.c1.user = 'user@proton' self.c1.password = 'password' self.c1.hostname = 'localhost' self.c2 = Connection() self.collector = Collector() def testNormalAuthenticationClient(self): common.ensureCanTestExtendedSASL() self.c1.collect(self.collector) self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'DIGEST-MD5') self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.CONNECTION_REMOTE_OPEN) def testNormalAuthenticationServer(self): common.ensureCanTestExtendedSASL() self.c2.collect(self.collector) self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'DIGEST-MD5') self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.CONNECTION_REMOTE_OPEN) def testFailedAuthenticationClient(self): common.ensureCanTestExtendedSASL() clientUser = "usr@proton" self.c1.user = clientUser self.c1.collect(self.collector) self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False) self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) def testFailedAuthenticationServer(self): common.ensureCanTestExtendedSASL() clientUser = "usr@proton" self.c1.user = clientUser self.c2.collect(self.collector) self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False) self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) def testNoMechClient(self): common.ensureCanTestExtendedSASL() self.c1.collect(self.collector) self.s2.allowed_mechs('IMPOSSIBLE') self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'DIGEST-MD5', authenticated=False) self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) def testNoMechServer(self): common.ensureCanTestExtendedSASL() self.c2.collect(self.collector) self.s2.allowed_mechs('IMPOSSIBLE') self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'DIGEST-MD5', authenticated=False) self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) def testDisallowedMechClient(self): self.c1.collect(self.collector) self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'IMPOSSIBLE', authenticated=False) self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) def testDisallowedMechServer(self): self.c2.collect(self.collector) self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'IMPOSSIBLE', authenticated=False) self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) def testDisallowedPlainClient(self): self.c1.collect(self.collector) self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'PLAIN', authenticated=False) self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) def testDisallowedPlainServer(self): self.c2.collect(self.collector) self.t1.bind(self.c1) self.t2.bind(self.c2) _testSaslMech(self, 'PLAIN', authenticated=False) self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)