Skip to content

Commit

Permalink
Fixes skupperproject#1221: Check if the message is cut-through before…
Browse files Browse the repository at this point in the history
… adding it to the conn->outbound_cutthrough_worklist (skupperproject#1224)
  • Loading branch information
ganeshmurthy authored Sep 20, 2023
1 parent eb6e1fa commit 92c5ebd
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
7 changes: 7 additions & 0 deletions src/router_core/delivery.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery)
return qd_message_receive_complete(delivery->msg);
}

bool qdr_delivery_is_unicast_cutthrough(const qdr_delivery_t *delivery)
{
if (!delivery)
return false;
return qd_message_is_unicast_cutthrough(delivery->msg);
}


// set the local disposition (to be send to remote endpoint)
void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition)
Expand Down
1 change: 1 addition & 0 deletions src/router_core/delivery.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ static inline uint32_t next_delivery_id(void)


bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery);
bool qdr_delivery_is_unicast_cutthrough(const qdr_delivery_t *delivery);
bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
bool qdr_delivery_oversize(const qdr_delivery_t *delivery);

Expand Down
39 changes: 24 additions & 15 deletions src/router_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,22 +1166,31 @@ static int AMQP_link_flow_handler(void* context, qd_link_t *link)
pn_delivery_t *pdlv = pn_link_current(pnlink);
if (!!pdlv) {
qdr_delivery_t *qdlv = qdr_node_delivery_qdr_from_pn(pdlv);
qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t();
bool used = false;

sys_spinlock_lock(&conn->outbound_cutthrough_spinlock);
if (!qdlv->cutthrough_list_ref) {
DEQ_ITEM_INIT(dref);
dref->dlv = qdlv;
qdlv->cutthrough_list_ref = dref;
DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref);
qdr_delivery_incref(qdlv, "Recover from Q3 stall");
used = true;
}
sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock);
//
//https://github.com/skupperproject/skupper-router/issues/1221
// Add the delivery/delivery_ref to the outbound_cutthrough_worklist
// only if the delivery is a cut-through delivery.
// Pure all-AMQP deliveries/delivery_refs will never be cut-through and hence will never be placed
// on the conn->outbound_cutthrough_worklist.
//
if (qdr_delivery_is_unicast_cutthrough(qdlv)) {
qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t();
bool used = false;

sys_spinlock_lock(&conn->outbound_cutthrough_spinlock);
if (!qdlv->cutthrough_list_ref) {
DEQ_ITEM_INIT(dref);
dref->dlv = qdlv;
qdlv->cutthrough_list_ref = dref;
DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref);
qdr_delivery_incref(qdlv, "Recover from Q3 stall");
used = true;
}
sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock);

if (!used) {
free_qdr_delivery_ref_t(dref);
if (!used) {
free_qdr_delivery_ref_t(dref);
}
}
}

Expand Down

0 comments on commit 92c5ebd

Please sign in to comment.