Skip to content

Commit

Permalink
Phase2
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Oct 11, 2024
1 parent d862de1 commit bda043b
Showing 1 changed file with 22 additions and 33 deletions.
55 changes: 22 additions & 33 deletions src/router_core/transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -432,23 +432,6 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar

link->drain_mode = drain;

//
// If the link was stalled due to internal backpressure from the transport, put it
// on the links-with-work list and activate the connection to resume sending.
//
if (link->stalled_outbound) {
link->stalled_outbound = false;

sys_mutex_lock(&link->conn->work_lock);

if (DEQ_SIZE(link->undelivered) > 0) {
qdr_add_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
activate = true;
}

sys_mutex_unlock(&link->conn->work_lock);
}

if (link->core_endpoint) {
qdrc_endpoint_do_flow_CT(core, link->core_endpoint, credit, drain);
}
Expand All @@ -460,24 +443,30 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
//
link->credit_stored += credit;

//
// Handle the replenishing of credit outbound
//
if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
if (drain_was_set) {
work = qdr_link_work(QDR_LINK_WORK_FLOW);
work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
}
if (link->link_direction == QD_OUTGOING) {
//
// Schedule the link for output if
// - credit has been granted or
// - drain mode entered or
// - the link was previously blocked waiting on credit or session capacity
//
if (credit > 0 || drain_was_set || link->stalled_outbound) {
link->stalled_outbound = false;
if (drain_was_set) {
work = qdr_link_work(QDR_LINK_WORK_FLOW);
work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
}

sys_mutex_lock(&link->conn->work_lock);
if (work)
DEQ_INSERT_TAIL(link->work_list, work);
if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
qdr_add_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
activate = true;
sys_mutex_lock(&link->conn->work_lock);
if (work)
DEQ_INSERT_TAIL(link->work_list, work);
if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
qdr_add_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
activate = true;
}
sys_mutex_unlock(&link->conn->work_lock);
}
sys_mutex_unlock(&link->conn->work_lock);
} else if (link->link_direction == QD_INCOMING) {
} else { // incoming link
if (drain) {
link->credit_pending = link->capacity;
}
Expand Down

0 comments on commit bda043b

Please sign in to comment.