Skip to content

Commit

Permalink
Fixes skupperproject#1432: prevent use-after-free race in cutthrough …
Browse files Browse the repository at this point in the history
…activation.

The patch adds locking to the cutthrough activation logic that
prevents the activation handler from being released while another
thread attempts using the activation.

Closes skupperproject#1432
  • Loading branch information
kgiusti committed Mar 21, 2024
1 parent 12e7e2b commit 0ac5158
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 174 deletions.
4 changes: 2 additions & 2 deletions include/qpid/dispatch/cutthrough_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* @param msg - Pointer to a stream
*/
void cutthrough_notify_buffers_produced_inbound(qd_message_t *msg);
void cutthrough_notify_buffers_consumed_outbound(qd_message_t *msg);
void cutthrough_notify_buffers_produced_inbound(const qd_message_activation_t *activation);
void cutthrough_notify_buffers_consumed_outbound(const qd_message_activation_t *activation);

#endif
28 changes: 6 additions & 22 deletions include/qpid/dispatch/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -754,20 +754,6 @@ void qd_message_produce_buffers(qd_message_t *stream, qd_buffer_list_t *buffers)
int qd_message_consume_buffers(qd_message_t *stream, qd_buffer_list_t *buffers, int limit);


/**
* Indicate whether this stream should be resumed from a stalled state. This will be the case
* if (a) the stream was stalled due to being full, and (b) the payload has shrunk down below
* the resume threshold.
*
* If the result is true, there is a side effect of clearing the 'stalled' state.
*
* @param stream Pointer to the message
* @return true Yes, the stream was stalled and buffer production may continue
* @return false No, the stream was not stalled or it was stalled and is not yet ready to resume
*/
bool qd_message_resume_from_stalled(qd_message_t *stream);


typedef enum {
QD_ACTIVATION_NONE = 0,
QD_ACTIVATION_AMQP,
Expand All @@ -784,33 +770,31 @@ typedef struct {
* Tell the message stream which connection is consuming its buffers.
*
* @param stream Pointer to the message
* @param connection Pointer to the qd_connection that is consuming this stream's buffers
* @param activation Parameters for activating the consuming I/O thread
*/
void qd_message_set_consumer_activation(qd_message_t *stream, qd_message_activation_t *activation);

/**
* Return the connection that is consuming this message stream's buffers.
* Cancel the activation. No further activations will be occur on return from this call.
*
* @param stream Pointer to the message
* @return qd_connection_t* Pointer to the connection that is consuming buffers from this stream
*/
void qd_message_get_consumer_activation(const qd_message_t *stream, qd_message_activation_t *activation);
void qd_message_cancel_consumer_activation(qd_message_t *stream);

/**
* Tell the message stream which connection is producing its buffers.
*
* @param stream Pointer to the message
* @param connection Pointer to the qd_connection that is consuming this stream's buffers
* @param activation Parameters for activating the producing I/O thread
*/
void qd_message_set_producer_activation(qd_message_t *stream, qd_message_activation_t *activation);

/**
* Return the connection that is producing this message stream's buffers.
* Cancel the activation. No further activations will occur on return from this call.
*
* @param stream Pointer to the message
* @return qd_connection_t* Pointer to the connection that is consuming buffers from this stream
*/
void qd_message_get_producer_activation(const qd_message_t *stream, qd_message_activation_t *activation);
void qd_message_cancel_producer_activation(qd_message_t *stream);

///@}

Expand Down
71 changes: 35 additions & 36 deletions src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -539,17 +539,14 @@ static void close_connection_XSIDE_IO(tcplite_connection_t *conn, bool no_delay)

free(conn->reply_to);

qd_message_activation_t activation;
activation.type = QD_ACTIVATION_NONE;
activation.delivery = 0;
qd_nullify_safe_ptr(&activation.safeptr);

if (!!conn->inbound_stream) {
qd_message_set_producer_activation(conn->inbound_stream, &activation);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel producer activation", DLV_ARGS(conn->inbound_delivery));
qd_message_cancel_producer_activation(conn->inbound_stream);
}

if (!!conn->outbound_stream) {
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel consumer activation", DLV_ARGS(conn->outbound_delivery));
qd_message_cancel_consumer_activation(conn->outbound_stream);
}

if (!!conn->inbound_delivery) {
Expand Down Expand Up @@ -917,6 +914,17 @@ static void link_setup_CSIDE_IO(tcplite_connection_t *conn, qdr_delivery_t *deli
qdr_link_set_context(conn->inbound_link, conn);
conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, qdr_terminus(0), qdr_terminus(0), "tcp.cside.out", 0, false, delivery, &conn->outbound_link_id);
qdr_link_set_context(conn->outbound_link, conn);

// now that the raw connection is up and able to be activated enable cutthrough activation

assert(conn->outbound_stream);
qd_message_activation_t activation;
activation.type = QD_ACTIVATION_TCP;
activation.delivery = 0;
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP enabling consumer activation", DLV_ARGS(delivery));
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
qd_message_start_unicast_cutthrough(conn->outbound_stream);
}


Expand Down Expand Up @@ -983,6 +991,7 @@ static bool try_compose_and_send_client_stream_LSIDE_IO(tcplite_connection_t *co
activation.type = QD_ACTIVATION_TCP;
activation.delivery = 0;
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "][L%" PRIu64 "] TCP enabling producer activation", conn->conn_id, conn->inbound_link_id);
qd_message_set_producer_activation(conn->inbound_stream, &activation);
qd_message_start_unicast_cutthrough(conn->inbound_stream);

Expand Down Expand Up @@ -1049,6 +1058,7 @@ static void compose_and_send_server_stream_CSIDE_IO(tcplite_connection_t *conn)
activation.type = QD_ACTIVATION_TCP;
activation.delivery = 0;
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "][L%" PRIu64 "] TCP enabling producer activation", conn->conn_id, conn->inbound_link_id);
qd_message_set_producer_activation(conn->inbound_stream, &activation);
qd_message_start_unicast_cutthrough(conn->inbound_stream);

Expand Down Expand Up @@ -1119,6 +1129,7 @@ static uint64_t handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qd
activation.type = QD_ACTIVATION_TCP;
activation.delivery = 0;
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP enabling consumer activation", DLV_ARGS(delivery));
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
qd_message_start_unicast_cutthrough(conn->outbound_stream);
}
Expand Down Expand Up @@ -1184,16 +1195,6 @@ static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qd
conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);

//
// Message validation ensures the start of the message body is present. Activate cutthrough on the body data.
//
qd_message_activation_t activation;
activation.type = QD_ACTIVATION_TCP;
activation.delivery = 0;
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
qd_message_start_unicast_cutthrough(conn->outbound_stream);

//
// The raw connection establishment must be the last thing done in this function.
// After this call, a separate IO thread may immediately be invoked in the context
Expand Down Expand Up @@ -1274,12 +1275,16 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
// If the raw connection is read-closed and the last produce did not block, settle and complete
// the inbound stream/delivery and close out the inbound half of the connection.
//

if (read_closed) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " Raw conn read-closed - close inbound delivery", DLV_ARGS(conn->inbound_delivery));
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " Raw conn read-closed - close inbound delivery, cancel producer activation",
DLV_ARGS(conn->inbound_delivery));
qd_message_set_receive_complete(conn->inbound_stream);
qd_message_cancel_producer_activation(conn->inbound_stream);
qdr_delivery_continue(tcplite_context->core, conn->inbound_delivery, false);
qdr_delivery_set_context(conn->inbound_delivery, 0);
qdr_delivery_decref(tcplite_context->core, conn->inbound_delivery, "TCP_LSIDE_IO - read-close");
qdr_delivery_decref(tcplite_context->core, conn->inbound_delivery, "FLOW_XSIDE_IO - inbound_delivery released");
conn->inbound_delivery = 0;
conn->inbound_stream = 0;
return true;
Expand Down Expand Up @@ -1346,10 +1351,14 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
// payload has been consumed and written before write-closing the connection.
//
if (qd_message_receive_complete(conn->outbound_stream) && !qd_message_can_consume_buffers(conn->outbound_stream)) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Rx-complete, rings empty: Write-closing the raw connection", conn->conn_id);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " Rx-complete, rings empty: Write-closing the raw connection, consumer activation cancelled",
DLV_ARGS(conn->outbound_delivery));
pn_raw_connection_write_close(conn->raw_conn);
qd_message_set_send_complete(conn->outbound_stream);
qd_message_cancel_consumer_activation(conn->outbound_stream);
qdr_delivery_set_context(conn->outbound_delivery, 0);
qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery, PN_ACCEPTED, true, 0, true); // accepted, settled, ref_given
// do NOT decref outbound_delivery - ref count passed to qdr_delivery_remote_state_updated()!
conn->outbound_delivery = 0;
conn->outbound_stream = 0;
} else {
Expand Down Expand Up @@ -1520,15 +1529,10 @@ static bool manage_tls_flow_XSIDE_IO(tcplite_connection_t *conn)
//
bool ignore;
if (qd_tls_is_input_drained(conn->tls, &ignore) && conn->inbound_stream) {
qd_message_activation_t activation;
activation.type = QD_ACTIVATION_NONE;
activation.delivery = 0;
qd_nullify_safe_ptr(&activation.safeptr);

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TLS inbound stream receive complete", DLV_ARGS(conn->inbound_delivery));
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TLS inbound stream receive complete, producer activation cancelled", DLV_ARGS(conn->inbound_delivery));

qd_message_set_receive_complete(conn->inbound_stream);
qd_message_set_producer_activation(conn->inbound_stream, &activation);
qd_message_cancel_producer_activation(conn->inbound_stream);

qdr_delivery_set_context(conn->inbound_delivery, 0);
qdr_delivery_continue(tcplite_context->core, conn->inbound_delivery, false);
Expand All @@ -1541,21 +1545,16 @@ static bool manage_tls_flow_XSIDE_IO(tcplite_connection_t *conn)
// Check for end of outbound message data
//
if (qd_tls_is_output_flushed(conn->tls) && conn->outbound_stream) {
qd_message_activation_t activation;
activation.type = QD_ACTIVATION_NONE;
activation.delivery = 0;
qd_nullify_safe_ptr(&activation.safeptr);

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TLS outbound stream send complete", DLV_ARGS(conn->outbound_delivery));
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TLS outbound stream send complete, consumer activation cancelled", DLV_ARGS(conn->outbound_delivery));

qd_message_set_send_complete(conn->outbound_stream);
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
qd_message_cancel_consumer_activation(conn->outbound_stream);

qdr_delivery_set_context(conn->outbound_delivery, 0);
qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery,
tls_status < 0 ? PN_MODIFIED : PN_ACCEPTED,
true, 0, false); // settled, 0, ref_given
qdr_delivery_decref(tcplite_context->core, conn->outbound_delivery, "TLS_FLOW_XSIDE_IO - outbound_delivery released");
true, 0, true); // settled, 0, ref_given
// do NOT decref outbound_delivery - ref count passed to qdr_delivery_remote_state_updated()!
conn->outbound_delivery = 0;
conn->outbound_stream = 0;
}
Expand Down
17 changes: 5 additions & 12 deletions src/cutthrough_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "adaptors/tcp_lite/tcp_lite.h"


static void activate_connection(qd_message_activation_t *activation, qd_direction_t dir)
static void activate_connection(const qd_message_activation_t *activation, qd_direction_t dir)
{
switch (activation->type) {
case QD_ACTIVATION_NONE:
Expand Down Expand Up @@ -77,20 +77,13 @@ static void activate_connection(qd_message_activation_t *activation, qd_directio
}


void cutthrough_notify_buffers_produced_inbound(qd_message_t *msg)
void cutthrough_notify_buffers_produced_inbound(const qd_message_activation_t *activation)
{
qd_message_activation_t activation;
qd_message_get_consumer_activation(msg, &activation);
activate_connection(&activation, QD_OUTGOING);
activate_connection(activation, QD_OUTGOING);
}


void cutthrough_notify_buffers_consumed_outbound(qd_message_t *msg)
void cutthrough_notify_buffers_consumed_outbound(const qd_message_activation_t *activation)
{
bool unstall = qd_message_resume_from_stalled(msg);
if (unstall) {
qd_message_activation_t activation;
qd_message_get_producer_activation(msg, &activation);
activate_connection(&activation, QD_INCOMING);
}
activate_connection(activation, QD_INCOMING);
}
Loading

0 comments on commit 0ac5158

Please sign in to comment.