From 9a77d54b7c8ddfd75ce15815d038959227ebfc0d Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Thu, 22 Aug 2024 09:53:19 -0400 Subject: [PATCH] fixup: add more strenuous AMQP profile update tests --- tests/system_test.py | 65 ++++++--- tests/system_tests_router_mesh.py | 2 +- tests/system_tests_ssl.py | 189 +++++++++++++++++++++++++- tests/system_tests_tcp_adaptor_tls.py | 57 ++++---- tests/system_tests_topology.py | 10 +- 5 files changed, 278 insertions(+), 45 deletions(-) diff --git a/tests/system_test.py b/tests/system_test.py index 836d51b09..ede880018 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -1496,6 +1496,10 @@ class AsyncTestReceiver(MessagingHandler): """ Empty = Queue.Empty + class TestReceiverException(Exception): + def __init__(self, error=None): + super(AsyncTestReceiver.TestReceiverException, self).__init__(error) + class MyQueue(Queue.Queue): def __init__(self, receiver): self._async_receiver = receiver @@ -1537,8 +1541,13 @@ def __init__(self, address, source, conn_args=None, container_id=None, self._thread.start() self.num_queue_puts = 0 self.num_queue_gets = 0 - if wait and self._ready.wait(timeout=TIMEOUT) is False: - raise Exception("Timed out waiting for receiver start") + self._error = None + if wait: + ready = self._ready.wait(timeout=TIMEOUT) + if ready is False: + raise AsyncTestReceiver.TestReceiverException("Timed out waiting for receiver start") + elif self._error is not None: + raise AsyncTestReceiver.TestReceiverException(self._error) self.queue_stats = "self.num_queue_puts=%d, self.num_queue_gets=%d" def get_queue_stats(self): @@ -1553,23 +1562,34 @@ def _main(self): if self._conn: self._conn.close() self._conn = None + self._ready.set() self._logger.log("AsyncTestReceiver reactor thread done") + def on_transport_error(self, event): + self._logger.log("AsyncTestReceiver on_transport_error=%s" % event.transport.condition.description) + self._error = f"Connection Error: {event.transport.condition.description}" + self._stop_thread = True + def on_connection_error(self, event): self._logger.log("AsyncTestReceiver on_connection_error=%s" % event.connection.remote_condition.description) + self._error = f"Connection Error: {event.connection.remote_condition.description}" + self._stop_thread = True def on_link_error(self, event): self._logger.log("AsyncTestReceiver on_link_error=%s" % event.link.remote_condition.description) + self._error = f"Link Error: {event.link.remote_condition.description}" + self._stop_thread = True def stop(self, timeout=TIMEOUT): self._stop_thread = True self._container.wakeup() self._thread.join(timeout=TIMEOUT) - self._logger.log("thread done") if self._thread.is_alive(): - raise Exception("AsyncTestReceiver did not exit") + raise AsyncTestReceiver.TestReceiverException("AsyncTestReceiver did not exit") del self._conn del self._container + if self._error is not None: + raise AsyncTestReceiver.TestReceiverException(self._error) def on_start(self, event): kwargs = {'url': self.address} @@ -1623,7 +1643,8 @@ def __init__(self, error=None): super(AsyncTestSender.TestSenderException, self).__init__(error) def __init__(self, address, target, count=1, message=None, - container_id=None, presettle=False, print_to_console=False): + container_id=None, presettle=False, print_to_console=False, + conn_args=None, get_link_info=True): super(AsyncTestSender, self).__init__(auto_accept=False, auto_settle=False) self.address = address @@ -1638,6 +1659,8 @@ def __init__(self, address, target, count=1, message=None, self.error = None self.link_stats = None self._conn = None + self.conn_args = conn_args + self._get_link_info = get_link_info self._sender = None self._message = message or Message(body="test") self._container = Container(self) @@ -1667,7 +1690,7 @@ def wait(self): self._thread.join(timeout=TIMEOUT) self._logger.log("AsyncTestSender wait: thread done") assert not self._thread.is_alive(), "sender did not complete" - if self.error: + if self.error is not None: raise AsyncTestSender.TestSenderException(self.error) del self._sender del self._conn @@ -1675,7 +1698,22 @@ def wait(self): self._logger.log("AsyncTestSender wait: no errors in wait") def on_start(self, event): - self._conn = self._container.connect(self.address) + kwargs = {'url': self.address} + if self.conn_args is not None: + kwargs.update(self.conn_args) + self._conn = self._container.connect(**kwargs) + + def on_transport_error(self, event): + self._logger.log("AsyncTestSender on_transport_error=%s" % event.transport.condition.description) + self.error = f"Connection Error: {event.transport.condition.description}" + + def on_connection_error(self, event): + self._logger.log("AsyncTestSender on_connection_error=%s" % event.connection.remote_condition.description) + self.error = f"Connection Error: {event.connection.remote_condition.description}" + + def on_link_error(self, event): + self._logger.log("AsyncTestSender on_link_error=%s" % event.link.remote_condition.description) + self.error = f"Link Error: {event.link.remote_condition.description}" def on_connection_opened(self, event): self._logger.log("Connection opened") @@ -1696,9 +1734,11 @@ def _check_if_done(self): and (self.presettle or (self.accepted + self.released + self.modified + self.rejected == self.sent))) + done = done or self.error is not None if done and self._conn: - self.link_stats = get_link_info(self._link_name, - self.address) + if self._get_link_info: + self.link_stats = get_link_info(self._link_name, + self.address) self._conn.close() self._conn = None self._logger.log("Connection closed") @@ -1727,13 +1767,6 @@ def on_rejected(self, event): event.delivery.settle() self._logger.log("message %d rejected" % self.rejected) - def on_link_error(self, event): - self.error = "link error:%s" % str(event.link.remote_condition) - self._logger.log(self.error) - if self._conn: - self._conn.close() - self._conn = None - def on_disconnected(self, event): # if remote terminates the connection kill the thread else it will spin # on the cpu diff --git a/tests/system_tests_router_mesh.py b/tests/system_tests_router_mesh.py index f0242dc40..2fe9ed77a 100644 --- a/tests/system_tests_router_mesh.py +++ b/tests/system_tests_router_mesh.py @@ -166,7 +166,7 @@ def test_03_unavailable_link_attach(self): ats.wait() self.assertTrue(False) # expect exception except AsyncTestSender.TestSenderException as exc: - self.assertIn("link error", ats.error) + self.assertIn("Link Error", ats.error) def test_04_unavailable_anonymous_link_attach(self): """ diff --git a/tests/system_tests_ssl.py b/tests/system_tests_ssl.py index e1ebb4baa..bd1778cac 100644 --- a/tests/system_tests_ssl.py +++ b/tests/system_tests_ssl.py @@ -27,6 +27,7 @@ import cproton from proton import SASL, Url, SSLDomain, SSLUnavailable, ConnectionException +from proton import Message from proton.utils import BlockingConnection from system_test import TIMEOUT, TestCase, main_module, Qdrouterd, Process @@ -36,6 +37,7 @@ from system_test import SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD from system_test import CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD from system_test import SERVER2_CERTIFICATE, SERVER2_PRIVATE_KEY, SERVER2_PRIVATE_KEY_PASSWORD +from system_test import AsyncTestSender, AsyncTestReceiver def protocol_name(proto): @@ -835,6 +837,7 @@ def test_mismatched_ca_and_no_hostname_verification(self): class RouterTestSslProfileUpdate(RouterTestSslBase): """ + Verify updates to the sslProfile configurations for inter-router connections. """ @classmethod def setUpClass(cls): @@ -915,9 +918,8 @@ def setUpClass(cls): 'port': cls.tester.get_port()}), ('sslProfile', {'name': 'ssl-profile', 'caCertFile': CA_CERT, - #'certFile': CLIENT_CERTIFICATE, - #'privateKeyFile': CLIENT_PRIVATE_KEY, - #'password': CLIENT_PRIVATE_KEY_PASSWORD}), + # listener2 does not request a client cert + # so no self identifying cert is necessary }), ('connector', {'name': 'BConn1', 'host': 'localhost', @@ -1013,5 +1015,186 @@ def check_nodes(): self.assertTrue(ok, f"Unexpected routers found: {get_router_nodes(self.main_router)}") +class RouterTestSslProfileUpdateClients(RouterTestSslBase): + """ + Verify updates to the sslProfile configurations for client connections + """ + @classmethod + def setUpClass(cls): + super(RouterTestSslProfileUpdateClients, cls).setUpClass() + if cls.DISABLE_SSL_TESTING: + cls.skipTest(cls.DISABLE_REASON) + if not SASL.extended(): + cls.skipTest("Cyrus library not available. skipping test") + + cls.listener1_port = cls.tester.get_port() + cls.listener2_port = cls.tester.get_port() + + router_cfg = Qdrouterd.Config([ + ('router', {'id': 'Router1', + 'mode': 'interior'}), + ('listener', {'host': '0.0.0.0', + 'role': 'normal', + 'port': cls.tester.get_port()}), + + # Listener1 + ('sslProfile', {'name': 'ssl-profile-L1', + 'caCertFile': CA_CERT, + 'certFile': SERVER_CERTIFICATE, + 'privateKeyFile': SERVER_PRIVATE_KEY, + 'password': SERVER_PRIVATE_KEY_PASSWORD}), + ('listener', {'name': 'Listener1', + 'host': 'localhost', + 'role': 'normal', + 'port': cls.listener1_port, + 'requireSsl': 'true', + 'authenticatePeer': 'true', + 'saslMechanisms': 'EXTERNAL', + 'requireEncryption': 'yes', + 'sslProfile': 'ssl-profile-L1'}), + + # Listener2 + ('sslProfile', {'name': 'ssl-profile-L2', + 'caCertFile': CA2_CERT, + 'certFile': SERVER2_CERTIFICATE, + 'privateKeyFile': SERVER2_PRIVATE_KEY, + 'password': SERVER2_PRIVATE_KEY_PASSWORD}), + ('listener', {'name': 'Listener2', + 'host': 'localhost', + 'role': 'normal', + 'port': cls.listener2_port, + 'requireSsl': 'true', + 'authenticatePeer': 'true', + 'saslMechanisms': 'EXTERNAL', + 'requireEncryption': 'yes', + 'sslProfile': 'ssl-profile-L2'}) + ]) + cls.router1 = cls.tester.qdrouterd("router1", router_cfg, wait=True) + + def test_ssl_client_profile_update(self): + """ + Verify updates to the sslProfiles for client connections + """ + + payload = "?" * 1024 * 65 + payload += "TLS Message!" + message = Message(body=payload) + + ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) + ssl_domain.set_trusted_ca_db(CA_CERT) + ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA_CERT) + ssl_domain.set_credentials(CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD) + conn_args = {'sasl_enabled': True, + 'allowed_mechs': "EXTERNAL", + 'ssl_domain': ssl_domain} + test_rx = AsyncTestReceiver(f"amqps://localhost:{self.listener1_port}", + source="test/addr", + container_id="FooRx", + conn_args=conn_args) + + ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) + ssl_domain.set_trusted_ca_db(CA2_CERT) + ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA2_CERT) + ssl_domain.set_credentials(CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD) + conn_args = {'sasl_enabled': True, + 'allowed_mechs': "EXTERNAL", + 'ssl_domain': ssl_domain} + test_tx = AsyncTestSender(f"amqps://localhost:{self.listener2_port}", + target="test/addr", + message=message, + container_id="FooTx", + conn_args=conn_args, + get_link_info=False) + test_tx.wait() + test_rx.stop() + self.assertEqual(1, test_rx.num_queue_puts, "expected 1 message") + msg = test_rx.queue.get() + self.assertIn("TLS Message!", msg.body, "missing payload") + + # + # Now update the listeners certificates and test that clients using the + # old certs fail with verification errors + # + + new_cfg = {'caCertFile': CA2_CERT, + 'certFile': SERVER2_CERTIFICATE, + 'privateKeyFile': SERVER2_PRIVATE_KEY, + 'password': SERVER2_PRIVATE_KEY_PASSWORD} + self.router1.sk_manager.update(SSL_PROFILE_TYPE, new_cfg, name='ssl-profile-L1') + + new_cfg = {'caCertFile': CA_CERT, + 'certFile': SERVER_CERTIFICATE, + 'privateKeyFile': SERVER_PRIVATE_KEY, + 'password': SERVER_PRIVATE_KEY_PASSWORD} + self.router1.sk_manager.update(SSL_PROFILE_TYPE, new_cfg, name='ssl-profile-L2') + + # + # Expect TLS connection failures: + # + + ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) + ssl_domain.set_trusted_ca_db(CA_CERT) + ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA_CERT) + ssl_domain.set_credentials(CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD) + conn_args = {'sasl_enabled': True, + 'allowed_mechs': "EXTERNAL", + 'ssl_domain': ssl_domain} + with self.assertRaises(AsyncTestReceiver.TestReceiverException) as exc: + AsyncTestReceiver(f"amqps://localhost:{self.listener1_port}", + source="test/addr", + container_id="FooRx2", + conn_args=conn_args) + self.assertIn("certificate verify failed", str(exc.exception), f"{exc.exception}") + + ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) + ssl_domain.set_trusted_ca_db(CA2_CERT) + ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA2_CERT) + ssl_domain.set_credentials(CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD) + conn_args = {'sasl_enabled': True, + 'allowed_mechs': "EXTERNAL", + 'ssl_domain': ssl_domain} + with self.assertRaises(AsyncTestReceiver.TestReceiverException) as exc: + AsyncTestReceiver(f"amqps://localhost:{self.listener2_port}", + source="test/addr", + container_id="FooRx3", + conn_args=conn_args) + self.assertIn("certificate verify failed", str(exc.exception), f"{exc.exception}") + + # + # Now verify clients can connect with the proper certifcates + # + + ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) + ssl_domain.set_trusted_ca_db(CA2_CERT) + ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA2_CERT) + ssl_domain.set_credentials(CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD) + conn_args = {'sasl_enabled': True, + 'allowed_mechs': "EXTERNAL", + 'ssl_domain': ssl_domain} + test_rx = AsyncTestReceiver(f"amqps://localhost:{self.listener1_port}", + source="test/addr", + container_id="FooRxOk", + conn_args=conn_args) + + ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) + ssl_domain.set_trusted_ca_db(CA_CERT) + ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, CA_CERT) + ssl_domain.set_credentials(CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD) + conn_args = {'sasl_enabled': True, + 'allowed_mechs': "EXTERNAL", + 'ssl_domain': ssl_domain} + test_tx = AsyncTestSender(f"amqps://localhost:{self.listener2_port}", + target="test/addr", + message=message, + container_id="FooTxOk", + conn_args=conn_args, + get_link_info=False) + test_tx.wait() + test_rx.stop() + self.assertEqual(1, test_rx.num_queue_puts, "expected 1 message") + msg = test_rx.queue.get() + self.assertIn("TLS Message!", msg.body, "missing payload") + + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/system_tests_tcp_adaptor_tls.py b/tests/system_tests_tcp_adaptor_tls.py index 97c5d964c..247391afe 100644 --- a/tests/system_tests_tcp_adaptor_tls.py +++ b/tests/system_tests_tcp_adaptor_tls.py @@ -819,6 +819,7 @@ def test_ssl_profile_update(self): Test management updates to the listener and connector sslProfile configurations """ + payload = b'?' * 1024 * 65 server_ssl_info = dict() server_ssl_info['CA_CERT'] = CA_CERT server_ssl_info['SERVER_CERTIFICATE'] = SERVER_CERTIFICATE @@ -840,7 +841,7 @@ def test_ssl_profile_update(self): out, error = self.opensslclient(port=self.router_listener_port, ssl_info=client_ssl_info, - data=b"Sanity Check the Configuration!", + data=b"Sanity Check the Configuration!" + payload, cl_args=['-verify', '10', '-verify_return_error']) self.assertIn(b"Verification: OK", out, f"{error}") @@ -875,7 +876,7 @@ def test_ssl_profile_update(self): out, error = self.opensslclient(port=self.router_listener_port, ssl_info=client_ssl_info, - data=b"Hey password is good!", + data=b"Hey password is good!" + payload, cl_args=['-verify', '10', '-verify_return_error']) self.assertIn(b"Verification: OK", out, f"{error}") @@ -884,30 +885,36 @@ def test_ssl_profile_update(self): openssl_server.wait_out_message("Hey password is good!") # - # Now update the sslProfile with a valid config, but one that will not - # allow the client to connect + # Now update the listener sslProfile with a valid config, but one that + # will not allow the client to connect # + new_cfg = {'caCertFile': CA2_CERT, + 'certFile': SERVER2_CERTIFICATE, + 'privateKeyFile': SERVER2_PRIVATE_KEY, + 'password': SERVER2_PRIVATE_KEY_PASSWORD} + skmgr_a.update(SSL_PROFILE_TYPE, new_cfg, name='listener-ssl-profile') - skmgr_a.update(SSL_PROFILE_TYPE, {'caCertFile': BAD_CA_CERT}, - name='listener-ssl-profile') - - _, _ = self.opensslclient(port=self.router_listener_port, - ssl_info=client_ssl_info, - data=b"The CA will not allow this!", - cl_args=['-verify', '10', - '-verify_return_error']) + out, error = self.opensslclient(port=self.router_listener_port, + ssl_info=client_ssl_info, + data=b"The CA will not allow this!" + payload, + expect=Process.EXIT_FAIL, + cl_args=['-verify', '10', + '-verify_return_error']) self.router_qdra.wait_log_message(r'TLS connection failed') # - # Restore the sslProfile configuration and verify all is well + # Update the client ssl_info to use a compatible client cert and verify + # all is well: # - skmgr_a.update(SSL_PROFILE_TYPE, {'caCertFile': CA_CERT}, - name='listener-ssl-profile') - + client_ssl_info = dict() + client_ssl_info['CA_CERT'] = CA2_CERT + client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT2_CERTIFICATE + client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT2_PRIVATE_KEY + client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT2_PRIVATE_KEY_PASSWORD out, error = self.opensslclient(port=self.router_listener_port, ssl_info=client_ssl_info, - data=b"Hey we recovered!", + data=b"Hey we recovered!" + payload, cl_args=['-verify', '10', '-verify_return_error']) self.assertIn(b"Verification: OK", out, f"{error}") @@ -919,7 +926,11 @@ def test_ssl_profile_update(self): # Test updates on the connector sslProfile # + # # start a new ssl server that uses an incompatible TLS configuration + # The connection should fail when router QDR.B attempts to connect to + # it + # openssl_server.teardown() server_ssl_info = dict() @@ -933,11 +944,11 @@ def test_ssl_profile_update(self): cl_args=['-Verify', '1', '-verify_return_error']) - _, _ = self.opensslclient(port=self.router_listener_port, - ssl_info=client_ssl_info, - data=b"The server conn must fail", - cl_args=['-verify', '10', - '-verify_return_error']) + out, error = self.opensslclient(port=self.router_listener_port, + ssl_info=client_ssl_info, + data=b"The server conn must fail" + payload, + cl_args=['-verify', '10', + '-verify_return_error']) self.router_qdrb.wait_log_message(r'TLS connection failed') with open(openssl_server.outfile_path, 'rt') as out_file: self.assertFalse(is_pattern_present(out_file, @@ -955,7 +966,7 @@ def test_ssl_profile_update(self): skmgr_b.update(SSL_PROFILE_TYPE, new_cfg, name='connector-ssl-profile') out, error = self.opensslclient(port=self.router_listener_port, ssl_info=client_ssl_info, - data=b"The server conn must succeed!", + data=b"The server conn must succeed!" + payload, cl_args=['-verify', '10', '-verify_return_error']) self.assertIn(b"Verification: OK", out, f"{error}") diff --git a/tests/system_tests_topology.py b/tests/system_tests_topology.py index 89ead526e..aa43b2fcf 100644 --- a/tests/system_tests_topology.py +++ b/tests/system_tests_topology.py @@ -764,7 +764,10 @@ def test_01_reboot_INT_A(self): # stop consumers so INT_A's route table will be different when it comes # back online so it will require an immediate sync for c in consumers: - c.stop() + with self.assertRaises(AsyncTestReceiver.TestReceiverException) as exc: + c.stop() + # expect that the connection failed due to the teardown of INT_A + self.assertIn("connection aborted", str(exc.exception), f"{exc.exception}") time.sleep(1.0) INT_A = self._create_router('INT.A', @@ -808,7 +811,10 @@ def test_02_shutdown_INT_A(self): # propagated to INT_C. Now remove INT_A INT_A.teardown() for c in consumers: - c.stop() + with self.assertRaises(AsyncTestReceiver.TestReceiverException) as exc: + c.stop() + # expect that the connection failed due to the teardown of INT_A + self.assertIn("connection aborted", str(exc.exception), f"{exc.exception}") start = time.time()