From f7f332b36b9ccc750ceee5b4ae6112b48555a32f Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Fri, 11 Oct 2024 10:15:40 -0400 Subject: [PATCH] Fixes #1625: schedule output on core links after flow update Ensure core links are included in link credit processing. Remove dead code and extra attempt to queue link to the work list. --- .../edge_addr_tracking/edge_addr_tracking.c | 2 +- .../mesh_discovery/mesh_discovery_edge.c | 4 +- src/router_core/transfer.c | 61 ++++++------------- 3 files changed, 20 insertions(+), 47 deletions(-) diff --git a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c index 6b76ab4eb..775ca369d 100644 --- a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c +++ b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c @@ -227,7 +227,7 @@ static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoi qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr); qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg); - qdrc_endpoint_send_CT(core, endpoint, dlv, true); + qdrc_endpoint_send_CT(core, endpoint, dlv, false); } diff --git a/src/router_core/modules/mesh_discovery/mesh_discovery_edge.c b/src/router_core/modules/mesh_discovery/mesh_discovery_edge.c index 2432d61cd..6e1967171 100644 --- a/src/router_core/modules/mesh_discovery/mesh_discovery_edge.c +++ b/src/router_core/modules/mesh_discovery/mesh_discovery_edge.c @@ -95,7 +95,7 @@ static void send_mesh_id_to_interior(void) qd_compose_free(content); qdr_delivery_t *delivery = qdrc_endpoint_delivery_CT(state.core, state.interior_sender, msg); - qdrc_endpoint_send_CT(state.core, state.interior_sender, delivery, true); + qdrc_endpoint_send_CT(state.core, state.interior_sender, delivery, false); state.interior_sender_credit--; state.interior_needs_update = false; @@ -127,7 +127,7 @@ static void send_bid(mesh_peer_t *peer) qd_compose_free(content); qdr_delivery_t *delivery = qdrc_endpoint_delivery_CT(state.core, peer->sender, msg); - qdrc_endpoint_send_CT(state.core, peer->sender, delivery, true); + qdrc_endpoint_send_CT(state.core, peer->sender, delivery, false); peer->send_credit--; peer->needs_update = false; diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 44bd55553..9b9301f82 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -432,53 +432,26 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar link->drain_mode = drain; - // - // If the link was stalled due to internal backpressure from the transport, put it - // on the links-with-work list and activate the connection to resume sending. - // - if (link->stalled_outbound) { - link->stalled_outbound = false; - - sys_mutex_lock(&link->conn->work_lock); - - if (DEQ_SIZE(link->undelivered) > 0) { - qdr_add_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK); - activate = true; - } - - sys_mutex_unlock(&link->conn->work_lock); - } - if (link->core_endpoint) { qdrc_endpoint_do_flow_CT(core, link->core_endpoint, credit, drain); - } else if (link->connected_link) { + } + + if (link->attach_count == 1) // - // If this is an attach-routed link, propagate the flow data downrange. - // Note that the credit value is incremental. + // The link is half-open. Store the pending credit to be dealt with once the link is + // progressed to the next step. // - qdr_link_t *clink = link->connected_link; - - if (clink->link_direction == QD_INCOMING) - qdr_link_issue_credit_CT(core, link->connected_link, credit, drain); - else { - work = qdr_link_work(QDR_LINK_WORK_FLOW); - work->value = credit; - if (drain) - work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED; - qdr_link_enqueue_work_CT(core, clink, work); - } - } else { - if (link->attach_count == 1) - // - // The link is half-open. Store the pending credit to be dealt with once the link is - // progressed to the next step. - // - link->credit_stored += credit; + link->credit_stored += credit; + if (link->link_direction == QD_OUTGOING) { // - // Handle the replenishing of credit outbound + // Schedule the link for output if + // - credit has been granted or + // - drain mode entered or + // - the link was previously blocked waiting on credit or session capacity // - if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) { + if (credit > 0 || drain_was_set || link->stalled_outbound) { + link->stalled_outbound = false; if (drain_was_set) { work = qdr_link_work(QDR_LINK_WORK_FLOW); work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED; @@ -492,10 +465,10 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar activate = true; } sys_mutex_unlock(&link->conn->work_lock); - } else if (link->link_direction == QD_INCOMING) { - if (drain) { - link->credit_pending = link->capacity; - } + } + } else { // incoming link + if (drain) { + link->credit_pending = link->capacity; } }