Skip to content

Commit

Permalink
WIP: debug
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Nov 29, 2023
1 parent 49c7e84 commit d59f4bb
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 4 deletions.
93 changes: 90 additions & 3 deletions src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ static uint64_t buffer_threshold_50;
static uint64_t buffer_threshold_75;
static uint64_t buffer_threshold_85;

//=================================================================================
// Window Flow Control
//=================================================================================
#define TCP_MAX_CAPACITY UINT64_C(1459620)
#define TCP_ACK_WINDOW (TCP_MAX_CAPACITY / 4)

// is the incoming byte window full?
//
inline static bool window_full(const tcplite_connection_t *conn)
{
return !conn->window.disabled && (conn->inbound_octets - conn->window.last_update) >= TCP_MAX_CAPACITY;
}


//
// Forward References
Expand Down Expand Up @@ -366,6 +379,9 @@ static void free_connection_IO(void *context)
tcplite_connection_t *conn = (tcplite_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->common.conn_id);

qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO, "[%" PRIu64 "] window closed count=%" PRIu64 " no-grant=%" PRIu64,
conn->common.conn_id, conn->window.closed_count, conn->window.no_grant);

if (!!conn->common.parent && conn->common.parent->context_type == TL_LISTENER) {
tcplite_listener_t *li = (tcplite_listener_t*) conn->common.parent;
sys_mutex_lock(&li->lock);
Expand Down Expand Up @@ -508,6 +524,14 @@ static void grant_read_buffers_XSIDE_IO(tcplite_connection_t *conn, const size_t
{
ASSERT_RAW_IO;

//
// Cannot grant read buffers if the connection is currently blocked due to window flow control
//
if (window_full(conn)) {
conn->window.no_grant += 1;
return;
}

//
// Define the allocation tiers. The tier values are the number of read buffers to be granted
// to raw connections based on the percentage of usage of the router-wide buffer ceiling.
Expand Down Expand Up @@ -1080,12 +1104,19 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
//
// Produce available read buffers into the inbound stream
//
bool was_blocked = window_full(conn);
bool blocked;
uint64_t octet_count = produce_read_buffers_XSIDE_IO(conn, conn->inbound_stream, &blocked);
conn->inbound_octets += octet_count;

if (octet_count > 0) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw read: Produced %"PRIu64" octets into stream", conn->common.conn_id, conn->listener_side ? 'L' : 'C', octet_count);
if (!was_blocked && window_full(conn)) {
conn->window.closed_count += 1;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP RX window CLOSED: inbound_bytes=%" PRIu64 " unacked=%" PRIu64,
DLV_ARGS(conn->inbound_delivery), conn->inbound_octets,
(conn->inbound_octets - conn->window.last_update));
}
}

//
Expand Down Expand Up @@ -1140,6 +1171,7 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
if (!conn->outbound_body_complete) {
uint64_t body_octets = consume_message_body_XSIDE_IO(conn, conn->outbound_stream);
conn->outbound_octets += body_octets;
conn->window.pending_ack += body_octets;
if (body_octets > 0) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw write: Consumed %"PRIu64" octets from stream (body-field)", conn->common.conn_id, conn->listener_side ? 'L' : 'C', body_octets);
}
Expand All @@ -1151,7 +1183,7 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
if (conn->outbound_body_complete) {
uint64_t octets = consume_write_buffers_XSIDE_IO(conn, conn->outbound_stream);
conn->outbound_octets += octets;

conn->window.pending_ack += octets;
if (octets > 0) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw write: Consumed %"PRIu64" octets from stream", conn->common.conn_id, conn->listener_side ? 'L' : 'C', octets);
}
Expand Down Expand Up @@ -1181,6 +1213,23 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery, PN_ACCEPTED, true, 0, true); // accepted, settled, ref_given
conn->outbound_delivery = 0;
conn->outbound_stream = 0;
} else {
//
// More to send. Check if enough octets have been written to open up the window
//
if (conn->window.pending_ack >= TCP_ACK_WINDOW) {
qd_delivery_state_t *dstate = qd_delivery_state();
dstate->section_number = 0;
dstate->section_offset = conn->outbound_octets;
qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery, PN_RECEIVED,
false, dstate, false); // received, !settled, !ref_given

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " PN_RECEIVED sent with section_offset=%" PRIu64 " pending=%" PRIu64,
DLV_ARGS(conn->outbound_delivery), conn->outbound_octets, conn->window.pending_ack);

conn->window.pending_ack = 0;
}
}
}

Expand Down Expand Up @@ -1619,8 +1668,8 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
} else if (dlv == conn->inbound_delivery) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " Inbound delivery update - disposition: %s", DLV_ARGS(dlv), pn_disposition_type_name(disp));
conn->inbound_disposition = disp;
need_wake = true;
if (qd_delivery_state_is_terminal(disp) && disp != PN_ACCEPTED) {
const bool final_outcome = qd_delivery_state_is_terminal(disp);
if (final_outcome && disp != PN_ACCEPTED) {
// The delivery failed - this is unrecoverable.
if (!!conn->raw_conn) {
// set the raw connection condition info so it will appear in the vanflow logs
Expand All @@ -1633,7 +1682,45 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
pn_raw_connection_close(conn->raw_conn);
// clean stuff up when DISCONNECT event arrives
}
} else {
//
// handle flow control window updates
//
const bool window_was_full = window_full(conn);
if (disp == PN_RECEIVED) {
//
// The egress adaptor for TCP flow has sent us its count of sent bytes
//
uint64_t ignore;
qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);

// Resend released will generate a PN_RECEIVED with section_offset == 0, ignore it. Ensure updates
// arrive in order, which may be possible if cut-through for disposition updates is implemented.
if (dstate && dstate->section_offset > 0
&& (int64_t)(dstate->section_offset - conn->window.last_update) > 0) {
//vflow_set_uint64(tc->vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, tc->bytes_unacked);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " PN_RECEIVED inbound_bytes=%" PRIu64 ", was_unacked=%" PRIu64 ", rcv_offset=%" PRIu64 " now_unacked=%" PRIu64,
DLV_ARGS(dlv), conn->inbound_octets,
(conn->inbound_octets - conn->window.last_update),
dstate->section_offset,
(conn->inbound_octets - dstate->section_offset));

conn->window.last_update = dstate->section_offset;

qd_delivery_state_free(dstate);
}
}
conn->window.disabled = conn->window.disabled || settled || final_outcome;
if (window_was_full && !window_full(conn)) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " TCP RX window %s: inbound_bytes=%" PRIu64 " unacked=%" PRIu64,
DLV_ARGS(dlv),
conn->window.disabled ? "DISABLED" : "OPENED",
conn->inbound_octets, (conn->inbound_octets - conn->window.last_update));
}
}
need_wake = !window_full(conn);
}

if (need_wake) {
Expand Down
9 changes: 8 additions & 1 deletion src/adaptors/tcp_lite/tcp_lite.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,15 @@ typedef struct tcplite_connection_t {
pn_condition_t *error;
char *reply_to;
qd_handler_context_t context;
tcplite_connection_state_t state;
qdpo_transport_handle_t *observer_handle;
tcplite_connection_state_t state;
struct {
uint64_t last_update;
uint64_t pending_ack;
uint64_t closed_count;
uint64_t no_grant;
bool disabled;
} window;
bool listener_side;
bool inbound_credit;
bool inbound_first_octet;
Expand Down

0 comments on commit d59f4bb

Please sign in to comment.