Skip to content

Commit

Permalink
fixup: proper use of from_client parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed May 7, 2024
1 parent c19dd9a commit 9b3a094
Showing 1 changed file with 25 additions and 22 deletions.
47 changes: 25 additions & 22 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ static uint64_t produce_read_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess
if (qd_buffer_size(buf) > 0) {
DEQ_INSERT_TAIL(qd_buffers, buf);
if (conn->listener_side && !!conn->observer_handle) {
qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(buf), qd_buffer_size(buf));
qdpo_data(conn->observer_handle, true, qd_buffer_base(buf), qd_buffer_size(buf));
}
} else {
qd_buffer_free(buf);
Expand Down Expand Up @@ -802,7 +802,7 @@ static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
qd_buffer_t *buf = DEQ_HEAD(buffers);
for (size_t i = 0; i < actual; i++) {
if (conn->listener_side && !!conn->observer_handle) {
qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(buf), qd_buffer_size(buf));
qdpo_data(conn->observer_handle, false, qd_buffer_base(buf), qd_buffer_size(buf));
}
raw_buffers[i].context = (uintptr_t) buf;
raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
Expand All @@ -824,11 +824,9 @@ static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
// output message buffers and will free them as they are processed. Due to that we need to make a copy of these buffers
// in order to avoid freeing buffers that are part of the message (double-free).
//
static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, qd_buffer_list_t *buffers, size_t limit)
static void copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, qd_buffer_list_t *buffers, size_t limit)
{
size_t offset = 0;
uint64_t octets = 0;
const bool observe = (conn->listener_side && !!conn->observer_handle);

assert(conn->tls);

Expand All @@ -843,12 +841,8 @@ static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
qd_buffer_t *clone = qd_buffer();
clone->size = size;
memcpy(qd_buffer_base(clone), qd_buffer_base(conn->outbound_body) + offset, size);
if (observe) {
qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(clone), qd_buffer_size(clone));
}
DEQ_INSERT_TAIL(*buffers, clone);
}
octets += size;
offset = 0;
conn->outbound_body = DEQ_NEXT(conn->outbound_body);
}
Expand All @@ -857,8 +851,6 @@ static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
conn->outbound_body_complete = true;
qd_message_release_raw_body(stream);
}

return octets;
}

static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream)
Expand Down Expand Up @@ -890,7 +882,7 @@ static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess
size_t size = qd_buffer_size(conn->outbound_body) - offset;

if (observe) {
qdpo_data(conn->observer_handle, conn->listener_side, bytes, size);
qdpo_data(conn->observer_handle, false, bytes, size);
}
pn_raw_buffer_t raw_buffer;
raw_buffer.context = 0;
Expand Down Expand Up @@ -1438,9 +1430,8 @@ static bool manage_flow_XSIDE_IO(qd_tcp_connection_t *conn)
//
static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers, size_t limit)
{
qd_tcp_connection_t *conn = (qd_tcp_connection_t *) context;
const bool observe = conn->listener_side && !!conn->observer_handle;
uint64_t octets = 0;
qd_tcp_connection_t *conn = (qd_tcp_connection_t *) context;
uint64_t octets = 0;

assert(limit > 0);
assert(DEQ_IS_EMPTY(*buffers));
Expand All @@ -1449,7 +1440,7 @@ static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers
return octets;

if (!conn->outbound_body_complete) {
octets = copy_message_body_TLS_XSIDE_IO(conn, conn->outbound_stream, buffers, limit);
copy_message_body_TLS_XSIDE_IO(conn, conn->outbound_stream, buffers, limit);
assert(limit >= DEQ_SIZE(*buffers));
limit -= DEQ_SIZE(*buffers);
}
Expand All @@ -1458,19 +1449,21 @@ static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers
qd_buffer_list_t tmp = DEQ_EMPTY;
qd_message_consume_buffers(conn->outbound_stream, &tmp, limit);
assert(limit >= DEQ_SIZE(tmp));
limit -= DEQ_SIZE(tmp);
qd_buffer_t *buf = DEQ_HEAD(tmp);
DEQ_APPEND(*buffers, tmp);
}

if (!DEQ_IS_EMPTY(*buffers)) {
const bool observe = conn->listener_side && !!conn->observer_handle;

qd_buffer_t *buf = DEQ_HEAD(*buffers);
while (buf) {
octets += qd_buffer_size(buf);
if (observe) {
qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(buf), qd_buffer_size(buf));
qdpo_data(conn->observer_handle, false, qd_buffer_base(buf), qd_buffer_size(buf));
}
buf = DEQ_NEXT(buf);
}
DEQ_APPEND(*buffers, tmp);
}

if (octets) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
"[C%"PRIu64"] TLS consumed %"PRIu64" cleartext octets from stream", conn->conn_id, octets);

Expand Down Expand Up @@ -1545,6 +1538,7 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn)
const int tls_status = qd_tls_do_io2(conn->tls, conn->raw_conn, tls_consume_data_buffers, conn,
(can_produce) ? &decrypted_buffers : 0,
&decrypted_octets);

//
// Process inbound cleartext data.
//
Expand All @@ -1553,6 +1547,15 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn)
more_work = true;
conn->inbound_octets += decrypted_octets;
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, conn->inbound_octets);

if (conn->listener_side && !!conn->observer_handle) {
qd_buffer_t *buf = DEQ_HEAD(decrypted_buffers);
while (buf) {
qdpo_data(conn->observer_handle, true, qd_buffer_base(buf), qd_buffer_size(buf));
buf = DEQ_NEXT(buf);
}
}

qd_message_produce_buffers(conn->inbound_stream, &decrypted_buffers);

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE TLS read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', decrypted_octets);
Expand Down

0 comments on commit 9b3a094

Please sign in to comment.