diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index 7a2747c32..dd65d7d88 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -445,7 +445,9 @@ static void free_connection_IO(void *context) tcplite_connection_t *conn = (tcplite_connection_t*) context; qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id); - // disable activation via Core thread + // Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently + // attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can + // take place. sys_mutex_lock(&conn->activation_lock); CLEAR_ATOMIC_FLAG(&conn->raw_opened); sys_mutex_unlock(&conn->activation_lock); @@ -1208,9 +1210,7 @@ static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qd // The raw connection establishment must be the last thing done in this function. // After this call, a separate IO thread may immediately be invoked in the context // of the new connection to handle raw connection events. - // ISSUE-1202 - Set the conn->raw_opened flag before calling pn_proactor_raw_connect() // - SET_ATOMIC_FLAG(&conn->raw_opened); pn_proactor_raw_connect(tcplite_context->proactor, conn->raw_conn, cr->adaptor_config->host_port); return QD_DELIVERY_MOVED_TO_NEW_LINK; @@ -1586,14 +1586,15 @@ static void connection_run_LSIDE_IO(tcplite_connection_t *conn) switch (conn->state) { case LSIDE_INITIAL: - // raw connection is active - if (conn->tls) { - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] LSIDE_IO performing TLS handshake", conn->conn_id); - set_state_XSIDE_IO(conn, LSIDE_TLS_HANDSHAKE); - repeat = true; - } else { - link_setup_LSIDE_IO(conn); - set_state_XSIDE_IO(conn, LSIDE_LINK_SETUP); + if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { // raw connection is active + if (conn->tls) { + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] LSIDE_IO performing TLS handshake", conn->conn_id); + set_state_XSIDE_IO(conn, LSIDE_TLS_HANDSHAKE); + repeat = true; + } else { + link_setup_LSIDE_IO(conn); + set_state_XSIDE_IO(conn, LSIDE_LINK_SETUP); + } } break; @@ -1685,9 +1686,10 @@ static void connection_run_CSIDE_IO(tcplite_connection_t *conn) switch (conn->state) { case CSIDE_INITIAL: - // raw connection is active - link_setup_CSIDE_IO(conn, conn->outbound_delivery); - set_state_XSIDE_IO(conn, CSIDE_LINK_SETUP); + if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { // raw connection is active + link_setup_CSIDE_IO(conn, conn->outbound_delivery); + set_state_XSIDE_IO(conn, CSIDE_LINK_SETUP); + } break; case CSIDE_LINK_SETUP: @@ -1835,10 +1837,15 @@ static char *get_tls_negotiated_alpn(qd_message_t *msg) static void on_connection_event_LSIDE_IO(pn_event_t *e, qd_server_t *qd_server, void *context) { SET_THREAD_RAW_IO; - tcplite_connection_t *conn = (tcplite_connection_t*) context; - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_LSIDE_IO: %s", conn->conn_id, pn_event_type_name(pn_event_type(e))); - - if (pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) { + pn_event_type_t etype = pn_event_type(e); + tcplite_connection_t *conn = (tcplite_connection_t*) context; + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_LSIDE_IO: %s", conn->conn_id, pn_event_type_name(etype)); + + if (etype == PN_RAW_CONNECTION_CONNECTED) { + // it is safe to call pn_raw_connection_wake() now + assert(!IS_ATOMIC_FLAG_SET(&conn->raw_opened)); + SET_ATOMIC_FLAG(&conn->raw_opened); + } else if (etype == PN_RAW_CONNECTION_DISCONNECTED) { conn->error = !!conn->raw_conn ? pn_raw_connection_condition(conn->raw_conn) : 0; if (!!conn->error) { @@ -1867,10 +1874,15 @@ static void on_connection_event_LSIDE_IO(pn_event_t *e, qd_server_t *qd_server, static void on_connection_event_CSIDE_IO(pn_event_t *e, qd_server_t *qd_server, void *context) { SET_THREAD_RAW_IO; - tcplite_connection_t *conn = (tcplite_connection_t*) context; - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_CSIDE_IO: %s", conn->conn_id, pn_event_type_name(pn_event_type(e))); - - if (pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) { + pn_event_type_t etype = pn_event_type(e); + tcplite_connection_t *conn = (tcplite_connection_t*) context; + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_CSIDE_IO: %s", conn->conn_id, pn_event_type_name(etype)); + + if (etype == PN_RAW_CONNECTION_CONNECTED) { + // it is safe to call pn_raw_connection_wake() now + assert(!IS_ATOMIC_FLAG_SET(&conn->raw_opened)); + SET_ATOMIC_FLAG(&conn->raw_opened); + } else if (etype == PN_RAW_CONNECTION_DISCONNECTED) { conn->error = !!conn->raw_conn ? pn_raw_connection_condition(conn->raw_conn) : 0; if (!!conn->error) { @@ -1931,7 +1943,7 @@ static void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listene sys_mutex_init(&conn->activation_lock); sys_atomic_init(&conn->core_activation, 0); - sys_atomic_init(&conn->raw_opened, 1); + sys_atomic_init(&conn->raw_opened, 0); conn->listener_side = true; conn->state = LSIDE_INITIAL;