Skip to content

Commit

Permalink
more debug
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Sep 18, 2024
1 parent 01e5b4f commit 5b30012
Showing 1 changed file with 139 additions and 0 deletions.
139 changes: 139 additions & 0 deletions tests/system_tests_tcp_adaptor_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ def setUpClass(cls):
cls.router_qdrb.wait_router_connected('QDR.A')
wait_tcp_listeners_up(cls.router_qdra.addresses[0])

@unittest.skip("Disabled for now")
def test_ssl_profile_update(self):
"""
Test management updates to the listener and connector sslProfile
Expand Down Expand Up @@ -1002,3 +1003,141 @@ def ping(self, client_ssl_info, pattern, server_logpath):
path=openssl_server.outfile_path:
ping(self, ssl_info, data, path),
timeout=10.0, delay=0.5))

def test_ssl_profile_update_TEST(self):
"""
Test management updates to the listener and connector sslProfile
configurations
"""
payload = b'?' * 1024 * 65
self.server_port = self.tester.get_port()
server_ssl_info = dict()
server_ssl_info['CA_CERT'] = CA_CERT
server_ssl_info['SERVER_CERTIFICATE'] = SERVER_CERTIFICATE
server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER_PRIVATE_KEY
server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER_PRIVATE_KEY_PASSWORD

server_create = self.tester.openssl_server
openssl_server = server_create(listening_port=self.server_port,
ssl_info=server_ssl_info,
name="OpenSSLServerAuthPeer",
cl_args=['-Verify', '1',
'-verify_return_error'])

client_ssl_info = dict()
client_ssl_info['CA_CERT'] = CA_CERT
client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT_CERTIFICATE
client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT_PRIVATE_KEY
client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT_PRIVATE_KEY_PASSWORD

def ping(self, client_ssl_info, pattern, server_logpath):
# Helper routine: try to create a TLS connection across the
# routers, return True if successful
try:
out, error = self.opensslclient(port=self.server_port,
ssl_info=client_ssl_info,
data=pattern.encode() + payload,
cl_args=['-verify', '10',
'-verify_return_error'])
except Exception as exc:
print(f"s_client failed: '{exc}'", flush=True)
return False
if b"Verification: OK" not in out:
print(f"s_client failed: '{error}'", flush=True)
return False
if b"Verify return code: 0 (ok)" not in out:
print(f"s_client failed: '{error}'", flush=True)
return False

# compensate for the slight delay where the server flushes to the
# log - not critical because we retry on failure
sleep(0.25)
with open(server_logpath, 'rt') as log_file:
if not is_pattern_present(log_file, pattern):
print(f"Server pattern not found: '{pattern}'", flush=True)
return False
return True

# Check the initial configuration

self.assertTrue(retry(lambda ssl_info=client_ssl_info,
data="Sanity Check the Configuration",
path=openssl_server.outfile_path:
ping(self, ssl_info, data, path),
timeout=10.0, delay=0.5))

# Try to connect to the server using an incompatible cert

client_ssl_info = dict()
client_ssl_info['CA_CERT'] = CA2_CERT
client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT2_CERTIFICATE
client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT2_PRIVATE_KEY
client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT2_PRIVATE_KEY_PASSWORD

out, error = self.opensslclient(port=self.server_port,
ssl_info=client_ssl_info,
data=b"Expect this to fail!" + payload,
expect=Process.EXIT_FAIL,
cl_args=['-verify', '10',
'-verify_return_error'])

sleep(0.5)
with open(openssl_server.outfile_path, 'rt') as out_file:
self.assertFalse(is_pattern_present(out_file,
"Expect this to fail"),
"TLS connection did not fail")

# Restore

client_ssl_info = dict()
client_ssl_info['CA_CERT'] = CA_CERT
client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT_CERTIFICATE
client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT_PRIVATE_KEY
client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT_PRIVATE_KEY_PASSWORD
self.assertTrue(retry(lambda ssl_info=client_ssl_info,
data="Hey we recovered!",
path=openssl_server.outfile_path:
ping(self, ssl_info, data, path),
timeout=10.0, delay=0.5))

#
# Now try incompatible server-side config
#

openssl_server.teardown()
self.server_port = self.tester.get_port()

server_ssl_info = dict()
server_ssl_info['CA_CERT'] = CA2_CERT
server_ssl_info['SERVER_CERTIFICATE'] = SERVER2_CERTIFICATE
server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER2_PRIVATE_KEY
server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER2_PRIVATE_KEY_PASSWORD
openssl_server = server_create(listening_port=self.server_port,
ssl_info=server_ssl_info,
name="OpenSSLServerAuthPeer2",
cl_args=['-Verify', '1',
'-verify_return_error'])

out, error = self.opensslclient(port=self.server_port,
ssl_info=client_ssl_info,
data=b"Expect this to fail!" + payload,
expect=Process.EXIT_FAIL,
cl_args=['-verify', '10',
'-verify_return_error'])
with open(openssl_server.outfile_path, 'rt') as out_file:
self.assertFalse(is_pattern_present(out_file,
"Expect this to fail"),
"TLS connection did not fail")

# restore client

client_ssl_info = dict()
client_ssl_info['CA_CERT'] = CA2_CERT
client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT2_CERTIFICATE
client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT2_PRIVATE_KEY
client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT2_PRIVATE_KEY_PASSWORD
self.assertTrue(retry(lambda ssl_info=client_ssl_info,
data="The server conn must succeed!",
path=openssl_server.outfile_path:
ping(self, ssl_info, data, path),
timeout=10.0, delay=0.5))

0 comments on commit 5b30012

Please sign in to comment.