From d59f4bbc57bf6f8e20b8d42e13e91ced1d95dc11 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Wed, 22 Nov 2023 09:35:20 -0500 Subject: [PATCH] WIP: debug --- src/adaptors/tcp_lite/tcp_lite.c | 93 ++++++++++++++++++++++++++++++-- src/adaptors/tcp_lite/tcp_lite.h | 9 +++- 2 files changed, 98 insertions(+), 4 deletions(-) diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index 56f378d64..d604b7da6 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -84,6 +84,19 @@ static uint64_t buffer_threshold_50; static uint64_t buffer_threshold_75; static uint64_t buffer_threshold_85; +//================================================================================= +// Window Flow Control +//================================================================================= +#define TCP_MAX_CAPACITY UINT64_C(1459620) +#define TCP_ACK_WINDOW (TCP_MAX_CAPACITY / 4) + +// is the incoming byte window full? +// +inline static bool window_full(const tcplite_connection_t *conn) +{ + return !conn->window.disabled && (conn->inbound_octets - conn->window.last_update) >= TCP_MAX_CAPACITY; +} + // // Forward References @@ -366,6 +379,9 @@ static void free_connection_IO(void *context) tcplite_connection_t *conn = (tcplite_connection_t*) context; qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->common.conn_id); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO, "[%" PRIu64 "] window closed count=%" PRIu64 " no-grant=%" PRIu64, + conn->common.conn_id, conn->window.closed_count, conn->window.no_grant); + if (!!conn->common.parent && conn->common.parent->context_type == TL_LISTENER) { tcplite_listener_t *li = (tcplite_listener_t*) conn->common.parent; sys_mutex_lock(&li->lock); @@ -508,6 +524,14 @@ static void grant_read_buffers_XSIDE_IO(tcplite_connection_t *conn, const size_t { ASSERT_RAW_IO; + // + // Cannot grant read buffers if the connection is currently blocked due to window flow control + // + if (window_full(conn)) { + conn->window.no_grant += 1; + return; + } + // // Define the allocation tiers. The tier values are the number of read buffers to be granted // to raw connections based on the percentage of usage of the router-wide buffer ceiling. @@ -1080,12 +1104,19 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) // // Produce available read buffers into the inbound stream // + bool was_blocked = window_full(conn); bool blocked; uint64_t octet_count = produce_read_buffers_XSIDE_IO(conn, conn->inbound_stream, &blocked); conn->inbound_octets += octet_count; if (octet_count > 0) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw read: Produced %"PRIu64" octets into stream", conn->common.conn_id, conn->listener_side ? 'L' : 'C', octet_count); + if (!was_blocked && window_full(conn)) { + conn->window.closed_count += 1; + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP RX window CLOSED: inbound_bytes=%" PRIu64 " unacked=%" PRIu64, + DLV_ARGS(conn->inbound_delivery), conn->inbound_octets, + (conn->inbound_octets - conn->window.last_update)); + } } // @@ -1140,6 +1171,7 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) if (!conn->outbound_body_complete) { uint64_t body_octets = consume_message_body_XSIDE_IO(conn, conn->outbound_stream); conn->outbound_octets += body_octets; + conn->window.pending_ack += body_octets; if (body_octets > 0) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw write: Consumed %"PRIu64" octets from stream (body-field)", conn->common.conn_id, conn->listener_side ? 'L' : 'C', body_octets); } @@ -1151,7 +1183,7 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) if (conn->outbound_body_complete) { uint64_t octets = consume_write_buffers_XSIDE_IO(conn, conn->outbound_stream); conn->outbound_octets += octets; - + conn->window.pending_ack += octets; if (octets > 0) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw write: Consumed %"PRIu64" octets from stream", conn->common.conn_id, conn->listener_side ? 'L' : 'C', octets); } @@ -1181,6 +1213,23 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery, PN_ACCEPTED, true, 0, true); // accepted, settled, ref_given conn->outbound_delivery = 0; conn->outbound_stream = 0; + } else { + // + // More to send. Check if enough octets have been written to open up the window + // + if (conn->window.pending_ack >= TCP_ACK_WINDOW) { + qd_delivery_state_t *dstate = qd_delivery_state(); + dstate->section_number = 0; + dstate->section_offset = conn->outbound_octets; + qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery, PN_RECEIVED, + false, dstate, false); // received, !settled, !ref_given + + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, + DLV_FMT " PN_RECEIVED sent with section_offset=%" PRIu64 " pending=%" PRIu64, + DLV_ARGS(conn->outbound_delivery), conn->outbound_octets, conn->window.pending_ack); + + conn->window.pending_ack = 0; + } } } @@ -1619,8 +1668,8 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di } else if (dlv == conn->inbound_delivery) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " Inbound delivery update - disposition: %s", DLV_ARGS(dlv), pn_disposition_type_name(disp)); conn->inbound_disposition = disp; - need_wake = true; - if (qd_delivery_state_is_terminal(disp) && disp != PN_ACCEPTED) { + const bool final_outcome = qd_delivery_state_is_terminal(disp); + if (final_outcome && disp != PN_ACCEPTED) { // The delivery failed - this is unrecoverable. if (!!conn->raw_conn) { // set the raw connection condition info so it will appear in the vanflow logs @@ -1633,7 +1682,45 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di pn_raw_connection_close(conn->raw_conn); // clean stuff up when DISCONNECT event arrives } + } else { + // + // handle flow control window updates + // + const bool window_was_full = window_full(conn); + if (disp == PN_RECEIVED) { + // + // The egress adaptor for TCP flow has sent us its count of sent bytes + // + uint64_t ignore; + qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore); + + // Resend released will generate a PN_RECEIVED with section_offset == 0, ignore it. Ensure updates + // arrive in order, which may be possible if cut-through for disposition updates is implemented. + if (dstate && dstate->section_offset > 0 + && (int64_t)(dstate->section_offset - conn->window.last_update) > 0) { + //vflow_set_uint64(tc->vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, tc->bytes_unacked); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, + DLV_FMT " PN_RECEIVED inbound_bytes=%" PRIu64 ", was_unacked=%" PRIu64 ", rcv_offset=%" PRIu64 " now_unacked=%" PRIu64, + DLV_ARGS(dlv), conn->inbound_octets, + (conn->inbound_octets - conn->window.last_update), + dstate->section_offset, + (conn->inbound_octets - dstate->section_offset)); + + conn->window.last_update = dstate->section_offset; + + qd_delivery_state_free(dstate); + } + } + conn->window.disabled = conn->window.disabled || settled || final_outcome; + if (window_was_full && !window_full(conn)) { + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, + DLV_FMT " TCP RX window %s: inbound_bytes=%" PRIu64 " unacked=%" PRIu64, + DLV_ARGS(dlv), + conn->window.disabled ? "DISABLED" : "OPENED", + conn->inbound_octets, (conn->inbound_octets - conn->window.last_update)); + } } + need_wake = !window_full(conn); } if (need_wake) { diff --git a/src/adaptors/tcp_lite/tcp_lite.h b/src/adaptors/tcp_lite/tcp_lite.h index 0f4868435..022d427c7 100644 --- a/src/adaptors/tcp_lite/tcp_lite.h +++ b/src/adaptors/tcp_lite/tcp_lite.h @@ -138,8 +138,15 @@ typedef struct tcplite_connection_t { pn_condition_t *error; char *reply_to; qd_handler_context_t context; - tcplite_connection_state_t state; qdpo_transport_handle_t *observer_handle; + tcplite_connection_state_t state; + struct { + uint64_t last_update; + uint64_t pending_ack; + uint64_t closed_count; + uint64_t no_grant; + bool disabled; + } window; bool listener_side; bool inbound_credit; bool inbound_first_octet;