From bb4c196831084aba1d361f841ffdf7fdb67ece32 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Thu, 8 Feb 2024 14:44:19 -0500 Subject: [PATCH] Fixes #1407 - use adaptor_listener to manage listening socket lifecycle --- src/adaptors/tcp_lite/tcp_lite.c | 143 ++++++++++++------------------ src/adaptors/tcp_lite/tcp_lite.h | 7 +- tests/system_tests_tcp_adaptor.py | 14 ++- 3 files changed, 69 insertions(+), 95 deletions(-) diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index 1663ae25f..294aaabc4 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -118,6 +118,7 @@ static void connection_run_LSIDE_IO(tcplite_connection_t *conn); static void connection_run_CSIDE_IO(tcplite_connection_t *conn); static void connection_run_XSIDE_IO(tcplite_connection_t *conn); static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv); +static void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listener, void *context); //================================================================================= @@ -146,15 +147,15 @@ __thread tcplite_thread_state_t tcplite_thread_state; // Core Activation Handler //================================================================================= /** - * This function in invoked in a timer thread, not associated with any IO context, in order to - * process core connections terminated in the adaptor. The core connections processed here are - * for listeners and connectors only. Connection activation happens elsewhere, in the context of - * a Proton raw IO connection. + * This function in invoked in a timer thread, not associated with any IO context, in order to process core connections + * terminated in the adaptor. The core connections processed here are for connectors only. Connection activation + * happens elsewhere, in the context of a Proton raw IO connection. */ static void on_core_activate_TIMER_IO(void *context) { SET_THREAD_TIMER_IO; - qdr_connection_t *core_conn = ((tcplite_common_t*) context)->core_conn; + assert(((tcplite_common_t*) context)->context_type == TL_CONNECTOR); + qdr_connection_t *core_conn = ((tcplite_connector_t*) context)->core_conn; qdr_connection_process(core_conn); } @@ -228,22 +229,6 @@ static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming static void TL_setup_listener(tcplite_listener_t *li) { - // - // Set up a core connection to handle all of the links and deliveries for this listener - // - li->common.conn_id = qd_server_allocate_connection_id(tcplite_context->server); - li->common.core_conn = TL_open_core_connection(li->common.conn_id, true); - qdr_connection_set_context(li->common.core_conn, li); - - // - // Attach an in-link to represent the desire to send connection streams to the address - // - qdr_terminus_t *target = qdr_terminus(0); - qdr_terminus_set_address(target, li->adaptor_config->address); - - li->in_link = qdr_link_first_attach(li->common.core_conn, QD_INCOMING, 0, target, "tcp.listener.in", 0, false, 0, &li->link_id); - qdr_link_set_context(li->in_link, li); - // // Create a vflow record for this listener // @@ -263,6 +248,15 @@ static void TL_setup_listener(tcplite_listener_t *li) // li->protocol_observer_config = qdpo_config(0, true); li->protocol_observer = protocol_observer("tcp", li->protocol_observer_config); + + // + // Create an adaptor listener. This listener will automatically create a listening socket when there is at least one + // consumer for the service address. Once the last consumer for the service address goes away the adaptor listener + // will automatically close the listening socket. When a client connects to the listening socket the "on_accept" + // callback will be invoked on the proactor listener thread. + // + li->adaptor_listener = qd_adaptor_listener(tcplite_context->qd, li->adaptor_config, LOG_TCP_ADAPTOR); + qd_adaptor_listener_listen(li->adaptor_listener, on_accept, li); } @@ -271,9 +265,9 @@ static void TL_setup_connector(tcplite_connector_t *cr) // // Set up a core connection to handle all of the links and deliveries for this connector // - cr->common.conn_id = qd_server_allocate_connection_id(tcplite_context->server); - cr->common.core_conn = TL_open_core_connection(cr->common.conn_id, false); - qdr_connection_set_context(cr->common.core_conn, cr); + cr->conn_id = qd_server_allocate_connection_id(tcplite_context->server); + cr->core_conn = TL_open_core_connection(cr->conn_id, false); + qdr_connection_set_context(cr->core_conn, cr); // // Attach an out-link to represent our desire to receive connection streams for the address @@ -293,7 +287,7 @@ static void TL_setup_connector(tcplite_connector_t *cr) vflow_set_uint64(cr->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, 0); vflow_add_rate(cr->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, VFLOW_ATTRIBUTE_FLOW_RATE_L4); - cr->out_link = qdr_link_first_attach(cr->common.core_conn, QD_OUTGOING, source, 0, "tcp.connector.out", 0, false, 0, &cr->link_id); + cr->out_link = qdr_link_first_attach(cr->core_conn, QD_OUTGOING, source, 0, "tcp.connector.out", 0, false, 0, &cr->link_id); qdr_link_set_user_streaming(cr->out_link); qdr_link_set_context(cr->out_link, cr); qdr_link_flow(tcplite_context->core, cr->out_link, 5, false); @@ -336,7 +330,7 @@ static void set_state_XSIDE_IO(tcplite_connection_t *conn, tcplite_connection_st { ASSERT_RAW_IO; qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] State change %s -> %s", - conn->common.conn_id, tcplite_connection_state_name(conn->state), tcplite_connection_state_name(new_state)); + conn->conn_id, tcplite_connection_state_name(conn->state), tcplite_connection_state_name(new_state)); conn->state = new_state; } @@ -356,7 +350,6 @@ static void free_listener(tcplite_listener_t *li) qdpo_free(li->protocol_observer); qdpo_config_free(li->protocol_observer_config); - qd_timer_free(li->activate_timer); qd_tls_domain_decref(li->tls_domain); qd_free_adaptor_config(li->adaptor_config); sys_mutex_free(&li->lock); @@ -389,7 +382,7 @@ static void free_connection_IO(void *context) // No thread assertion here - can be RAW_IO or TIMER_IO bool free_parent = false; tcplite_connection_t *conn = (tcplite_connection_t*) context; - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->common.conn_id); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id); if (!!conn->common.parent && conn->common.parent->context_type == TL_LISTENER) { tcplite_listener_t *li = (tcplite_listener_t*) conn->common.parent; @@ -488,8 +481,8 @@ static void close_connection_XSIDE_IO(tcplite_connection_t *conn, bool no_delay) qdr_delivery_decref(tcplite_context->core, conn->outbound_delivery, "close_connection_XSIDE_IO - outbound_delivery"); } - if (!!conn->common.core_conn) { - qdr_connection_closed(conn->common.core_conn); + if (!!conn->core_conn) { + qdr_connection_closed(conn->core_conn); qd_connection_counter_dec(QD_PROTOCOL_TCP); } @@ -505,7 +498,7 @@ static void close_connection_XSIDE_IO(tcplite_connection_t *conn, bool no_delay) conn->outbound_link = 0; conn->outbound_stream = 0; conn->outbound_delivery = 0; - conn->common.core_conn = 0; + conn->core_conn = 0; conn->common.vflow = 0; if (!!conn->common.parent && conn->common.parent->context_type == TL_LISTENER) { @@ -610,7 +603,7 @@ static void grant_read_buffers_XSIDE_IO(tcplite_connection_t *conn, const size_t pn_raw_connection_give_read_buffers(conn->raw_conn, raw_buffers, granted); - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] grant_read_buffers_XSIDE_IO - %ld", conn->common.conn_id, granted); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] grant_read_buffers_XSIDE_IO - %ld", conn->conn_id, granted); } } @@ -645,7 +638,7 @@ static uint64_t produce_read_buffers_XSIDE_IO(tcplite_connection_t *conn, qd_mes } if (!DEQ_IS_EMPTY(qd_buffers)) { - //qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] produce_read_buffers_XSIDE_IO - Producing %ld buffers", conn->common.conn_id, DEQ_SIZE(qd_buffers)); + //qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] produce_read_buffers_XSIDE_IO - Producing %ld buffers", conn->conn_id, DEQ_SIZE(qd_buffers)); qd_message_produce_buffers(stream, &qd_buffers); cutthrough_notify_buffers_produced_inbound(stream); } @@ -682,7 +675,7 @@ static uint64_t consume_write_buffers_XSIDE_IO(tcplite_connection_t *conn, qd_me octet_count += raw_buffers[i].size; buf = DEQ_NEXT(buf); } - //qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] consume_write_buffers_XSIDE_IO - Consuming %ld buffers", conn->common.conn_id, actual); + //qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] consume_write_buffers_XSIDE_IO - Consuming %ld buffers", conn->conn_id, actual); pn_raw_connection_write_buffers(conn->raw_conn, raw_buffers, actual); cutthrough_notify_buffers_consumed_outbound(stream); } @@ -770,12 +763,12 @@ static void link_setup_LSIDE_IO(tcplite_connection_t *conn) qdr_terminus_set_address(target, li->adaptor_config->address); qdr_terminus_set_dynamic(source); - conn->common.core_conn = TL_open_core_connection(conn->common.conn_id, true); - qdr_connection_set_context(conn->common.core_conn, conn); + conn->core_conn = TL_open_core_connection(conn->conn_id, true); + qdr_connection_set_context(conn->core_conn, conn); - conn->inbound_link = qdr_link_first_attach(conn->common.core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.lside.in", 0, false, 0, &conn->inbound_link_id); + conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.lside.in", 0, false, 0, &conn->inbound_link_id); qdr_link_set_context(conn->inbound_link, conn); - conn->outbound_link = qdr_link_first_attach(conn->common.core_conn, QD_OUTGOING, source, qdr_terminus(0), "tcp.lside.out", 0, false, 0, &conn->outbound_link_id); + conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, source, qdr_terminus(0), "tcp.lside.out", 0, false, 0, &conn->outbound_link_id); qdr_link_set_context(conn->outbound_link, conn); qdr_link_set_user_streaming(conn->outbound_link); qdr_link_flow(tcplite_context->core, conn->outbound_link, 1, false); @@ -789,12 +782,12 @@ static void link_setup_CSIDE_IO(tcplite_connection_t *conn, qdr_delivery_t *deli qdr_terminus_set_address(target, conn->reply_to); - conn->common.core_conn = TL_open_core_connection(conn->common.conn_id, false); - qdr_connection_set_context(conn->common.core_conn, conn); + conn->core_conn = TL_open_core_connection(conn->conn_id, false); + qdr_connection_set_context(conn->core_conn, conn); - conn->inbound_link = qdr_link_first_attach(conn->common.core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.cside.in", 0, false, 0, &conn->inbound_link_id); + conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.cside.in", 0, false, 0, &conn->inbound_link_id); qdr_link_set_context(conn->inbound_link, conn); - conn->outbound_link = qdr_link_first_attach(conn->common.core_conn, QD_OUTGOING, qdr_terminus(0), qdr_terminus(0), "tcp.cside.out", 0, false, delivery, &conn->inbound_link_id); + conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, qdr_terminus(0), qdr_terminus(0), "tcp.cside.out", 0, false, delivery, &conn->inbound_link_id); qdr_link_set_context(conn->outbound_link, conn); } @@ -864,7 +857,7 @@ static bool try_compose_and_send_client_stream_LSIDE_IO(tcplite_connection_t *co qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "][L%" PRIu64 "] Initiating listener side empty client stream message", - conn->common.conn_id, conn->inbound_link_id); + conn->conn_id, conn->inbound_link_id); return true; } @@ -931,7 +924,7 @@ static void compose_and_send_server_stream_CSIDE_IO(tcplite_connection_t *conn) qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "][L%" PRIu64 "] Initiating connector side empty server stream message", - conn->common.conn_id, conn->inbound_link_id); + conn->conn_id, conn->inbound_link_id); } @@ -960,7 +953,7 @@ static uint64_t handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qd { ASSERT_RAW_IO; qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] handle_outbound_delivery_LSIDE_IO - receive_complete=%s", - conn->common.conn_id, qd_message_receive_complete(conn->outbound_stream) ? "true" : "false"); + conn->conn_id, qd_message_receive_complete(conn->outbound_stream) ? "true" : "false"); if (!conn->outbound_delivery) { // newly arrived delivery: validate it @@ -1036,11 +1029,11 @@ static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qd extract_metadata_from_stream_CSIDE(conn); - conn->common.conn_id = qd_server_allocate_connection_id(tcplite_context->server); + conn->conn_id = qd_server_allocate_connection_id(tcplite_context->server); conn->context.context = conn; conn->context.handler = on_connection_event_CSIDE_IO; - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] CSIDE outbound delivery", conn->common.conn_id); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] CSIDE outbound delivery", conn->conn_id); sys_mutex_lock(&cr->lock); DEQ_INSERT_TAIL(cr->connections, conn); @@ -1081,7 +1074,7 @@ static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qd static void handle_outbound_delivery_CSIDE(tcplite_connection_t *conn, qdr_link_t *link, qdr_delivery_t *delivery, bool settled) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] handle_outbound_delivery_CSIDE - receive_complete=%s", - conn->common.conn_id, qd_message_receive_complete(conn->outbound_stream) ? "true" : "false"); + conn->conn_id, qd_message_receive_complete(conn->outbound_stream) ? "true" : "false"); // // It is not guaranteed that this function will be called on the proper IO thread. Wake the raw connection for @@ -1118,7 +1111,7 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) conn->inbound_octets += octet_count; if (octet_count > 0) { - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw read: Produced %"PRIu64" octets into stream", conn->common.conn_id, conn->listener_side ? 'L' : 'C', octet_count); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', octet_count); if (!was_blocked && window_full(conn)) { conn->window.closed_count += 1; qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP RX window CLOSED: inbound_bytes=%" PRIu64 " unacked=%" PRIu64, @@ -1144,7 +1137,7 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) // the inbound stream/delivery and close out the inbound half of the connection. // if (pn_raw_connection_is_read_closed(conn->raw_conn) && !blocked) { - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Read-closed - close inbound delivery", conn->common.conn_id); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Read-closed - close inbound delivery", conn->conn_id); qd_message_set_receive_complete(conn->inbound_stream); qdr_delivery_continue(tcplite_context->core, conn->inbound_delivery, false); qdr_delivery_set_context(conn->inbound_delivery, 0); @@ -1181,7 +1174,7 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) conn->outbound_octets += body_octets; conn->window.pending_ack += body_octets; if (body_octets > 0) { - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw write: Consumed %"PRIu64" octets from stream (body-field)", conn->common.conn_id, conn->listener_side ? 'L' : 'C', body_octets); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw write: Consumed %"PRIu64" octets from stream (body-field)", conn->conn_id, conn->listener_side ? 'L' : 'C', body_octets); } } @@ -1193,7 +1186,7 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) conn->outbound_octets += octets; conn->window.pending_ack += octets; if (octets > 0) { - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw write: Consumed %"PRIu64" octets from stream", conn->common.conn_id, conn->listener_side ? 'L' : 'C', octets); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw write: Consumed %"PRIu64" octets from stream", conn->conn_id, conn->listener_side ? 'L' : 'C', octets); } } @@ -1215,7 +1208,7 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) // payload has been consumed and written before write-closing the connection. // if (qd_message_receive_complete(conn->outbound_stream) && !qd_message_can_consume_buffers(conn->outbound_stream)) { - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Rx-complete, rings empty: Write-closing the raw connection", conn->common.conn_id); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Rx-complete, rings empty: Write-closing the raw connection", conn->conn_id); pn_raw_connection_write_close(conn->raw_conn); qdr_delivery_set_context(conn->outbound_delivery, 0); qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery, PN_ACCEPTED, true, 0, true); // accepted, settled, ref_given @@ -1319,7 +1312,7 @@ static void connection_run_CSIDE_IO(tcplite_connection_t *conn) // // If there was an error during the connection-open, reject the client delivery. // - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] CSIDE connect error on initial attempt - Rejecting outbound delivery", conn->common.conn_id); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] CSIDE connect error on initial attempt - Rejecting outbound delivery", conn->conn_id); qdr_delivery_set_context(conn->outbound_delivery, 0); qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery, PN_REJECTED, true, 0, true); // rejected, settled, ref_given conn->outbound_delivery = 0; @@ -1421,15 +1414,15 @@ static void on_connection_event_LSIDE_IO(pn_event_t *e, qd_server_t *qd_server, { SET_THREAD_RAW_IO; tcplite_connection_t *conn = (tcplite_connection_t*) context; - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_LSIDE_IO: %s", conn->common.conn_id, pn_event_type_name(pn_event_type(e))); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_LSIDE_IO: %s", conn->conn_id, pn_event_type_name(pn_event_type(e))); if (pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) { close_connection_XSIDE_IO(conn, false); return; } - if (CLEAR_ATOMIC_FLAG(&conn->core_activation) && !!conn->common.core_conn) { - qdr_connection_process(conn->common.core_conn); + if (CLEAR_ATOMIC_FLAG(&conn->core_activation) && !!conn->core_conn) { + qdr_connection_process(conn->core_conn); } connection_run_LSIDE_IO(conn); @@ -1440,7 +1433,7 @@ static void on_connection_event_CSIDE_IO(pn_event_t *e, qd_server_t *qd_server, { SET_THREAD_RAW_IO; tcplite_connection_t *conn = (tcplite_connection_t*) context; - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_CSIDE_IO: %s", conn->common.conn_id, pn_event_type_name(pn_event_type(e))); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_CSIDE_IO: %s", conn->conn_id, pn_event_type_name(pn_event_type(e))); if (pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) { conn->error = !!conn->raw_conn ? pn_raw_connection_condition(conn->raw_conn) : 0; @@ -1463,15 +1456,15 @@ static void on_connection_event_CSIDE_IO(pn_event_t *e, qd_server_t *qd_server, } } - if (CLEAR_ATOMIC_FLAG(&conn->core_activation) && !!conn->common.core_conn) { - qdr_connection_process(conn->common.core_conn); + if (CLEAR_ATOMIC_FLAG(&conn->core_activation) && !!conn->core_conn) { + qdr_connection_process(conn->core_conn); } connection_run_CSIDE_IO(conn); } -void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listener, void *context) +static void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listener, void *context) { tcplite_listener_t *li = (tcplite_listener_t*) context; @@ -1491,7 +1484,7 @@ void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listener, void conn->common.vflow = vflow_start_record(VFLOW_RECORD_FLOW, li->common.vflow); vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, 0); - conn->common.conn_id = qd_server_allocate_connection_id(tcplite_context->server); + conn->conn_id = qd_server_allocate_connection_id(tcplite_context->server); conn->context.context = conn; conn->context.handler = on_connection_event_LSIDE_IO; @@ -1517,7 +1510,7 @@ static void CORE_activate(void *context, qdr_connection_t *core_conn) switch (common->context_type) { case TL_LISTENER: - qd_timer_schedule(((tcplite_listener_t*) common)->activate_timer, 0); + assert(false); // listeners are never activated, relies on adaptor_listener callback break; case TL_CONNECTOR: @@ -1586,20 +1579,7 @@ static void CORE_flow(void *context, qdr_link_t *link, int credit) { tcplite_common_t *common = (tcplite_common_t*) qdr_link_get_context(link); - if (common->context_type == TL_LISTENER) { - tcplite_listener_t *li = (tcplite_listener_t*) common; - if (!li->adaptor_listener) { - // - // There is no adaptor listener. We need to allocate one. - // - li->adaptor_listener = qd_adaptor_listener(tcplite_context->qd, li->adaptor_config, LOG_TCP_ADAPTOR); - - // - // Start listening on the socket - // - qd_adaptor_listener_listen(li->adaptor_listener, on_accept, li); - } - } else if (common->context_type == TL_CONNECTION) { + if (common->context_type == TL_CONNECTION) { tcplite_connection_t *conn = (tcplite_connection_t*) common; if (qdr_link_direction(link) == QD_INCOMING && credit > 0) { conn->inbound_credit = true; @@ -1782,7 +1762,6 @@ tcplite_listener_t *qd_dispatch_configure_tcp_listener_lite(qd_dispatch_t *qd, q "Configured TcpListener (lite encap) for %s, %s:%s", li->adaptor_config->address, li->adaptor_config->host, li->adaptor_config->port); - li->activate_timer = qd_timer(tcplite_context->qd, on_core_activate_TIMER_IO, li); li->common.context_type = TL_LISTENER; sys_mutex_init(&li->lock); @@ -1803,13 +1782,9 @@ void qd_dispatch_delete_tcp_listener_lite(qd_dispatch_t *qd, tcplite_listener_t li->closing = true; if (!tcplite_context->adaptor_finalizing) { - if (!!li->common.core_conn) { - qdr_connection_closed(li->common.core_conn); - qd_connection_counter_dec(QD_PROTOCOL_TCP); - } - if (!!li->adaptor_listener) { qd_adaptor_listener_close(li->adaptor_listener); + li->adaptor_listener = 0; } } else { tcplite_connection_t *conn = DEQ_HEAD(li->connections); @@ -1901,7 +1876,7 @@ void qd_dispatch_delete_tcp_connector_lite(qd_dispatch_t *qd, tcplite_connector_ cr->closing = true; if (!tcplite_context->adaptor_finalizing) { - qdr_connection_closed(cr->common.core_conn); + qdr_connection_closed(cr->core_conn); qd_connection_counter_dec(QD_PROTOCOL_TCP); } else { tcplite_connection_t *conn = DEQ_HEAD(cr->connections); diff --git a/src/adaptors/tcp_lite/tcp_lite.h b/src/adaptors/tcp_lite/tcp_lite.h index 306ff7ffd..f69c125c8 100644 --- a/src/adaptors/tcp_lite/tcp_lite.h +++ b/src/adaptors/tcp_lite/tcp_lite.h @@ -51,15 +51,12 @@ struct tcplite_common_t { tcplite_context_type_t context_type; tcplite_common_t *parent; vflow_record_t *vflow; - qdr_connection_t *core_conn; - uint64_t conn_id; }; struct tcplite_listener_t { tcplite_common_t common; DEQ_LINKS(tcplite_listener_t); sys_mutex_t lock; - qd_timer_t *activate_timer; qd_adaptor_config_t *adaptor_config; qd_tls_domain_t *tls_domain; uint64_t link_id; @@ -81,6 +78,8 @@ typedef struct tcplite_connector_t { qd_timer_t *activate_timer; qd_adaptor_config_t *adaptor_config; qd_tls_domain_t *tls_domain; + qdr_connection_t *core_conn; // dispatcher conn and link + uint64_t conn_id; uint64_t link_id; qdr_link_t *out_link; tcplite_connection_list_t connections; @@ -122,6 +121,8 @@ typedef struct tcplite_connection_t { sys_atomic_t core_activation; sys_atomic_t raw_opened; qd_timer_t *close_timer; + qdr_connection_t *core_conn; + uint64_t conn_id; qdr_link_t *inbound_link; qd_message_t *inbound_stream; qdr_delivery_t *inbound_delivery; diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 2f90128f5..9b1583699 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -2560,9 +2560,10 @@ def _get_conn_counters(self) -> Mapping[str, int]: def _run_test(self, encaps, idle_ct, active_ct): # idle_ct: expected connection count when tcp configured, but prior to - # connections active + # connections active. This includes the count of special dispatcher + # connections. # activ_ct: expected connection count when 1 client and 1 server - # connected + # connected (also include dispatcher conns) mgmt = self.router.management # verify the counters start at zero (not including amqp) @@ -2602,12 +2603,9 @@ def _run_test(self, encaps, idle_ct, active_ct): # expect that simply configuring the tcp listener/connector will # instantiate the "dispatcher" connection: - errmsg = "Expected idle count failed!" - errmsg += "\nIf you fixed the phantom tcp-lite connection counter" - errmsg += " please update this test with the new counter values!" self.assertTrue(retry(lambda: self._get_conn_counters().get("tcp") == idle_ct), - errmsg) + f"Expected {idle_ct} got {self._get_conn_counters()}!") with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) @@ -2639,8 +2637,8 @@ def test_01_check_counter(self): """ Create and destroy TCP network connections, verify the connection counter is correct. """ - for encaps, idle_ct, active_ct in [('legacy', 1, 3), ('lite', 2, 4)]: - self._run_test(encaps, idle_ct, active_ct) + for encaps in ['legacy', 'lite']: + self._run_test(encaps, idle_ct=1, active_ct=3) class TcpAdaptorNoDelayedDelivery(TestCase):