diff --git a/tests/system_tests_tcp_adaptor_tls.py b/tests/system_tests_tcp_adaptor_tls.py index 23b38824e..85b7af61b 100644 --- a/tests/system_tests_tcp_adaptor_tls.py +++ b/tests/system_tests_tcp_adaptor_tls.py @@ -17,6 +17,7 @@ # under the License. # import os +import shutil from system_test import unittest, TestCase, Qdrouterd, NcatException, Logger, Process, run_curl, \ CA_CERT, CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD, \ SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD, SERVER_PRIVATE_KEY_NO_PASS, BAD_CA_CERT, \ @@ -801,3 +802,126 @@ def setUpClass(cls): super(HttpOverTcpLiteTestTlsTwoRouterNginx, cls).setUpClass(encap='lite') if skip_nginx_test(): return + + +class TcpAdaptorCertCorruptionTests(TestCase): + """ + The TCP adaptor re-loads the certificate files every time a new client or + server TCP connection is created. This test verifies that the router can + handle the case where the certificate files are removed or corrupted while + new connections are created. + """ + @classmethod + def setUpClass(cls): + super(TcpAdaptorCertCorruptionTests, cls).setUpClass() + cls.openssl_server_listening_port = cls.tester.get_port() + cls.router_listener_port = cls.tester.get_port() + + # make a local copy of the client and server certificates. This test + # destroys these files, so we do not want to destroy the originals! + + cls.server_cert_file = shutil.copy(SERVER_CERTIFICATE, + os.path.dirname(os.getcwd())) + cls.client_cert_file = shutil.copy(CLIENT_CERTIFICATE, + os.path.dirname(os.getcwd())) + + # Set up two routers, TCP listener on one, connector on another + + inter_router_port = cls.tester.get_port() + + config_qdra = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': 'QDR.A'}), + ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}), + ('listener', {'role': 'inter-router', 'port': inter_router_port}), + ('sslProfile', {'name': 'listener-ssl-profile', + 'caCertFile': CA_CERT, + 'certFile': cls.server_cert_file, + 'privateKeyFile': SERVER_PRIVATE_KEY, + 'password': SERVER_PRIVATE_KEY_PASSWORD}), + ('tcpListener', {'port': cls.router_listener_port, + 'address': 'corrupt/me', + 'host': 'localhost', + 'authenticatePeer': 'yes', + 'encapsulation': 'lite', + 'sslProfile': 'listener-ssl-profile'}) + ]) + + config_qdrb = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': 'QDR.B'}), + ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}), + ('connector', {'name': 'connectorToA', 'role': 'inter-router', + 'port': inter_router_port}), + ('sslProfile', {'name': 'connector-ssl-profile', + 'caCertFile': CA_CERT, + 'certFile': cls.client_cert_file, + 'privateKeyFile': CLIENT_PRIVATE_KEY, + 'password': CLIENT_PRIVATE_KEY_PASSWORD}), + ('tcpConnector', {'port': cls.openssl_server_listening_port, + 'address': 'corrupt/me', + 'host': 'localhost', + 'verifyHostname': 'yes', + 'encapsulation': 'lite', + 'sslProfile': 'connector-ssl-profile'}), + ]) + + cls.router_qdra = cls.tester.qdrouterd("QDR.A", config_qdra, wait=True) + cls.router_qdrb = cls.tester.qdrouterd("QDR.B", config_qdrb) + cls.router_qdra.wait_router_connected('QDR.B') + cls.router_qdrb.wait_router_connected('QDR.A') + wait_tcp_listeners_up(cls.router_qdra.addresses[0]) + + def test_cert_corruption(self): + # Note we do not want to corrupt the certs held by the ssl server or + # client. We need them to be valid so the test client/server do not + # fail themselves. We only want the router to be affected! + + server_ssl_info = dict() + server_ssl_info['CA_CERT'] = CA_CERT + server_ssl_info['SERVER_CERTIFICATE'] = SERVER_CERTIFICATE + server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER_PRIVATE_KEY + server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER_PRIVATE_KEY_PASSWORD + + openssl_server = self.tester.openssl_server + self.openssl_server = openssl_server(listening_port=self.openssl_server_listening_port, + ssl_info=server_ssl_info, + name="OpenSSLServerAuthPeer", + cl_args=['-Verify', '1']) + + client_ssl_info = dict() + client_ssl_info['CA_CERT'] = CA_CERT + client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT_CERTIFICATE + client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT_PRIVATE_KEY + client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT_PRIVATE_KEY_PASSWORD + + out, error = self.opensslclient(port=self.router_listener_port, + ssl_info=client_ssl_info, + data=b"Sanity Check the Configuration!") + self.assertIn(b"Verification: OK", out) + self.assertIn(b"Verify return code: 0 (ok)", out) + + self.openssl_server.wait_out_message("Sanity Check the Configuration!") + + # + # Phase 1: corrupt the server-facing certificate + # + + with open(self.client_cert_file, mode="w") as clientf: + clientf.write("Oh mercy! I'm corrupt!") + + _, _ = self.opensslclient(port=self.router_listener_port, + ssl_info=client_ssl_info, + data=b"Phase 1") + self.router_qdrb.wait_log_message(r'failed to set TLS certificate') + + # + # Phase 1: corrupt the client-facing certificate + # + + with open(self.server_cert_file, mode="w") as serverf: + serverf.write("Alas! I too have been rendered corrupt!") + + _, _ = self.opensslclient(port=self.router_listener_port, + ssl_info=client_ssl_info, + data=b"Phase 2", + expect=Process.EXIT_FAIL) + self.router_qdra.wait_log_message(r'failed to set TLS certificate')