From e50c2d502e1c7a483bfebe97f53c7a46b77844aa Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Thu, 15 Aug 2024 22:41:08 -0400 Subject: [PATCH] Issue 1563 (#1601) * Fixes #1563: Delete corresponding connections optionally on deletion of tcpListener and tcpConnector Signed-off-by: Gabor Dozsa * Add the TCP connection termination test to the system tests list Signed-off-by: Gabor Dozsa * Add SSL scenario to the TCP connections termination system test * Use Qdrouterd.SKManager to run skmanage commands * Fix python-checker warnings * Take the listener/connector lock for traversing the connections list * Make the new config flag global to the router * Change the name of the new config flag to dropTcpConnections * Fix comments * Make the the default value True for the new config flag * Take activation_lock and check raw_opened flag when trigger closing connection * Check if core_conn is not zero when trigger closing connection * Add pending_close flag to qd_tcp_connection struct to handle early termination * Fix data race warning for conn->core_conn * Couple conn->core_conn setup and delete with DEQ_INSERT(conn) and DEQ_REMOVE(conn), repectively * Try to remove a connection from the connections list only if core_conn is not zero * Only set closing flag in delete loop and wake up the connections * Fix and adjust system test timeouts * Only wake up the HEAD conn at delete. Wake up the NEXT conn in the close handler. * Check for conn->raw_conn in delete loop --------- Signed-off-by: Gabor Dozsa Co-authored-by: Gabor Dozsa --- src/adaptors/tcp/tcp_adaptor.c | 219 +++++++---- src/adaptors/tcp/tcp_adaptor.h | 4 +- src/dispatch.c | 4 +- src/dispatch_private.h | 1 + tests/CMakeLists.txt | 1 + tests/system_tests_tcp_conns_terminate.py | 436 ++++++++++++++++++++++ 6 files changed, 582 insertions(+), 83 deletions(-) create mode 100644 tests/system_tests_tcp_conns_terminate.py diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index eead8a03f..20573aa0d 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -447,7 +447,7 @@ static void set_state_XSIDE_IO(qd_tcp_connection_t *conn, qd_tcp_connection_stat // thread for activation (see CORE_activate()). During cleanup of these objects we need to ensure that both the I/O and // Core threads do not reference them after they have been deallocated. To do this we use a two-phase approach to // freeing these objects. In the first phase all non-activation-related resources are released by the I/O thread (see -// qd_tcp_connector_decref(), free_connection_IO). Then the object is passed to the Core thread for cleanup of the activation +// qd_tcp_connector_decref()). Then the object is passed to the Core thread for cleanup of the activation // resources and freeing the base object (see free_tcp_resource(), qdr_core_free_tcp_resource_CT()). // // tcp_listener_t does not use a qdr_connection_t so this process does not apply to it. @@ -481,49 +481,6 @@ static void free_tcp_resource(qd_tcp_common_t *resource) qdr_action_enqueue(tcp_context->core, action); } -static void free_connection_IO(void *context) -{ - // No thread assertion here - can be RAW_IO or TIMER_IO - qd_tcp_connection_t *conn = (qd_tcp_connection_t*) context; - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id); - - // Disable activation via Core thread. 
The lock needs to be taken to ensure the core thread is not currently - // attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can - // take place. - sys_mutex_lock(&conn->activation_lock); - CLEAR_ATOMIC_FLAG(&conn->raw_opened); - sys_mutex_unlock(&conn->activation_lock); - // Do NOT free the core_activation lock since the core may be holding it - - if (conn->common.parent) { - if (conn->common.parent->context_type == TL_LISTENER) { - qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent; - sys_mutex_lock(&listener->lock); - DEQ_REMOVE(listener->connections, conn); - sys_mutex_unlock(&listener->lock); - // - // Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn)) - // - conn->common.parent = 0; - qd_tcp_listener_decref(listener); - } else { - qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent; - sys_mutex_lock(&connector->lock); - DEQ_REMOVE(connector->connections, conn); - sys_mutex_unlock(&connector->lock); - // - // Call connector decref when a connection associated with the connector is removed (DEQ_REMOVE(connector->connections, conn)) - // - conn->common.parent = 0; - qd_tcp_connector_decref(connector); - } - } - - // Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See - // qdr_core_free_tcp_resource_CT() - free_tcp_resource(&conn->common); -} - // Initate close of the raw connection. // // The close will be complete when the PN_RAW_CONNECTION_DISCONNECTED event is handled. At that point any associated @@ -561,6 +518,46 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) if (conn->state != XSIDE_CLOSING) set_state_XSIDE_IO(conn, XSIDE_CLOSING); + if (conn->common.parent) { + if (conn->common.parent->context_type == TL_LISTENER) { + qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent; + sys_mutex_lock(&listener->lock); + listener->connections_closed++; + if (IS_ATOMIC_FLAG_SET(&listener->closing)) { + // Wake up the next conn on the list to get it closed + // See qd_dispatch_delete_tcp_listener() where the head connection is woken up. + qd_tcp_connection_t *next_conn = DEQ_NEXT(conn); + if (!!next_conn) + pn_raw_connection_wake(next_conn->raw_conn); + } + DEQ_REMOVE(listener->connections, conn); + sys_mutex_unlock(&listener->lock); + // + // Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn)) + // + conn->common.parent = 0; + qd_tcp_listener_decref(listener); + } else { + qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent; + sys_mutex_lock(&connector->lock); + connector->connections_closed++; + if (IS_ATOMIC_FLAG_SET(&connector->closing)) { + // Wake up the next conn on the list to get it closed + // See qd_dispatch_delete_tcp_connector() where the head connection is woken up. 
+ qd_tcp_connection_t *next_conn = DEQ_NEXT(conn); + if (!!next_conn) + pn_raw_connection_wake(next_conn->raw_conn); + } + DEQ_REMOVE(connector->connections, conn); + sys_mutex_unlock(&connector->lock); + // + // Call connector decref when a connection associated with the connector is removed (DEQ_REMOVE(connector->connections, conn)) + // + conn->common.parent = 0; + qd_tcp_connector_decref(connector); + } + } + if (!!conn->raw_conn) { CLEAR_ATOMIC_FLAG(&conn->raw_opened); qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] close_connection_XSIDE_IO", conn->conn_id); @@ -576,8 +573,6 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) sys_mutex_unlock(&conn->activation_lock); } - free(conn->reply_to); - if (!!conn->inbound_stream) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel producer activation", DLV_ARGS(conn->inbound_delivery)); qd_message_cancel_producer_activation(conn->inbound_stream); @@ -633,6 +628,7 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) qd_tls_free2(conn->tls); qd_tls_domain_decref(conn->tls_domain); free(conn->alpn_protocol); + free(conn->reply_to); conn->reply_to = 0; conn->inbound_link = 0; @@ -643,25 +639,23 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) conn->outbound_delivery = 0; conn->observer_handle = 0; conn->common.vflow = 0; - conn->core_conn = 0; conn->tls = 0; conn->tls_domain = 0; - if (conn->common.parent) { - if (conn->common.parent->context_type == TL_LISTENER) { - qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent; - sys_mutex_lock(&li->lock); - li->connections_closed++; - sys_mutex_unlock(&li->lock); - } else { - qd_tcp_connector_t *cr = (qd_tcp_connector_t*) conn->common.parent; - sys_mutex_lock(&cr->lock); - cr->connections_closed++; - sys_mutex_unlock(&cr->lock); - } - } + // No thread assertion here - can be RAW_IO or TIMER_IO + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id); + + // Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently + // attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can + // take place. + sys_mutex_lock(&conn->activation_lock); + CLEAR_ATOMIC_FLAG(&conn->raw_opened); + sys_mutex_unlock(&conn->activation_lock); + // Do NOT free the core_activation lock since the core may be holding it - free_connection_IO(conn); + // Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. 
See + // qdr_core_free_tcp_resource_CT() + free_tcp_resource(&conn->common); } @@ -921,7 +915,7 @@ static void link_setup_LSIDE_IO(qd_tcp_connection_t *conn) qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent; qdr_terminus_t *target = qdr_terminus(0); qdr_terminus_t *source = qdr_terminus(0); - char host[64]; // for numeric remote client IP:port address + char host[64]; // for numeric remote client IP:port address qdr_terminus_set_address(target, li->adaptor_config->address); qdr_terminus_set_dynamic(source); @@ -929,7 +923,6 @@ static void link_setup_LSIDE_IO(qd_tcp_connection_t *conn) qd_raw_conn_get_address_buf(conn->raw_conn, host, sizeof(host)); conn->core_conn = TL_open_core_connection(conn->conn_id, true, host); qdr_connection_set_context(conn->core_conn, conn); - conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.lside.in", 0, false, 0, &conn->inbound_link_id); qdr_link_set_context(conn->inbound_link, conn); conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, source, qdr_terminus(0), "tcp.lside.out", 0, false, 0, &conn->outbound_link_id); @@ -944,7 +937,7 @@ static void link_setup_CSIDE_IO(qd_tcp_connection_t *conn, qdr_delivery_t *deliv ASSERT_RAW_IO; assert(conn->common.parent->context_type == TL_CONNECTOR); - const char *host = ((qd_tcp_connector_t *)conn->common.parent)->adaptor_config->host_port; + const char *host = ((qd_tcp_connector_t *) conn->common.parent)->adaptor_config->host_port; conn->core_conn = TL_open_core_connection(conn->conn_id, false, host); qdr_connection_set_context(conn->core_conn, conn); @@ -1237,17 +1230,16 @@ static uint64_t handle_first_outbound_delivery_CSIDE(qd_tcp_connector_t *connect conn->context.context = conn; conn->context.handler = on_connection_event_CSIDE_IO; + conn->raw_conn = pn_raw_connection(); + pn_raw_connection_set_context(conn->raw_conn, &conn->context); + sys_mutex_lock(&connector->lock); DEQ_INSERT_TAIL(connector->connections, conn); - connector->connections_opened++; vflow_set_uint64(connector->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, connector->connections_opened); vflow_set_ref_from_record(conn->common.vflow, VFLOW_ATTRIBUTE_CONNECTOR, connector->common.vflow); sys_mutex_unlock(&connector->lock); - conn->raw_conn = pn_raw_connection(); - pn_raw_connection_set_context(conn->raw_conn, &conn->context); - // // The raw connection establishment must be the last thing done in this function. 
// After this call, a separate IO thread may immediately be invoked in the context @@ -1736,13 +1728,25 @@ static void connection_run_LSIDE_IO(qd_tcp_connection_t *conn) // // Don't do anything // - break; + return; default: assert(false); break; } } while (repeat); + + bool closing = false; + if (conn->common.parent) + closing = IS_ATOMIC_FLAG_SET(&((qd_tcp_listener_t *) conn->common.parent)->closing); + if (closing) { + if (!!conn->core_conn) { + qdr_core_close_connection(conn->core_conn); + } else { + close_raw_connection(conn, "Parent-deleted", "Forced closed"); + set_state_XSIDE_IO(conn, XSIDE_CLOSING); // prevent further connection I/O + } + } } @@ -1804,13 +1808,25 @@ static void connection_run_CSIDE_IO(qd_tcp_connection_t *conn) // // Don't do anything // - break; + return; default: assert(false); break; - } + } } while(repeat); + + bool closing = false; + if (conn->common.parent) + closing = IS_ATOMIC_FLAG_SET(&((qd_tcp_connector_t *) conn->common.parent)->closing); + if (closing) { + if (!!conn->core_conn) { + qdr_core_close_connection(conn->core_conn); + } else { + close_raw_connection(conn, "Parent-deleted", "Forced closed"); + set_state_XSIDE_IO(conn, XSIDE_CLOSING); // prevent further connection I/O + } + } } @@ -1949,7 +1965,6 @@ static int setup_tls_session(qd_tcp_connection_t *conn, const qd_tls_domain_t *p return 0; } - //================================================================================= // Handlers for events from the Raw Connections //================================================================================= @@ -2038,19 +2053,19 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn conn->context.context = conn; conn->context.handler = on_connection_event_LSIDE_IO; + conn->raw_conn = pn_raw_connection(); + pn_raw_connection_set_context(conn->raw_conn, &conn->context); + + if (listener->protocol_observer) { + conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id); + } + sys_mutex_lock(&listener->lock); DEQ_INSERT_TAIL(listener->connections, conn); - listener->connections_opened++; vflow_set_uint64(listener->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, listener->connections_opened); sys_mutex_unlock(&listener->lock); - if (listener->protocol_observer) { - conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id); - } - - conn->raw_conn = pn_raw_connection(); - pn_raw_connection_set_context(conn->raw_conn, &conn->context); // Note: this will trigger the connection's event handler on another thread: pn_listener_raw_accept(pn_listener, conn->raw_conn); } @@ -2274,7 +2289,6 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di static void CORE_connection_close(void *context, qdr_connection_t *conn, qdr_error_t *error) { - // hahaha qd_tcp_common_t *common = (qd_tcp_common_t*) qdr_connection_get_context(conn); qd_tcp_connection_t *tcp_conn = (qd_tcp_connection_t*) common; if (tcp_conn) { @@ -2356,6 +2370,7 @@ QD_EXPORT void *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_ listener->common.context_type = TL_LISTENER; sys_mutex_init(&listener->lock); + sys_atomic_init(&listener->closing, 0); sys_mutex_lock(&tcp_context->lock); DEQ_INSERT_TAIL(tcp_context->listeners, listener); @@ -2387,6 +2402,27 @@ QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl) listener->common.vflow = 0; } // + // Initiate termination of existing connections + // + if 
(tcp_context->qd->terminate_tcp_conns) { + // Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event + // could come upon any of the connections. We need to hold the listener->lock + // to prevent any modification of the connections list while it is being traversed. + sys_mutex_lock(&listener->lock); + SET_ATOMIC_FLAG(&listener->closing); + + // + // Only the head connection is woken when holding the lock. + // This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED + // event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up). + // That way, we don't have to wake all the connections when holding the lock. + // + qd_tcp_connection_t *conn = DEQ_HEAD(listener->connections); + if (conn) + pn_raw_connection_wake(conn->raw_conn); + sys_mutex_unlock(&listener->lock); + } + // // If all the connections associated with this listener has been closed, this call to // qd_tcp_listener_decref should free the listener // @@ -2413,6 +2449,27 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl) connector->core_conn = 0; qd_connection_counter_dec(QD_PROTOCOL_TCP); // + // Initiate termination of existing connections + // + if (tcp_context->qd->terminate_tcp_conns) { + // Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event + // could come upon any of the connections. We need to hold the connector->lock + // to prevent any modification of the connections list while it is being traversed. + sys_mutex_lock(&connector->lock); + SET_ATOMIC_FLAG(&connector->closing); + // + // Only the head connection is woken when holding the lock. + // This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED + // event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up). + // That way, we don't have to wake all the connections when holding the lock. 
+ // + qd_tcp_connection_t *conn = DEQ_HEAD(connector->connections); + if (conn) + pn_raw_connection_wake(conn->raw_conn); + sys_mutex_unlock(&connector->lock); + } + // + // // If all the connections associated with this listener has been closed, this call to // qd_tcp_listener_decref should free the listener // @@ -2492,6 +2549,8 @@ qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_en connector->activate_timer = qd_timer(tcp_context->qd, on_core_activate_TIMER_IO, connector); connector->common.context_type = TL_CONNECTOR; sys_mutex_init(&connector->lock); + sys_atomic_init(&connector->closing, 0); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO, "Configured TcpConnector for %s, %s:%s", diff --git a/src/adaptors/tcp/tcp_adaptor.h b/src/adaptors/tcp/tcp_adaptor.h index 3a84c9e9d..30fbae5f1 100644 --- a/src/adaptors/tcp/tcp_adaptor.h +++ b/src/adaptors/tcp/tcp_adaptor.h @@ -68,7 +68,7 @@ struct qd_tcp_listener_t { uint64_t connections_opened; uint64_t connections_closed; sys_atomic_t ref_count; - bool closing; + sys_atomic_t closing; }; @@ -88,7 +88,7 @@ typedef struct qd_tcp_connector_t { uint64_t connections_opened; uint64_t connections_closed; sys_atomic_t ref_count; - bool closing; + sys_atomic_t closing; } qd_tcp_connector_t; diff --git a/src/dispatch.c b/src/dispatch.c index e1972c99d..9f81ce1e4 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -231,7 +231,7 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity) case QD_ROUTER_MODE_EDGE: mode = "Edge_"; break; case QD_ROUTER_MODE_ENDPOINT: mode = "Endpoint_"; break; } - + qd->router_id = (char*) malloc(strlen(mode) + QD_DISCRIMINATOR_SIZE + 2); strcpy(qd->router_id, mode); qd_generate_discriminator(qd->router_id + strlen(qd->router_id)); @@ -242,6 +242,8 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity) qd->timestamps_in_utc = qd_entity_opt_bool(entity, "timestampsInUTC", false); QD_ERROR_RET(); qd->timestamp_format = qd_entity_opt_string(entity, "timestampFormat", 0); QD_ERROR_RET(); qd->metadata = qd_entity_opt_string(entity, "metadata", 0); QD_ERROR_RET(); + qd->terminate_tcp_conns = qd_entity_opt_bool(entity, "dropTcpConnections", true); + QD_ERROR_RET(); if (! qd->sasl_config_path) { qd->sasl_config_path = qd_entity_opt_string(entity, "saslConfigDir", 0); QD_ERROR_RET(); diff --git a/src/dispatch_private.h b/src/dispatch_private.h index f88ce1dad..5f4a7fbcf 100644 --- a/src/dispatch_private.h +++ b/src/dispatch_private.h @@ -58,6 +58,7 @@ struct qd_dispatch_t { char *metadata; bool timestamps_in_utc; char *data_connection_count; + bool terminate_tcp_conns; }; qd_dispatch_t *qd_dispatch_get_dispatch(void); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b2f035ce5..4f8c732cc 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -172,6 +172,7 @@ foreach(py_test_module system_tests_http1_observer system_tests_tcp_adaptor system_tests_tcp_adaptor_tls + system_tests_tcp_conns_terminate system_tests_http2_tls system_tests_address_watch system_tests_router_annotations diff --git a/tests/system_tests_tcp_conns_terminate.py b/tests/system_tests_tcp_conns_terminate.py new file mode 100644 index 000000000..bbc37ec69 --- /dev/null +++ b/tests/system_tests_tcp_conns_terminate.py @@ -0,0 +1,436 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+import json
+import os
+
+from system_test import TestCase, Qdrouterd, retry, retry_assertion
+from system_test import Logger, TCP_LISTENER_TYPE, TCP_CONNECTOR_TYPE
+from system_test import SERVER_CERTIFICATE, SERVER_PRIVATE_KEY_NO_PASS, CA_CERT
+from system_test import CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY
+from system_test import SERVER_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD
+from system_test import SERVER_PRIVATE_KEY_PASSWORD
+from vanflow_snooper import VFlowSnooperThread, ANY_VALUE
+from TCP_echo_client import TcpEchoClient
+from TCP_echo_server import TcpEchoServer
+
+
+class TerminateTcpConnectionsTest(TestCase):
+    """
+    Test the router config flag 'dropTcpConnections'. All corresponding active
+    tcp flows are expected to terminate when a tcpListener or tcpConnector is
+    deleted if the config flag is set. Otherwise, the flows should stay alive.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        """
+        Start two routers: R1 and R2. Each router has two tcpConnectors, each with a
+        TCP echo server attached, and four tcpListeners. One of the tcpConnectors and
+        two of the tcpListeners use SSL at both routers.
+        The new config flag is turned on for R1 but not for R2.
+        Each tcpConnector has a unique VAN address which is also used by one tcpListener
+        at each router. The four distinct VAN addresses are used to test deletion of
+        tcpListeners and tcpConnectors with or without SSL.
+ """ + super(TerminateTcpConnectionsTest, cls).setUpClass() + + cls.test_name = 'TerminateTcpConnectionsTest' + + router_1_id = 'R1' + router_2_id = 'R2' + + # SSL info + cls.ssl_info = {'SERVER_CERTIFICATE': SERVER_CERTIFICATE, + 'SERVER_PRIVATE_KEY': SERVER_PRIVATE_KEY_NO_PASS, + 'CA_CERT': CA_CERT} + cls.client_ssl_info = {'CLIENT_CERTIFICATE': CLIENT_CERTIFICATE, + 'CLIENT_PRIVATE_KEY': CLIENT_PRIVATE_KEY, + 'CLIENT_PRIVATE_KEY_PASSWORD': CLIENT_PRIVATE_KEY_PASSWORD, + 'CA_CERT': CA_CERT} + tcp_listener_ssl_profile_name = 'tcp-listener-ssl-profile' + + # VAN addresses to use without SSL config + cls.address_no_ssl = [cls.test_name + '_no_ssl_1', cls.test_name + '_no_ssl_2'] + + # VAN addresses to use with SSL config + cls.address_ssl = [cls.test_name + '_ssl_1', cls.test_name + '_ssl_2'] + + # Launch TCP echo servers + server_logger = Logger(title=cls.test_name, + print_to_console=True, + save_for_dump=False, + ofilename=os.path.join(os.path.dirname(os.getcwd()), + f"{cls.test_name}_echo_server.log")) + echo_servers = {} + server_prefix = f"{cls.test_name} ECHO_SERVER_1_no_ssl" + echo_servers[cls.address_no_ssl[0]] = TcpEchoServer(prefix=server_prefix, + port=0, + logger=server_logger) + assert echo_servers[cls.address_no_ssl[0]].is_running + server_prefix = f"{cls.test_name} ECHO_SERVER_2_no_ssl" + echo_servers[cls.address_no_ssl[1]] = TcpEchoServer(prefix=server_prefix, + port=0, + logger=server_logger) + assert echo_servers[cls.address_no_ssl[1]].is_running + server_prefix = f"{cls.test_name} ECHO_SERVER_1_ssl" + echo_servers[cls.address_ssl[0]] = TcpEchoServer(prefix=server_prefix, + port=0, + ssl_info=cls.ssl_info, + logger=server_logger) + assert echo_servers[cls.address_ssl[0]].is_running + server_prefix = f"{cls.test_name} ECHO_SERVER_2_ssl" + echo_servers[cls.address_ssl[1]] = TcpEchoServer(prefix=server_prefix, + port=0, + ssl_info=cls.ssl_info, + logger=server_logger) + assert echo_servers[cls.address_ssl[1]].is_running + cls.echo_servers = echo_servers + + # Create listener ports + cls.listener_ports = {router_1_id: {}, router_2_id: {}} + for rtr in [router_1_id, router_2_id]: + for addr in cls.address_no_ssl + cls.address_ssl: + cls.listener_ports[rtr][addr] = cls.tester.get_port() + + # Launch routers: router_1 has the TCP connections termination flag turned on (by default), + # router_2 has the flag turned off explicitly + inter_router_port = cls.tester.get_port() + config_1 = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': router_1_id}), + ('sslProfile', {'name': 'tcp-listener-ssl-profile', + 'caCertFile': CA_CERT, + 'certFile': SERVER_CERTIFICATE, + 'privateKeyFile': SERVER_PRIVATE_KEY, + 'password': SERVER_PRIVATE_KEY_PASSWORD}), + ('sslProfile', {'name': 'tcp-connector-ssl-profile', + 'caCertFile': CA_CERT, + 'certFile': CLIENT_CERTIFICATE, + 'privateKeyFile': CLIENT_PRIVATE_KEY, + 'password': CLIENT_PRIVATE_KEY_PASSWORD}), + ('listener', {'port': cls.tester.get_port()}), + ('connector', {'role': 'inter-router', 'port': inter_router_port}), + ('tcpListener', {'host': "0.0.0.0", + 'address': cls.address_no_ssl[0], + 'port': cls.listener_ports[router_1_id][cls.address_no_ssl[0]]}), + ('tcpListener', {'host': "0.0.0.0", + 'address': cls.address_no_ssl[1], + 'port': cls.listener_ports[router_1_id][cls.address_no_ssl[1]]}), + ('tcpConnector', {'host': "localhost", + 'address': cls.address_no_ssl[0], + 'port': echo_servers[cls.address_no_ssl[0]].port}), + ('tcpListener', {'host': "0.0.0.0", + 'sslProfile': tcp_listener_ssl_profile_name, + 'address': 
cls.address_ssl[0], + 'port': cls.listener_ports[router_1_id][cls.address_ssl[0]]}), + ('tcpListener', {'host': "0.0.0.0", + 'sslProfile': tcp_listener_ssl_profile_name, + 'address': cls.address_ssl[1], + 'port': cls.listener_ports[router_1_id][cls.address_ssl[1]]}), + ('tcpConnector', {'host': "localhost", + 'sslProfile': 'tcp-connector-ssl-profile', + 'address': cls.address_ssl[0], + 'port': echo_servers[cls.address_ssl[0]].port}) + ]) + config_2 = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': router_2_id, 'dropTcpConnections': False}), + ('sslProfile', {'name': 'tcp-listener-ssl-profile', + 'caCertFile': CA_CERT, + 'certFile': SERVER_CERTIFICATE, + 'privateKeyFile': SERVER_PRIVATE_KEY, + 'password': SERVER_PRIVATE_KEY_PASSWORD}), + ('sslProfile', {'name': 'tcp-connector-ssl-profile', + 'caCertFile': CA_CERT, + 'certFile': CLIENT_CERTIFICATE, + 'privateKeyFile': CLIENT_PRIVATE_KEY, + 'password': CLIENT_PRIVATE_KEY_PASSWORD}), + ('listener', {'port': cls.tester.get_port()}), + ('listener', {'role': 'inter-router', 'port': inter_router_port}), + ('tcpListener', {'host': "0.0.0.0", + 'address': cls.address_no_ssl[0], + 'port': cls.listener_ports[router_2_id][cls.address_no_ssl[0]]}), + ('tcpListener', {'host': "0.0.0.0", + 'address': cls.address_no_ssl[1], + 'port': cls.listener_ports[router_2_id][cls.address_no_ssl[1]]}), + ('tcpConnector', {'host': "localhost", + 'address': cls.address_no_ssl[1], + 'port': echo_servers[cls.address_no_ssl[1]].port}), + ('tcpListener', {'host': "0.0.0.0", + 'sslProfile': tcp_listener_ssl_profile_name, + 'address': cls.address_ssl[0], + 'port': cls.listener_ports[router_2_id][cls.address_ssl[0]]}), + ('tcpListener', {'host': "0.0.0.0", + 'sslProfile': tcp_listener_ssl_profile_name, + 'address': cls.address_ssl[1], + 'port': cls.listener_ports[router_2_id][cls.address_ssl[1]]}), + ('tcpConnector', {'host': "localhost", + 'sslProfile': 'tcp-connector-ssl-profile', + 'address': cls.address_ssl[1], + 'port': echo_servers[cls.address_ssl[1]].port}) + ]) + + cls.router_2 = cls.tester.qdrouterd('test_router_2', config_2) + cls.router_1 = cls.tester.qdrouterd('test_router_1', config_1) + + cls.router_1.wait_router_connected('R2') + cls.router_2.wait_router_connected('R1') + + cls.snooper_thread = VFlowSnooperThread(cls.router_1.addresses[0]) + + # wait for the TCP listeners and connectors + expected = { + router_1_id : [ + ('LISTENER', {'VAN_ADDRESS': cls.address_no_ssl[0]}), + ('LISTENER', {'VAN_ADDRESS': cls.address_no_ssl[1]}), + ('LISTENER', {'VAN_ADDRESS': cls.address_ssl[0]}), + ('LISTENER', {'VAN_ADDRESS': cls.address_ssl[1]}), + ('CONNECTOR', {'VAN_ADDRESS': cls.address_no_ssl[0]}), + ('CONNECTOR', {'VAN_ADDRESS': cls.address_ssl[0]}) + ], + router_2_id : [ + ('LISTENER', {'VAN_ADDRESS': cls.address_no_ssl[0]}), + ('LISTENER', {'VAN_ADDRESS': cls.address_no_ssl[1]}), + ('LISTENER', {'VAN_ADDRESS': cls.address_ssl[0]}), + ('LISTENER', {'VAN_ADDRESS': cls.address_ssl[1]}), + ('CONNECTOR', {'VAN_ADDRESS': cls.address_no_ssl[1]}), + ('CONNECTOR', {'VAN_ADDRESS': cls.address_ssl[1]}) + ] + } + success = retry(lambda: cls.snooper_thread.match_records(expected)) + result = cls.snooper_thread.get_results() + cls.assertTrue(success, f"Failed to match records {result}") + + # vflow ids are necessary to relate flows to tcp listeners + cls.router_1_vflow_id = None + cls.router_2_vflow_id = None + for router_key, router_attrs in result.items(): + for rec in router_attrs: + if rec['RECORD_TYPE'] == 'ROUTER': + if rec['NAME'].endswith(f"/{router_1_id}"): + 
cls.router_1_vflow_id = rec['IDENTITY'] + elif rec['NAME'].endswith(f"/{router_2_id}"): + cls.router_2_vflow_id = rec['IDENTITY'] + + # retry() parameters for VanFlowSnooper.match_record() tests where we + # expect failure. E.g. we try to match the 'END_TIME' attribute in the + # 'BIFLOW_TPORT' record - and expect failure - in order to check that a flow + # is still acive. + cls.timeout = 5 + cls.delay = 0.5 + + @classmethod + def tearDownClass(cls): + # stop echo servers + for _, server in cls.echo_servers.items(): + server.wait() + super(TerminateTcpConnectionsTest, cls).tearDownClass() + + def get_tcp_entity_vflow_id(self, router_vflow_id, record_type, address): + res = self.snooper_thread.get_results() + try: + for rec in res[router_vflow_id]: + if rec['RECORD_TYPE'] == record_type and rec['PROTOCOL'] == 'tcp' and rec['VAN_ADDRESS'] == address: + return rec['IDENTITY'] + except KeyError: + pass + return None + + def delete_tcp_entity(self, address, entity_type, router): + def find_entity(address, entity_type, router, expected=True): + query_cmd = f'QUERY --type={entity_type}' + output = json.loads(router.sk_manager(query_cmd)) + res = [e for e in output if e['address'] == address] + if expected: + self.assertTrue(len(res) == 1) + return res[0] + else: + self.assertTrue(len(res) == 0) + return None + + e = find_entity(address, entity_type, router) + name = e['name'] + delete_cmd = 'DELETE --type=' + entity_type + ' --name=' + name + router.sk_manager(delete_cmd) + # check that rthe entity has been deleted + retry_assertion(lambda: find_entity(address, entity_type, router, expected=False), + timeout=2, delay=1) + + def create_echo_clients(self, client_prefix, client_port, ssl=False): + # We use the delay_close flag to keep the connections open + echo_clients = [] + client_logger = Logger(title=client_prefix, + print_to_console=True) + + ssl_info = self.client_ssl_info if ssl else None + + for i in [1, 2]: + echo_clients.append(TcpEchoClient(client_prefix + '_' + str(i), + host='localhost', + port=client_port, + size=1, + count=1, + logger=client_logger, + ssl_info=ssl_info, + delay_close=True)) + return echo_clients + + def clean_up_echo_clients(self, echo_clients): + for e in echo_clients: + e.wait() + + def setup_flows(self, address, ssl): + """ + Setup tcp flows via the tcpListeners and the tcpConnector which have + the particular VAN address. Two flows are created for each tcpListener. 
+ """ + router_1_id = self.router_1.config.router_id + router_2_id = self.router_2.config.router_id + + # vflow Ids are used to associate flows with tcpListeners and tcpConnectors + vflow_ids = {} + self.assertIsNotNone(self.router_1_vflow_id) + self.assertIsNotNone(self.router_2_vflow_id) + vflow_ids['listener_1'] = self.get_tcp_entity_vflow_id(self.router_1_vflow_id, + 'LISTENER', address) + vflow_ids['listener_2'] = self.get_tcp_entity_vflow_id(self.router_2_vflow_id, + 'LISTENER', address) + self.assertIsNotNone(vflow_ids['listener_1']) + self.assertIsNotNone(vflow_ids['listener_2']) + + # Create two flows for tcpListener at router_1 + client_prefix = self.test_name + " ECHO_CLIENT_1_" + address + client_port = self.listener_ports[router_1_id][address] + echo_clients = self.create_echo_clients(client_prefix, client_port, ssl) + + # Create another two flows for tcpListener at router_2 + client_prefix = self.test_name + " ECHO_CLIENT_2_" + address + client_port = self.listener_ports[router_2_id][address] + echo_clients.extend(self.create_echo_clients(client_prefix, client_port, ssl)) + + # Check if all vflows are created + expected = { + router_1_id : [ + ('BIFLOW_TPORT', {'PARENT': vflow_ids['listener_1']}), + ('BIFLOW_TPORT', {'PARENT': vflow_ids['listener_1']}), + ], + router_2_id : [ + ('BIFLOW_TPORT', {'PARENT': vflow_ids['listener_2']}), + ('BIFLOW_TPORT', {'PARENT': vflow_ids['listener_2']}), + ] + } + success = retry(lambda: self.snooper_thread.match_records(expected), delay=1) + self.assertTrue(success, f"Failed to match records {self.snooper_thread.get_results()}") + + return vflow_ids, echo_clients + + def check_vflows_active(self, router_id, parent_vflow_id, timeout=0): + """ + Check if flows with a specific parent listener are active (i.e. 
no END_TIME + attributes are present) + """ + expected = { + router_id: [ + ('BIFLOW_TPORT', {'PARENT': parent_vflow_id, 'END_TIME': ANY_VALUE}), + ] + } + success = retry(lambda: self.snooper_thread.match_records(expected), + timeout=timeout, delay=self.delay) + self.assertFalse(success, + f"ParentId {parent_vflow_id} Matched records {self.snooper_thread.get_results()}") + + def check_vflows_terminated(self, router_id, parent_vflow_id): + """ + Check if flows with a specific parent listener have terminated + """ + expected = { + router_id: [ + ('BIFLOW_TPORT', {'PARENT': parent_vflow_id, 'END_TIME': ANY_VALUE}), + ('BIFLOW_TPORT', {'PARENT': parent_vflow_id, 'END_TIME': ANY_VALUE}), + ] + } + success = retry(lambda: self.snooper_thread.match_records(expected)) + self.assertTrue(success, + f"ParentId {parent_vflow_id} Matched records {self.snooper_thread.get_results()}") + + def check_all_vflows_active(self, vflow_ids, timeout=0): + """ + Check if all flows are still active on both routers + """ + self.check_vflows_active(self.router_1.config.router_id, vflow_ids['listener_1'], timeout=timeout) + self.check_vflows_active(self.router_2.config.router_id, vflow_ids['listener_2'], timeout=timeout) + + def delete_tcp_entities_conns_terminate(self, address, ssl=False): + # router_1 has the "dropTcpConnections" config flag turned on + # This test deletes tcpListener and tcpconnector at router_1 + router_1_id = self.router_1.config.router_id + router_2_id = self.router_2.config.router_id + + # Create flows from tcpListener:router_1 to tcpConnector:router_1 and + # from tcpListener:router_2 to tcpConnector:router_1 + vflow_ids, echo_clients = self.setup_flows(address, ssl) + self.assertTrue(len(echo_clients) == 4) + + # Delete tcpListener_1:router_1 + self.delete_tcp_entity(address, TCP_LISTENER_TYPE, self.router_1) + # Flows of deleted listener are expected to terminate + self.check_vflows_terminated(router_1_id, vflow_ids['listener_1']) + # Flows of tcpListener:router_2 to tcpConnector:router_1 should stay active + self.check_vflows_active(router_2_id, vflow_ids['listener_2'], timeout=self.timeout) + + # Delete tcpConnector:router_1 + self.delete_tcp_entity(address, TCP_CONNECTOR_TYPE, self.router_1) + # Flows of tcpListener:router_2 to tcpConnector:router_1 should terminate + self.check_vflows_terminated(router_2_id, vflow_ids['listener_2']) + + self.clean_up_echo_clients(echo_clients) + + def delete_tcp_entities_conns_active(self, address, ssl=False): + # router_2 does not have the "dropTcpConnections" config flag turned on + # This test we deletes tcpListener and tcpconnector at router_2 + + router_1_id = self.router_1.config.router_id + router_2_id = self.router_2.config.router_id + + vflow_ids, echo_clients = self.setup_flows(address, ssl) + self.assertTrue(len(echo_clients) == 4) + self.check_all_vflows_active(vflow_ids) + + # Delete tcpListener:router_2 + self.delete_tcp_entity(address, TCP_LISTENER_TYPE, self.router_2) + # All flows should stay active + self.check_all_vflows_active(vflow_ids, timeout=self.timeout) + + # Delete tcpConnector:router_2 + self.delete_tcp_entity(address, TCP_CONNECTOR_TYPE, self.router_2) + # All flows should stay active + self.check_all_vflows_active(vflow_ids, timeout=self.timeout) + + self.clean_up_echo_clients(echo_clients) + + def test_delete_tcp_entities_conns_terminate(self): + self.delete_tcp_entities_conns_terminate(self.address_no_ssl[0]) + + def test_delete_tcp_entities_conns_active(self): + 
self.delete_tcp_entities_conns_active(self.address_no_ssl[1]) + + def test_delete_tcp_entities_conns_terminate_ssl(self): + self.delete_tcp_entities_conns_terminate(self.address_ssl[0], ssl=True) + + def test_delete_tcp_entities_conns_active_ssl(self): + self.delete_tcp_entities_conns_active(self.address_ssl[1], ssl=True)
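
For reference, the router-level attribute added above is read in dispatch.c as "dropTcpConnections" with a default of true, and the test harness passes it through the 'router' config entity (see config_2 in setUpClass). Below is a minimal sketch of toggling it using the same Qdrouterd.Config style as this test; the router id 'R3' and the port numbers are illustrative only, not part of the patch:

    # Hypothetical harness snippet; assumes the system_test module used by the test above.
    from system_test import Qdrouterd

    config = Qdrouterd.Config([
        # dropTcpConnections defaults to true; set it to False to leave existing TCP
        # connections untouched when their tcpListener/tcpConnector is deleted.
        ('router', {'mode': 'interior', 'id': 'R3', 'dropTcpConnections': False}),
        ('listener', {'port': 15672}),
        ('tcpListener', {'host': '0.0.0.0', 'address': 'example-address', 'port': 19090}),
    ])
    # config would then be passed to the tester's qdrouterd() helper, as setUpClass() does above.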
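
The deletion path in the C changes wakes only the head of a listener's/connector's connection list while holding its lock; each woken connection then wakes its successor from close_connection_XSIDE_IO(), so the lock is never held while waking the whole list. A purely illustrative Python model of that hand-off chain (no locks, no Proton; the names and structure are invented for illustration and are not router code):

    # Toy model of the "wake the head, each close wakes the next" pattern.
    class ToyConn:
        def __init__(self, name, conn_list):
            self.name = name
            self.conn_list = conn_list   # stands in for listener->connections / connector->connections

        def wake(self):
            # In the router this is pn_raw_connection_wake(); the IO thread later
            # runs the close path. Here we simply close immediately.
            self.close()

        def close(self):
            # Find the successor before removing ourselves (mirrors DEQ_NEXT then DEQ_REMOVE).
            idx = self.conn_list.index(self)
            successor = self.conn_list[idx + 1] if idx + 1 < len(self.conn_list) else None
            self.conn_list.remove(self)
            print(f"closed {self.name}")
            if successor is not None:
                successor.wake()

    conns = []
    for i in range(3):
        conns.append(ToyConn(f"conn-{i}", conns))

    # Deleting the parent entity wakes only the head; the rest close one after another.
    if conns:
        conns[0].wake()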