Skip to content

Commit

Permalink
fixup: add test for error path
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Mar 14, 2024
1 parent 1839756 commit 59ec667
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions tests/system_tests_tcp_adaptor_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
#
import os
import shutil
from system_test import unittest, TestCase, Qdrouterd, NcatException, Logger, Process, run_curl, \
CA_CERT, CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD, \
SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD, SERVER_PRIVATE_KEY_NO_PASS, BAD_CA_CERT, \
Expand Down Expand Up @@ -801,3 +802,126 @@ def setUpClass(cls):
super(HttpOverTcpLiteTestTlsTwoRouterNginx, cls).setUpClass(encap='lite')
if skip_nginx_test():
return


class TcpAdaptorCertCorruptionTests(TestCase):
"""
The TCP adaptor re-loads the certificate files every time a new client or
server TCP connection is created. This test verifies that the router can
handle the case where the certificate files are removed or corrupted while
new connections are created.
"""
@classmethod
def setUpClass(cls):
super(TcpAdaptorCertCorruptionTests, cls).setUpClass()
cls.openssl_server_listening_port = cls.tester.get_port()
cls.router_listener_port = cls.tester.get_port()

# make a local copy of the client and server certificates. This test
# destroys these files, so we do not want to destroy the originals!

cls.server_cert_file = shutil.copy(SERVER_CERTIFICATE,
os.path.dirname(os.getcwd()))
cls.client_cert_file = shutil.copy(CLIENT_CERTIFICATE,
os.path.dirname(os.getcwd()))

# Set up two routers, TCP listener on one, connector on another

inter_router_port = cls.tester.get_port()

config_qdra = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR.A'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('listener', {'role': 'inter-router', 'port': inter_router_port}),
('sslProfile', {'name': 'listener-ssl-profile',
'caCertFile': CA_CERT,
'certFile': cls.server_cert_file,
'privateKeyFile': SERVER_PRIVATE_KEY,
'password': SERVER_PRIVATE_KEY_PASSWORD}),
('tcpListener', {'port': cls.router_listener_port,
'address': 'corrupt/me',
'host': 'localhost',
'authenticatePeer': 'yes',
'encapsulation': 'lite',
'sslProfile': 'listener-ssl-profile'})
])

config_qdrb = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR.B'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('connector', {'name': 'connectorToA', 'role': 'inter-router',
'port': inter_router_port}),
('sslProfile', {'name': 'connector-ssl-profile',
'caCertFile': CA_CERT,
'certFile': cls.client_cert_file,
'privateKeyFile': CLIENT_PRIVATE_KEY,
'password': CLIENT_PRIVATE_KEY_PASSWORD}),
('tcpConnector', {'port': cls.openssl_server_listening_port,
'address': 'corrupt/me',
'host': 'localhost',
'verifyHostname': 'yes',
'encapsulation': 'lite',
'sslProfile': 'connector-ssl-profile'}),
])

cls.router_qdra = cls.tester.qdrouterd("QDR.A", config_qdra, wait=True)
cls.router_qdrb = cls.tester.qdrouterd("QDR.B", config_qdrb)
cls.router_qdra.wait_router_connected('QDR.B')
cls.router_qdrb.wait_router_connected('QDR.A')
wait_tcp_listeners_up(cls.router_qdra.addresses[0])

def test_cert_corruption(self):
# Note we do not want to corrupt the certs held by the ssl server or
# client. We need them to be valid so the test client/server do not
# fail themselves. We only want the router to be affected!

server_ssl_info = dict()
server_ssl_info['CA_CERT'] = CA_CERT
server_ssl_info['SERVER_CERTIFICATE'] = SERVER_CERTIFICATE
server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER_PRIVATE_KEY
server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER_PRIVATE_KEY_PASSWORD

openssl_server = self.tester.openssl_server
self.openssl_server = openssl_server(listening_port=self.openssl_server_listening_port,
ssl_info=server_ssl_info,
name="OpenSSLServerAuthPeer",
cl_args=['-Verify', '1'])

client_ssl_info = dict()
client_ssl_info['CA_CERT'] = CA_CERT
client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT_CERTIFICATE
client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT_PRIVATE_KEY
client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT_PRIVATE_KEY_PASSWORD

out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Sanity Check the Configuration!")
self.assertIn(b"Verification: OK", out)
self.assertIn(b"Verify return code: 0 (ok)", out)

self.openssl_server.wait_out_message("Sanity Check the Configuration!")

#
# Phase 1: corrupt the server-facing certificate
#

with open(self.client_cert_file, mode="w") as clientf:
clientf.write("Oh mercy! I'm corrupt!")

_, _ = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Phase 1")
self.router_qdrb.wait_log_message(r'failed to set TLS certificate')

#
# Phase 1: corrupt the client-facing certificate
#

with open(self.server_cert_file, mode="w") as serverf:
serverf.write("Alas! I too have been rendered corrupt!")

_, _ = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Phase 2",
expect=Process.EXIT_FAIL)
self.router_qdra.wait_log_message(r'failed to set TLS certificate')

0 comments on commit 59ec667

Please sign in to comment.