Skip to content

Commit

Permalink
ISSUE-1480: add tcp window vanflow events (skupperproject#1488)
Browse files Browse the repository at this point in the history
Note this does not resolve ISSUE skupperproject#1480, it just fixes one aspect of
it.
  • Loading branch information
kgiusti authored Apr 29, 2024
1 parent bdfb163 commit c75d4d3
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,7 @@ static uint64_t handle_first_outbound_delivery_CSIDE(qd_tcp_connector_t *connect

conn->common.vflow = vflow_start_record(VFLOW_RECORD_FLOW, connector->common.vflow);
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, 0);
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_WINDOW_SIZE, TCP_MAX_CAPACITY_BYTES);

extract_metadata_from_stream_CSIDE(conn);

Expand Down Expand Up @@ -1246,10 +1247,12 @@ static bool manage_flow_XSIDE_IO(qd_tcp_connection_t *conn)
if (octet_count > 0) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', octet_count);
if (!was_blocked && window_full(conn) && !read_closed) {
uint64_t unacked = conn->inbound_octets - conn->window.last_update;
conn->window.closed_count += 1;
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, unacked);
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_WINDOW_CLOSURES, conn->window.closed_count);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP RX window CLOSED: inbound_bytes=%" PRIu64 " unacked=%" PRIu64,
DLV_ARGS(conn->inbound_delivery), conn->inbound_octets,
(conn->inbound_octets - conn->window.last_update));
DLV_ARGS(conn->inbound_delivery), conn->inbound_octets, unacked);
}
}

Expand Down Expand Up @@ -1499,7 +1502,10 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn)

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE TLS read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', decrypted_octets);
if (!window_blocked && window_full(conn)) {
uint64_t unacked = conn->inbound_octets - conn->window.last_update;
conn->window.closed_count += 1;
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, unacked);
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_WINDOW_CLOSURES, conn->window.closed_count);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP RX window CLOSED: inbound_bytes=%" PRIu64 " unacked=%" PRIu64,
DLV_ARGS(conn->inbound_delivery), conn->inbound_octets,
(conn->inbound_octets - conn->window.last_update));
Expand Down Expand Up @@ -1966,6 +1972,7 @@ static void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listene

conn->common.vflow = vflow_start_record(VFLOW_RECORD_FLOW, li->common.vflow);
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, 0);
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_WINDOW_SIZE, TCP_MAX_CAPACITY_BYTES);

conn->context.context = conn;
conn->context.handler = on_connection_event_LSIDE_IO;
Expand Down Expand Up @@ -2161,17 +2168,19 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);

// Resend released will generate a PN_RECEIVED with section_offset == 0, ignore it. Ensure updates
// arrive in order, which may be possible if cut-through for disposition updates is implemented.
// arrive in order, which may not happen if cut-through for disposition updates is implemented.
if (dstate && dstate->section_offset > 0
&& (int64_t)(dstate->section_offset - conn->window.last_update) > 0) {
//vflow_set_uint64(tc->vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, tc->bytes_unacked);

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " PN_RECEIVED inbound_bytes=%" PRIu64 ", was_unacked=%" PRIu64 ", rcv_offset=%" PRIu64 " now_unacked=%" PRIu64,
DLV_ARGS(dlv), conn->inbound_octets,
(conn->inbound_octets - conn->window.last_update),
dstate->section_offset,
(conn->inbound_octets - dstate->section_offset));
conn->window.last_update = dstate->section_offset;
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, conn->inbound_octets - dstate->section_offset);

qd_delivery_state_free(dstate);
}
}
Expand Down

0 comments on commit c75d4d3

Please sign in to comment.