Skip to content

Commit

Permalink
Fixes skupperproject#1625: schedule output on core links after flow u…
Browse files Browse the repository at this point in the history
…pdate

Ensure core links are included in link credit processing. Remove dead
code and extra attempt to queue link to the work list.
  • Loading branch information
kgiusti committed Oct 11, 2024
1 parent c8573a1 commit f7f332b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoi
qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr);
qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg);

qdrc_endpoint_send_CT(core, endpoint, dlv, true);
qdrc_endpoint_send_CT(core, endpoint, dlv, false);
}


Expand Down
4 changes: 2 additions & 2 deletions src/router_core/modules/mesh_discovery/mesh_discovery_edge.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ static void send_mesh_id_to_interior(void)
qd_compose_free(content);

qdr_delivery_t *delivery = qdrc_endpoint_delivery_CT(state.core, state.interior_sender, msg);
qdrc_endpoint_send_CT(state.core, state.interior_sender, delivery, true);
qdrc_endpoint_send_CT(state.core, state.interior_sender, delivery, false);

state.interior_sender_credit--;
state.interior_needs_update = false;
Expand Down Expand Up @@ -127,7 +127,7 @@ static void send_bid(mesh_peer_t *peer)
qd_compose_free(content);

qdr_delivery_t *delivery = qdrc_endpoint_delivery_CT(state.core, peer->sender, msg);
qdrc_endpoint_send_CT(state.core, peer->sender, delivery, true);
qdrc_endpoint_send_CT(state.core, peer->sender, delivery, false);

peer->send_credit--;
peer->needs_update = false;
Expand Down
61 changes: 17 additions & 44 deletions src/router_core/transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -432,53 +432,26 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar

link->drain_mode = drain;

//
// If the link was stalled due to internal backpressure from the transport, put it
// on the links-with-work list and activate the connection to resume sending.
//
if (link->stalled_outbound) {
link->stalled_outbound = false;

sys_mutex_lock(&link->conn->work_lock);

if (DEQ_SIZE(link->undelivered) > 0) {
qdr_add_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
activate = true;
}

sys_mutex_unlock(&link->conn->work_lock);
}

if (link->core_endpoint) {
qdrc_endpoint_do_flow_CT(core, link->core_endpoint, credit, drain);
} else if (link->connected_link) {
}

if (link->attach_count == 1)
//
// If this is an attach-routed link, propagate the flow data downrange.
// Note that the credit value is incremental.
// The link is half-open. Store the pending credit to be dealt with once the link is
// progressed to the next step.
//
qdr_link_t *clink = link->connected_link;

if (clink->link_direction == QD_INCOMING)
qdr_link_issue_credit_CT(core, link->connected_link, credit, drain);
else {
work = qdr_link_work(QDR_LINK_WORK_FLOW);
work->value = credit;
if (drain)
work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
qdr_link_enqueue_work_CT(core, clink, work);
}
} else {
if (link->attach_count == 1)
//
// The link is half-open. Store the pending credit to be dealt with once the link is
// progressed to the next step.
//
link->credit_stored += credit;
link->credit_stored += credit;

if (link->link_direction == QD_OUTGOING) {
//
// Handle the replenishing of credit outbound
// Schedule the link for output if
// - credit has been granted or
// - drain mode entered or
// - the link was previously blocked waiting on credit or session capacity
//
if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
if (credit > 0 || drain_was_set || link->stalled_outbound) {
link->stalled_outbound = false;
if (drain_was_set) {
work = qdr_link_work(QDR_LINK_WORK_FLOW);
work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
Expand All @@ -492,10 +465,10 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
activate = true;
}
sys_mutex_unlock(&link->conn->work_lock);
} else if (link->link_direction == QD_INCOMING) {
if (drain) {
link->credit_pending = link->capacity;
}
}
} else { // incoming link
if (drain) {
link->credit_pending = link->capacity;
}
}

Expand Down

0 comments on commit f7f332b

Please sign in to comment.