diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 0ca9c7b30..62d315667 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -90,6 +90,13 @@ bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery) return qd_message_receive_complete(delivery->msg); } +bool qdr_delivery_is_unicast_cutthrough(const qdr_delivery_t *delivery) +{ + if (!delivery) + return false; + return qd_message_is_unicast_cutthrough(delivery->msg); +} + // set the local disposition (to be send to remote endpoint) void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition) diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h index b423650f5..2f1c324cf 100644 --- a/src/router_core/delivery.h +++ b/src/router_core/delivery.h @@ -99,6 +99,7 @@ static inline uint32_t next_delivery_id(void) bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery); +bool qdr_delivery_is_unicast_cutthrough(const qdr_delivery_t *delivery); bool qdr_delivery_send_complete(const qdr_delivery_t *delivery); bool qdr_delivery_oversize(const qdr_delivery_t *delivery); diff --git a/src/router_node.c b/src/router_node.c index 3137669d9..57161b342 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -1166,22 +1166,31 @@ static int AMQP_link_flow_handler(void* context, qd_link_t *link) pn_delivery_t *pdlv = pn_link_current(pnlink); if (!!pdlv) { qdr_delivery_t *qdlv = qdr_node_delivery_qdr_from_pn(pdlv); - qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t(); - bool used = false; - - sys_spinlock_lock(&conn->outbound_cutthrough_spinlock); - if (!qdlv->cutthrough_list_ref) { - DEQ_ITEM_INIT(dref); - dref->dlv = qdlv; - qdlv->cutthrough_list_ref = dref; - DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref); - qdr_delivery_incref(qdlv, "Recover from Q3 stall"); - used = true; - } - sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock); + // + //https://github.com/skupperproject/skupper-router/issues/1221 + // Add the delivery/delivery_ref to the outbound_cutthrough_worklist + // only if the delivery is a cut-through delivery. + // Pure all-AMQP deliveries/delivery_refs will never be cut-through and hence will never be placed + // on the conn->outbound_cutthrough_worklist. + // + if (qdr_delivery_is_unicast_cutthrough(qdlv)) { + qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t(); + bool used = false; + + sys_spinlock_lock(&conn->outbound_cutthrough_spinlock); + if (!qdlv->cutthrough_list_ref) { + DEQ_ITEM_INIT(dref); + dref->dlv = qdlv; + qdlv->cutthrough_list_ref = dref; + DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref); + qdr_delivery_incref(qdlv, "Recover from Q3 stall"); + used = true; + } + sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock); - if (!used) { - free_qdr_delivery_ref_t(dref); + if (!used) { + free_qdr_delivery_ref_t(dref); + } } }