Skip to content

Commit

Permalink
fixup: add more strenuous AMQP profile update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Aug 22, 2024
1 parent eeb2cb1 commit a71762d
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 45 deletions.
65 changes: 49 additions & 16 deletions tests/system_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,10 @@ class AsyncTestReceiver(MessagingHandler):
"""
Empty = Queue.Empty

class TestReceiverException(Exception):
def __init__(self, error=None):
super(AsyncTestReceiver.TestReceiverException, self).__init__(error)

class MyQueue(Queue.Queue):
def __init__(self, receiver):
self._async_receiver = receiver
Expand Down Expand Up @@ -1537,8 +1541,13 @@ def __init__(self, address, source, conn_args=None, container_id=None,
self._thread.start()
self.num_queue_puts = 0
self.num_queue_gets = 0
if wait and self._ready.wait(timeout=TIMEOUT) is False:
raise Exception("Timed out waiting for receiver start")
self._error = None
if wait:
ready = self._ready.wait(timeout=TIMEOUT)
if ready is False:
raise AsyncTestReceiver.TestReceiverException("Timed out waiting for receiver start")
elif self._error is not None:
raise AsyncTestReceiver.TestReceiverException(self._error)
self.queue_stats = "self.num_queue_puts=%d, self.num_queue_gets=%d"

def get_queue_stats(self):
Expand All @@ -1553,23 +1562,34 @@ def _main(self):
if self._conn:
self._conn.close()
self._conn = None
self._ready.set()
self._logger.log("AsyncTestReceiver reactor thread done")

def on_transport_error(self, event):
self._logger.log("AsyncTestReceiver on_transport_error=%s" % event.transport.condition.description)
self._error = f"Connection Error: {event.transport.condition.description}"
self._stop_thread = True

def on_connection_error(self, event):
self._logger.log("AsyncTestReceiver on_connection_error=%s" % event.connection.remote_condition.description)
self._error = f"Connection Error: {event.connection.remote_condition.description}"
self._stop_thread = True

def on_link_error(self, event):
self._logger.log("AsyncTestReceiver on_link_error=%s" % event.link.remote_condition.description)
self._error = f"Link Error: {event.link.remote_condition.description}"
self._stop_thread = True

def stop(self, timeout=TIMEOUT):
self._stop_thread = True
self._container.wakeup()
self._thread.join(timeout=TIMEOUT)
self._logger.log("thread done")
if self._thread.is_alive():
raise Exception("AsyncTestReceiver did not exit")
raise AsyncTestReceiver.TestReceiverException("AsyncTestReceiver did not exit")
del self._conn
del self._container
if self._error is not None:
raise AsyncTestReceiver.TestReceiverException(self._error)

def on_start(self, event):
kwargs = {'url': self.address}
Expand Down Expand Up @@ -1623,7 +1643,8 @@ def __init__(self, error=None):
super(AsyncTestSender.TestSenderException, self).__init__(error)

def __init__(self, address, target, count=1, message=None,
container_id=None, presettle=False, print_to_console=False):
container_id=None, presettle=False, print_to_console=False,
conn_args=None, get_link_info=True):
super(AsyncTestSender, self).__init__(auto_accept=False,
auto_settle=False)
self.address = address
Expand All @@ -1638,6 +1659,8 @@ def __init__(self, address, target, count=1, message=None,
self.error = None
self.link_stats = None
self._conn = None
self.conn_args = conn_args
self._get_link_info = get_link_info
self._sender = None
self._message = message or Message(body="test")
self._container = Container(self)
Expand Down Expand Up @@ -1667,15 +1690,30 @@ def wait(self):
self._thread.join(timeout=TIMEOUT)
self._logger.log("AsyncTestSender wait: thread done")
assert not self._thread.is_alive(), "sender did not complete"
if self.error:
if self.error is not None:
raise AsyncTestSender.TestSenderException(self.error)
del self._sender
del self._conn
del self._container
self._logger.log("AsyncTestSender wait: no errors in wait")

def on_start(self, event):
self._conn = self._container.connect(self.address)
kwargs = {'url': self.address}
if self.conn_args is not None:
kwargs.update(self.conn_args)
self._conn = self._container.connect(**kwargs)

def on_transport_error(self, event):
self._logger.log("AsyncTestSender on_transport_error=%s" % event.transport.condition.description)
self.error = f"Connection Error: {event.transport.condition.description}"

def on_connection_error(self, event):
self._logger.log("AsyncTestSender on_connection_error=%s" % event.connection.remote_condition.description)
self.error = f"Connection Error: {event.connection.remote_condition.description}"

def on_link_error(self, event):
self._logger.log("AsyncTestSender on_link_error=%s" % event.link.remote_condition.description)
self.error = f"Link Error: {event.link.remote_condition.description}"

def on_connection_opened(self, event):
self._logger.log("Connection opened")
Expand All @@ -1696,9 +1734,11 @@ def _check_if_done(self):
and (self.presettle
or (self.accepted + self.released + self.modified
+ self.rejected == self.sent)))
done = done or self.error is not None
if done and self._conn:
self.link_stats = get_link_info(self._link_name,
self.address)
if self._get_link_info:
self.link_stats = get_link_info(self._link_name,
self.address)
self._conn.close()
self._conn = None
self._logger.log("Connection closed")
Expand Down Expand Up @@ -1727,13 +1767,6 @@ def on_rejected(self, event):
event.delivery.settle()
self._logger.log("message %d rejected" % self.rejected)

def on_link_error(self, event):
self.error = "link error:%s" % str(event.link.remote_condition)
self._logger.log(self.error)
if self._conn:
self._conn.close()
self._conn = None

def on_disconnected(self, event):
# if remote terminates the connection kill the thread else it will spin
# on the cpu
Expand Down
2 changes: 1 addition & 1 deletion tests/system_tests_router_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def test_03_unavailable_link_attach(self):
ats.wait()
self.assertTrue(False) # expect exception
except AsyncTestSender.TestSenderException as exc:
self.assertIn("link error", ats.error)
self.assertIn("Link Error", ats.error)

def test_04_unavailable_anonymous_link_attach(self):
"""
Expand Down
189 changes: 186 additions & 3 deletions tests/system_tests_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import cproton
from proton import SASL, Url, SSLDomain, SSLUnavailable, ConnectionException
from proton import Message
from proton.utils import BlockingConnection

from system_test import TIMEOUT, TestCase, main_module, Qdrouterd, Process
Expand All @@ -36,6 +37,7 @@
from system_test import SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD
from system_test import CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD
from system_test import SERVER2_CERTIFICATE, SERVER2_PRIVATE_KEY, SERVER2_PRIVATE_KEY_PASSWORD
from system_test import AsyncTestSender, AsyncTestReceiver


def protocol_name(proto):
Expand Down Expand Up @@ -835,6 +837,7 @@ def test_mismatched_ca_and_no_hostname_verification(self):

class RouterTestSslProfileUpdate(RouterTestSslBase):
"""
Verify updates to the sslProfile configurations for inter-router connections.
"""
@classmethod
def setUpClass(cls):
Expand Down Expand Up @@ -915,9 +918,8 @@ def setUpClass(cls):
'port': cls.tester.get_port()}),
('sslProfile', {'name': 'ssl-profile',
'caCertFile': CA_CERT,
#'certFile': CLIENT_CERTIFICATE,
#'privateKeyFile': CLIENT_PRIVATE_KEY,
#'password': CLIENT_PRIVATE_KEY_PASSWORD}),
# listener2 does not request a client cert
# so no self identifying cert is necessary
}),
('connector', {'name': 'BConn1',
'host': 'localhost',
Expand Down Expand Up @@ -1013,5 +1015,186 @@ def check_nodes():
self.assertTrue(ok, f"Unexpected routers found: {get_router_nodes(self.main_router)}")


class RouterTestSslProfileUpdateClients(RouterTestSslBase):
"""
Verify updates to the sslProfile configurations for client connections
"""
@classmethod
def setUpClass(cls):
super(RouterTestSslProfileUpdateClients, cls).setUpClass()
if cls.DISABLE_SSL_TESTING:
cls.skipTest(cls.DISABLE_REASON)
if not SASL.extended():
cls.skipTest("Cyrus library not available. skipping test")

cls.listener1_port = cls.tester.get_port()
cls.listener2_port = cls.tester.get_port()

router_cfg = Qdrouterd.Config([
('router', {'id': 'Router1',
'mode': 'interior'}),
('listener', {'host': '0.0.0.0',
'role': 'normal',
'port': cls.tester.get_port()}),

# Listener1
('sslProfile', {'name': 'ssl-profile-L1',
'caCertFile': CA_CERT,
'certFile': SERVER_CERTIFICATE,
'privateKeyFile': SERVER_PRIVATE_KEY,
'password': SERVER_PRIVATE_KEY_PASSWORD}),
('listener', {'name': 'Listener1',
'host': 'localhost',
'role': 'normal',
'port': cls.listener1_port,
'requireSsl': 'true',
'authenticatePeer': 'true',
'saslMechanisms': 'EXTERNAL',
'requireEncryption': 'yes',
'sslProfile': 'ssl-profile-L1'}),

# Listener2
('sslProfile', {'name': 'ssl-profile-L2',
'caCertFile': CA2_CERT,
'certFile': SERVER2_CERTIFICATE,
'privateKeyFile': SERVER2_PRIVATE_KEY,
'password': SERVER2_PRIVATE_KEY_PASSWORD}),
('listener', {'name': 'Listener2',
'host': 'localhost',
'role': 'normal',
'port': cls.listener2_port,
'requireSsl': 'true',
'authenticatePeer': 'true',
'saslMechanisms': 'EXTERNAL',
'requireEncryption': 'yes',
'sslProfile': 'ssl-profile-L2'})
])
cls.router1 = cls.tester.qdrouterd("router1", router_cfg, wait=True)

def test_ssl_client_profile_update(self):
"""
Verify updates to the sslProfiles for client connections
"""

payload = "?" * 1024 * 65
payload += "TLS Message!"
message = Message(body=payload)

ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_domain.set_trusted_ca_db(CA_CERT)
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA_CERT)
ssl_domain.set_credentials(CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD)
conn_args = {'sasl_enabled': True,
'allowed_mechs': "EXTERNAL",
'ssl_domain': ssl_domain}
test_rx = AsyncTestReceiver(f"amqps://localhost:{self.listener1_port}",
source="test/addr",
container_id="FooRx",
conn_args=conn_args)

ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_domain.set_trusted_ca_db(CA2_CERT)
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA2_CERT)
ssl_domain.set_credentials(CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD)
conn_args = {'sasl_enabled': True,
'allowed_mechs': "EXTERNAL",
'ssl_domain': ssl_domain}
test_tx = AsyncTestSender(f"amqps://localhost:{self.listener2_port}",
target="test/addr",
message=message,
container_id="FooTx",
conn_args=conn_args,
get_link_info=False)
test_tx.wait()
test_rx.stop()
self.assertEqual(1, test_rx.num_queue_puts, "expected 1 message")
msg = test_rx.queue.get()
self.assertIn("TLS Message!", msg.body, "missing payload")

#
# Now update the listeners certificates and test that clients using the
# old certs fail with verification errors
#

new_cfg = {'caCertFile': CA2_CERT,
'certFile': SERVER2_CERTIFICATE,
'privateKeyFile': SERVER2_PRIVATE_KEY,
'password': SERVER2_PRIVATE_KEY_PASSWORD}
self.router1.sk_manager.update(SSL_PROFILE_TYPE, new_cfg, name='ssl-profile-L1')

new_cfg = {'caCertFile': CA_CERT,
'certFile': SERVER_CERTIFICATE,
'privateKeyFile': SERVER_PRIVATE_KEY,
'password': SERVER_PRIVATE_KEY_PASSWORD}
self.router1.sk_manager.update(SSL_PROFILE_TYPE, new_cfg, name='ssl-profile-L2')

#
# Expect TLS connection failures:
#

ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_domain.set_trusted_ca_db(CA_CERT)
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA_CERT)
ssl_domain.set_credentials(CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD)
conn_args = {'sasl_enabled': True,
'allowed_mechs': "EXTERNAL",
'ssl_domain': ssl_domain}
with self.assertRaises(AsyncTestReceiver.TestReceiverException) as exc:
AsyncTestReceiver(f"amqps://localhost:{self.listener1_port}",
source="test/addr",
container_id="FooRx2",
conn_args=conn_args)
self.assertIn("certificate verify failed", str(exc.exception), f"{exc.exception}")

ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_domain.set_trusted_ca_db(CA2_CERT)
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA2_CERT)
ssl_domain.set_credentials(CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD)
conn_args = {'sasl_enabled': True,
'allowed_mechs': "EXTERNAL",
'ssl_domain': ssl_domain}
with self.assertRaises(AsyncTestReceiver.TestReceiverException) as exc:
AsyncTestReceiver(f"amqps://localhost:{self.listener2_port}",
source="test/addr",
container_id="FooRx3",
conn_args=conn_args)
self.assertIn("certificate verify failed", str(exc.exception), f"{exc.exception}")

#
# Now verify clients can connect with the proper certifcates
#

ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_domain.set_trusted_ca_db(CA2_CERT)
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA2_CERT)
ssl_domain.set_credentials(CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD)
conn_args = {'sasl_enabled': True,
'allowed_mechs': "EXTERNAL",
'ssl_domain': ssl_domain}
test_rx = AsyncTestReceiver(f"amqps://localhost:{self.listener1_port}",
source="test/addr",
container_id="FooRxOk",
conn_args=conn_args)

ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_domain.set_trusted_ca_db(CA_CERT)
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA_CERT)
ssl_domain.set_credentials(CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD)
conn_args = {'sasl_enabled': True,
'allowed_mechs': "EXTERNAL",
'ssl_domain': ssl_domain}
test_tx = AsyncTestSender(f"amqps://localhost:{self.listener2_port}",
target="test/addr",
message=message,
container_id="FooTxOk",
conn_args=conn_args,
get_link_info=False)
test_tx.wait()
test_rx.stop()
self.assertEqual(1, test_rx.num_queue_puts, "expected 1 message")
msg = test_rx.queue.get()
self.assertIn("TLS Message!", msg.body, "missing payload")


if __name__ == '__main__':
unittest.main(main_module())
Loading

0 comments on commit a71762d

Please sign in to comment.