Skip to content

Commit

Permalink
fixup: block core activation until after raw conn is active
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Mar 1, 2024
1 parent af70e0a commit 2f6fe8c
Showing 1 changed file with 35 additions and 23 deletions.
58 changes: 35 additions & 23 deletions src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@ static void free_connection_IO(void *context)
tcplite_connection_t *conn = (tcplite_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);

// disable activation via Core thread
// Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
// attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
// take place.
sys_mutex_lock(&conn->activation_lock);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
sys_mutex_unlock(&conn->activation_lock);
Expand Down Expand Up @@ -1208,9 +1210,7 @@ static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qd
// The raw connection establishment must be the last thing done in this function.
// After this call, a separate IO thread may immediately be invoked in the context
// of the new connection to handle raw connection events.
// ISSUE-1202 - Set the conn->raw_opened flag before calling pn_proactor_raw_connect()
//
SET_ATOMIC_FLAG(&conn->raw_opened);
pn_proactor_raw_connect(tcplite_context->proactor, conn->raw_conn, cr->adaptor_config->host_port);

return QD_DELIVERY_MOVED_TO_NEW_LINK;
Expand Down Expand Up @@ -1586,14 +1586,15 @@ static void connection_run_LSIDE_IO(tcplite_connection_t *conn)

switch (conn->state) {
case LSIDE_INITIAL:
// raw connection is active
if (conn->tls) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] LSIDE_IO performing TLS handshake", conn->conn_id);
set_state_XSIDE_IO(conn, LSIDE_TLS_HANDSHAKE);
repeat = true;
} else {
link_setup_LSIDE_IO(conn);
set_state_XSIDE_IO(conn, LSIDE_LINK_SETUP);
if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { // raw connection is active
if (conn->tls) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] LSIDE_IO performing TLS handshake", conn->conn_id);
set_state_XSIDE_IO(conn, LSIDE_TLS_HANDSHAKE);
repeat = true;
} else {
link_setup_LSIDE_IO(conn);
set_state_XSIDE_IO(conn, LSIDE_LINK_SETUP);
}
}
break;

Expand Down Expand Up @@ -1685,9 +1686,10 @@ static void connection_run_CSIDE_IO(tcplite_connection_t *conn)

switch (conn->state) {
case CSIDE_INITIAL:
// raw connection is active
link_setup_CSIDE_IO(conn, conn->outbound_delivery);
set_state_XSIDE_IO(conn, CSIDE_LINK_SETUP);
if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { // raw connection is active
link_setup_CSIDE_IO(conn, conn->outbound_delivery);
set_state_XSIDE_IO(conn, CSIDE_LINK_SETUP);
}
break;

case CSIDE_LINK_SETUP:
Expand Down Expand Up @@ -1835,10 +1837,15 @@ static char *get_tls_negotiated_alpn(qd_message_t *msg)
static void on_connection_event_LSIDE_IO(pn_event_t *e, qd_server_t *qd_server, void *context)
{
SET_THREAD_RAW_IO;
tcplite_connection_t *conn = (tcplite_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_LSIDE_IO: %s", conn->conn_id, pn_event_type_name(pn_event_type(e)));

if (pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) {
pn_event_type_t etype = pn_event_type(e);
tcplite_connection_t *conn = (tcplite_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_LSIDE_IO: %s", conn->conn_id, pn_event_type_name(etype));

if (etype == PN_RAW_CONNECTION_CONNECTED) {
// it is safe to call pn_raw_connection_wake() now
assert(!IS_ATOMIC_FLAG_SET(&conn->raw_opened));
SET_ATOMIC_FLAG(&conn->raw_opened);
} else if (etype == PN_RAW_CONNECTION_DISCONNECTED) {
conn->error = !!conn->raw_conn ? pn_raw_connection_condition(conn->raw_conn) : 0;

if (!!conn->error) {
Expand Down Expand Up @@ -1867,10 +1874,15 @@ static void on_connection_event_LSIDE_IO(pn_event_t *e, qd_server_t *qd_server,
static void on_connection_event_CSIDE_IO(pn_event_t *e, qd_server_t *qd_server, void *context)
{
SET_THREAD_RAW_IO;
tcplite_connection_t *conn = (tcplite_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_CSIDE_IO: %s", conn->conn_id, pn_event_type_name(pn_event_type(e)));

if (pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) {
pn_event_type_t etype = pn_event_type(e);
tcplite_connection_t *conn = (tcplite_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_connection_event_CSIDE_IO: %s", conn->conn_id, pn_event_type_name(etype));

if (etype == PN_RAW_CONNECTION_CONNECTED) {
// it is safe to call pn_raw_connection_wake() now
assert(!IS_ATOMIC_FLAG_SET(&conn->raw_opened));
SET_ATOMIC_FLAG(&conn->raw_opened);
} else if (etype == PN_RAW_CONNECTION_DISCONNECTED) {
conn->error = !!conn->raw_conn ? pn_raw_connection_condition(conn->raw_conn) : 0;

if (!!conn->error) {
Expand Down Expand Up @@ -1931,7 +1943,7 @@ static void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listene

sys_mutex_init(&conn->activation_lock);
sys_atomic_init(&conn->core_activation, 0);
sys_atomic_init(&conn->raw_opened, 1);
sys_atomic_init(&conn->raw_opened, 0);

conn->listener_side = true;
conn->state = LSIDE_INITIAL;
Expand Down

0 comments on commit 2f6fe8c

Please sign in to comment.