Skip to content

Commit

Permalink
fixup: add test for error path
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Mar 14, 2024
1 parent 1839756 commit 59c717f
Showing 1 changed file with 125 additions and 0 deletions.
125 changes: 125 additions & 0 deletions tests/system_tests_tcp_adaptor_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
#
import os
import shutil
from system_test import unittest, TestCase, Qdrouterd, NcatException, Logger, Process, run_curl, \
CA_CERT, CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD, \
SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD, SERVER_PRIVATE_KEY_NO_PASS, BAD_CA_CERT, \
Expand Down Expand Up @@ -801,3 +802,127 @@ def setUpClass(cls):
super(HttpOverTcpLiteTestTlsTwoRouterNginx, cls).setUpClass(encap='lite')
if skip_nginx_test():
return


class TcpAdaptorCertCorruptionTests(TestCase):
"""
The TCP adaptor re-loads the certificate files every time a new client or
server TCP connection is created. This test verifies that the router can
handle the case where the certificate files are removed or corrupted while
new connections are created.
"""
@classmethod
def setUpClass(cls):
super(TcpAdaptorCertCorruptionTests, cls).setUpClass()
cls.openssl_server_listening_port = cls.tester.get_port()
cls.router_listener_port = cls.tester.get_port()

# make a local copy of the client and server certificates. This test
# destroys these files, so we do not want to destroy the originals!

cls.server_cert_file = shutil.copy(SERVER_CERTIFICATE,
os.path.dirname(os.getcwd()))
cls.client_cert_file = shutil.copy(CLIENT_CERTIFICATE,
os.path.dirname(os.getcwd()))

# Set up two routers, TCP listener on one, connector on another

inter_router_port = cls.tester.get_port()

config_qdra = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR.A'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('listener', {'role': 'inter-router', 'port': inter_router_port}),
('sslProfile', {'name': 'listener-ssl-profile',
'caCertFile': CA_CERT,
'certFile': cls.server_cert_file,
'privateKeyFile': SERVER_PRIVATE_KEY,
'password': SERVER_PRIVATE_KEY_PASSWORD}),
('tcpListener', {'port': cls.router_listener_port,
'address': 'corrupt/me',
'host': 'localhost',
'authenticatePeer': 'yes',
'encapsulation': 'lite',
'sslProfile': 'listener-ssl-profile'})
])

config_qdrb = Qdrouterd.Config([
('router', {'mode': 'interior', 'id': 'QDR.B'}),
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
('connector', {'name': 'connectorToA', 'role': 'inter-router',
'port': inter_router_port}),
('sslProfile', {'name': 'connector-ssl-profile',
'caCertFile': CA_CERT,
'certFile': cls.client_cert_file,
'privateKeyFile': CLIENT_PRIVATE_KEY,
'password': CLIENT_PRIVATE_KEY_PASSWORD}),
('tcpConnector', {'port': cls.openssl_server_listening_port,
'address': 'corrupt/me',
'host': 'localhost',
'verifyHostname': 'yes',
'encapsulation': 'lite',
'sslProfile': 'connector-ssl-profile'}),
])

cls.router_qdra = cls.tester.qdrouterd("QDR.A", config_qdra, wait=True)
cls.router_qdrb = cls.tester.qdrouterd("QDR.B", config_qdrb)
cls.router_qdra.wait_router_connected('QDR.B')
cls.router_qdrb.wait_router_connected('QDR.A')
wait_tcp_listeners_up(cls.router_qdra.addresses[0])

def test_cert_corruption(self):
# Note we do not want to corrupt the certs held by the ssl server or
# client. We need them to be valid so the test client/server do not
# fail themselves. We only want the router to be affected!

server_ssl_info = dict()
server_ssl_info['CA_CERT'] = CA_CERT
server_ssl_info['SERVER_CERTIFICATE'] = SERVER_CERTIFICATE
server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER_PRIVATE_KEY
server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER_PRIVATE_KEY_PASSWORD

openssl_server = self.tester.openssl_server
self.openssl_server = openssl_server(listening_port=self.openssl_server_listening_port,
ssl_info=server_ssl_info,
name="OpenSSLServerAuthPeer",
cl_args=['-Verify', '1'])

client_ssl_info = dict()
client_ssl_info['CA_CERT'] = CA_CERT
client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT_CERTIFICATE
client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT_PRIVATE_KEY
client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT_PRIVATE_KEY_PASSWORD

out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Sanity Check the Configuration!")
self.assertIn(b"Verification: OK", out)
self.assertIn(b"Verify return code: 0 (ok)", out)

self.openssl_server.wait_out_message("Sanity Check the Configuration!")

#
# Phase 1: corrupt the server-facing certificate
#

with open(self.client_cert_file, mode="w") as clientf:
clientf.write("Oh mercy! I'm corrupt!")

_, _ = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Phase 1")
self.router_qdrb.wait_log_message(r'failed to set TLS certificate')

#
# Phase 1: corrupt the client-facing certificate
#

with open(self.server_cert_file, mode="w") as serverf:
serverf.write("Alas! I too have been rendered corrupt!")

_, _ = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Phase 2",
expect=Process.EXIT_FAIL)
self.router_qdra.wait_log_message(r'failed to set TLS certificate')

0 comments on commit 59c717f

Please sign in to comment.