diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index eb4fdb968..e0be3b1ed 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -763,7 +763,7 @@ static uint64_t produce_read_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess if (qd_buffer_size(buf) > 0) { DEQ_INSERT_TAIL(qd_buffers, buf); if (conn->listener_side && !!conn->observer_handle) { - qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(buf), qd_buffer_size(buf)); + qdpo_data(conn->observer_handle, true, qd_buffer_base(buf), qd_buffer_size(buf)); } } else { qd_buffer_free(buf); @@ -802,7 +802,7 @@ static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes qd_buffer_t *buf = DEQ_HEAD(buffers); for (size_t i = 0; i < actual; i++) { if (conn->listener_side && !!conn->observer_handle) { - qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(buf), qd_buffer_size(buf)); + qdpo_data(conn->observer_handle, false, qd_buffer_base(buf), qd_buffer_size(buf)); } raw_buffers[i].context = (uintptr_t) buf; raw_buffers[i].bytes = (char*) qd_buffer_base(buf); @@ -824,11 +824,9 @@ static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes // output message buffers and will free them as they are processed. Due to that we need to make a copy of these buffers // in order to avoid freeing buffers that are part of the message (double-free). // -static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, qd_buffer_list_t *buffers, size_t limit) +static void copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, qd_buffer_list_t *buffers, size_t limit) { size_t offset = 0; - uint64_t octets = 0; - const bool observe = (conn->listener_side && !!conn->observer_handle); assert(conn->tls); @@ -843,12 +841,8 @@ static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes qd_buffer_t *clone = qd_buffer(); clone->size = size; memcpy(qd_buffer_base(clone), qd_buffer_base(conn->outbound_body) + offset, size); - if (observe) { - qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(clone), qd_buffer_size(clone)); - } DEQ_INSERT_TAIL(*buffers, clone); } - octets += size; offset = 0; conn->outbound_body = DEQ_NEXT(conn->outbound_body); } @@ -857,8 +851,6 @@ static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes conn->outbound_body_complete = true; qd_message_release_raw_body(stream); } - - return octets; } static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream) @@ -890,7 +882,7 @@ static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess size_t size = qd_buffer_size(conn->outbound_body) - offset; if (observe) { - qdpo_data(conn->observer_handle, conn->listener_side, bytes, size); + qdpo_data(conn->observer_handle, false, bytes, size); } pn_raw_buffer_t raw_buffer; raw_buffer.context = 0; @@ -1438,9 +1430,8 @@ static bool manage_flow_XSIDE_IO(qd_tcp_connection_t *conn) // static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers, size_t limit) { - qd_tcp_connection_t *conn = (qd_tcp_connection_t *) context; - const bool observe = conn->listener_side && !!conn->observer_handle; - uint64_t octets = 0; + qd_tcp_connection_t *conn = (qd_tcp_connection_t *) context; + uint64_t octets = 0; assert(limit > 0); assert(DEQ_IS_EMPTY(*buffers)); @@ -1449,7 +1440,7 @@ static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers return octets; if (!conn->outbound_body_complete) { - octets = copy_message_body_TLS_XSIDE_IO(conn, conn->outbound_stream, buffers, limit); + copy_message_body_TLS_XSIDE_IO(conn, conn->outbound_stream, buffers, limit); assert(limit >= DEQ_SIZE(*buffers)); limit -= DEQ_SIZE(*buffers); } @@ -1458,19 +1449,21 @@ static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers qd_buffer_list_t tmp = DEQ_EMPTY; qd_message_consume_buffers(conn->outbound_stream, &tmp, limit); assert(limit >= DEQ_SIZE(tmp)); - limit -= DEQ_SIZE(tmp); - qd_buffer_t *buf = DEQ_HEAD(tmp); + DEQ_APPEND(*buffers, tmp); + } + + if (!DEQ_IS_EMPTY(*buffers)) { + const bool observe = conn->listener_side && !!conn->observer_handle; + + qd_buffer_t *buf = DEQ_HEAD(*buffers); while (buf) { octets += qd_buffer_size(buf); if (observe) { - qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(buf), qd_buffer_size(buf)); + qdpo_data(conn->observer_handle, false, qd_buffer_base(buf), qd_buffer_size(buf)); } buf = DEQ_NEXT(buf); } - DEQ_APPEND(*buffers, tmp); - } - if (octets) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] TLS consumed %"PRIu64" cleartext octets from stream", conn->conn_id, octets); @@ -1545,6 +1538,7 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn) const int tls_status = qd_tls_do_io2(conn->tls, conn->raw_conn, tls_consume_data_buffers, conn, (can_produce) ? &decrypted_buffers : 0, &decrypted_octets); + // // Process inbound cleartext data. // @@ -1553,6 +1547,15 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn) more_work = true; conn->inbound_octets += decrypted_octets; vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, conn->inbound_octets); + + if (conn->listener_side && !!conn->observer_handle) { + qd_buffer_t *buf = DEQ_HEAD(decrypted_buffers); + while (buf) { + qdpo_data(conn->observer_handle, true, qd_buffer_base(buf), qd_buffer_size(buf)); + buf = DEQ_NEXT(buf); + } + } + qd_message_produce_buffers(conn->inbound_stream, &decrypted_buffers); qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE TLS read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', decrypted_octets);