Skip to content

Commit

Permalink
Remove unused link-route code
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Nov 19, 2024
1 parent 77ecd45 commit b14c359
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 175 deletions.
16 changes: 8 additions & 8 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -720,14 +720,15 @@ const char *qdr_link_internal_address(const qdr_link_t *link);
bool qdr_link_is_anonymous(const qdr_link_t *link);

/**
* qdr_link_is_routed
* qdr_link_is_core_endpoint
*
* Indicate whether the link is link-routed.
* Indicate whether the link is terminated in the router core. These links are used by the core to send and receive
* messages.
*
* @param link Link object
* @return True if the link is link-routed.
* @return True if the link is terminated in the core
*/
bool qdr_link_is_routed(const qdr_link_t *link);
bool qdr_link_is_core_endpoint(const qdr_link_t *link);

/**
* qdr_link_strip_annotations_in
Expand Down Expand Up @@ -851,10 +852,9 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
bool settled, qd_bitmask_t *link_exclusion, int ingress_index,
uint64_t remote_disposition,
qd_delivery_state_t *remote_state);
qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
const uint8_t *tag, int tag_length,
uint64_t remote_disposition,
qd_delivery_state_t *remote_state);
qdr_delivery_t *qdr_link_deliver_to_core(qdr_link_t *link, qd_message_t *msg, bool settled,
uint64_t remote_disposition,
qd_delivery_state_t *remote_state);

/**
* qdr_link_process_deliveries
Expand Down
32 changes: 7 additions & 25 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -770,35 +770,17 @@ static bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link)
break;
}

// Handle the link-routed case
// Handle deliveries destined to a core endpoint.
//
if (qdr_link_is_routed(rlink)) {
pn_delivery_tag_t dtag = pn_delivery_tag(pnd);

if (dtag.size > QDR_DELIVERY_TAG_MAX) {
qd_log(LOG_ROUTER, QD_LOG_DEBUG, "link route delivery failure: msg tag size exceeded %zd (max=%d)",
dtag.size, QDR_DELIVERY_TAG_MAX);
qd_message_set_discard(msg, true);
pn_link_flow(pn_link, 1);
_reject_delivery(pnd, QD_AMQP_COND_INVALID_FIELD, "delivery tag length exceeded");
if (receive_complete) {
pn_delivery_settle(pnd);
qd_message_free(msg);
}
return next_delivery;
}

if (qdr_link_is_core_endpoint(rlink)) {
log_link_message(conn, pn_link, msg);
delivery = qdr_link_deliver_to_routed_link(rlink,
msg,
pn_delivery_settled(pnd),
(uint8_t*) dtag.start,
dtag.size,
pn_delivery_remote_state(pnd),
qd_delivery_read_remote_state(pnd));
delivery = qdr_link_deliver_to_core(rlink, msg,
pn_delivery_settled(pnd),
pn_delivery_remote_state(pnd),
qd_delivery_read_remote_state(pnd));
qd_link_set_incoming_msg(link, (qd_message_t*) 0); // msg no longer exclusive to qd_link
qdr_node_connect_deliveries(link, delivery, pnd);
qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link");
qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_core");
return next_delivery;
}

Expand Down
11 changes: 2 additions & 9 deletions src/router_core/agent_link.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
case QDR_LINK_OWNING_ADDR:
if(link->owning_addr)
qd_compose_insert_string(body, address_key(link->owning_addr));
else if (link->connected_link && link->connected_link->terminus_addr)
qd_compose_insert_string(body, link->connected_link->terminus_addr);
else if (link->terminus_addr)
qd_compose_insert_string(body, link->terminus_addr);
else
Expand All @@ -159,13 +157,8 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
qd_compose_insert_uint(body, link->capacity);
break;

case QDR_LINK_PEER:
if (link->connected_link) {
char id[100];
snprintf(id, 100, "%"PRId64, link->connected_link->identity);
qd_compose_insert_string(body, id);
} else
qd_compose_insert_null(body);
case QDR_LINK_PEER: // link-routing no longer supported
qd_compose_insert_null(body);
break;

case QDR_LINK_UNDELIVERED_COUNT:
Expand Down
73 changes: 2 additions & 71 deletions src/router_core/connections.c
Original file line number Diff line number Diff line change
Expand Up @@ -611,9 +611,9 @@ bool qdr_link_is_anonymous(const qdr_link_t *link)
}


bool qdr_link_is_routed(const qdr_link_t *link)
bool qdr_link_is_core_endpoint(const qdr_link_t *link)
{
return link->connected_link != 0 || link->core_endpoint != 0;
return link->core_endpoint != 0;
}


Expand Down Expand Up @@ -1061,23 +1061,6 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd
}


static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link)
{
assert(link->link_direction == QD_OUTGOING);

qdr_connection_t *conn = link->conn;

sys_mutex_lock(&conn->work_lock);
qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered);
while (dlv) {
if (!qdr_delivery_receive_complete(dlv))
qdr_delivery_set_aborted(dlv);
dlv = DEQ_NEXT(dlv);
}
sys_mutex_unlock(&conn->work_lock);
}


static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, const char *log_text)
{
//
Expand All @@ -1093,14 +1076,6 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
if (link->core_endpoint)
qdrc_endpoint_do_cleanup_CT(core, link->core_endpoint);

//
// If the link has a connected peer, unlink the peer
//
if (link->connected_link) {
link->connected_link->connected_link = 0;
link->connected_link = 0;
}

//
// If this link is involved in inter-router communication, remove its reference
// from the core mask-bit tables
Expand Down Expand Up @@ -2240,22 +2215,6 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
return;
}

//
// Handle attach-routed links
//
if (link->connected_link) {
qdr_terminus_t *remote_terminus = link->link_direction == QD_OUTGOING ? target : source;
if (link->strip_prefix) {
qdr_terminus_strip_address_prefix(remote_terminus, link->strip_prefix);
}
if (link->insert_prefix) {
qdr_terminus_insert_address_prefix(remote_terminus, link->insert_prefix);
}

qdr_link_outbound_second_attach_CT(core, link->connected_link, source, target);
return;
}

if (link->link_direction == QD_INCOMING) {
//
// Handle incoming link cases
Expand Down Expand Up @@ -2362,34 +2321,6 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
}
}

//
// For routed links, propagate the detach
//
if (link->connected_link) {
//
// If the connected link is outgoing and there is a delivery on the connected link's undelivered
// list that is not receive-complete, we must flag that delivery as aborted or it will forever
// block the propagation of the detach.
//
if (link->connected_link->link_direction == QD_OUTGOING)
qdr_link_abort_undelivered_CT(core, link->connected_link);

if (dt != QD_LOST)
qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED);
else {
qdr_link_outbound_detach_CT(core, link->connected_link, 0, QDR_CONDITION_ROUTED_LINK_LOST, !link->terminus_survives_disconnect);
qdr_error_free(error);
}

//
// If the link is completely detached, release its resources
//
if (link->detach_send_done)
qdr_link_cleanup_protected_CT(core, conn, link, "Link detached");

return;
}

//
// For auto links, switch the auto link to failed state and record the error
//
Expand Down
7 changes: 3 additions & 4 deletions src/router_core/delivery.c
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,11 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) TA_NO_THREAD
}

//
// If this is an incoming link and it is not link-routed or inter-router, issue
// one replacement credit on the link. Note that credit on inter-router links is
// issued immediately even for unsettled deliveries.
// If this is an incoming link and it is not inter-router or inter-edge, issue one replacement credit on the link.
// Note that credit on inter-router links is issued immediately even for unsettled deliveries.
//
if (moved && link->link_direction == QD_INCOMING &&
link->link_type != QD_LINK_ROUTER && !link->edge && !link->connected_link)
link->link_type != QD_LINK_ROUTER && !link->edge)
qdr_link_issue_credit_CT(core, link, 1, false);

return moved;
Expand Down
14 changes: 0 additions & 14 deletions src/router_core/forwarder.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,20 +317,6 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
out_dlv->link_work = qdr_link_work_getref(work);
sys_mutex_unlock(&out_link->conn->work_lock);

//
// We are dealing here only with link routed deliveries
// If the out_link has a connected link and if the out_link is an inter-router link, increment the global deliveries_transit
// If the out_link is a route container link, add to the global deliveries_egress
//
if (out_link->connected_link) {
if (out_link->conn->role == QDR_ROLE_INTER_ROUTER) {
core->deliveries_transit++;
}
else {
core->deliveries_egress++;
}
}

//
// Activate the outgoing connection for later processing.
//
Expand Down
3 changes: 0 additions & 3 deletions src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ struct qdr_action_t {
qdr_delivery_t *delivery;
qd_delivery_state_t *dstate;
uint64_t disposition;
uint8_t tag[32];
int tag_length;
bool settled;
bool presettled; // true if remote settles while msg is in flight
bool more; // true if there are more frames arriving, false otherwise
Expand Down Expand Up @@ -450,7 +448,6 @@ struct qdr_link_t {
int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle
uint32_t open_moved_streams; ///< Number of still-open streaming deliveries that were moved from this link
qdr_address_t *owning_addr; ///< [ref] Address record that owns this link
qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link
qdrc_endpoint_t *core_endpoint; ///< [ref] Set if this link terminates on an in-core endpoint
qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to containing reference objects
qdr_auto_link_t *auto_link; ///< [ref] Auto_link that owns this link
Expand Down
49 changes: 8 additions & 41 deletions src/router_core/transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,11 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
}


qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
const uint8_t *tag, int tag_length,
uint64_t remote_disposition,
qd_delivery_state_t* remote_state)
qdr_delivery_t *qdr_link_deliver_to_core(qdr_link_t *link, qd_message_t *msg, bool settled,
uint64_t remote_disposition,
qd_delivery_state_t* remote_state)
{
qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver_to_core");
qdr_delivery_t *dlv = new_qdr_delivery_t();

ZERO(dlv);
Expand All @@ -126,19 +125,16 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
dlv->conn_id = link->conn_id;
sys_mutex_init(&dlv->dispo_lock);

qd_message_disable_router_annotations(msg); // routed links do not use router annotations
qd_message_disable_router_annotations(msg); // deliveries to the core do not use router annotations

qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, DLV_FMT " Delivery created qdr_link_deliver_to_routed_link",
qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, DLV_FMT " Delivery created qdr_link_deliver_to_core",
DLV_ARGS(dlv));

qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - protect returned value");
qdr_delivery_incref(dlv, "qdr_link_deliver_to_core - newly created delivery, add to action list");
qdr_delivery_incref(dlv, "qdr_link_deliver_to_core - protect returned value");

action->args.delivery.delivery = dlv;
action->args.delivery.more = !qd_message_receive_complete(msg);
action->args.delivery.tag_length = tag_length;
assert(tag_length <= QDR_DELIVERY_TAG_MAX);
memcpy(action->args.delivery.tag, tag, tag_length);
qdr_action_enqueue(link->core, action);
return dlv;
}
Expand Down Expand Up @@ -759,35 +755,6 @@ void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
return;
}

if (link->connected_link) {
//
// If this is an attach-routed link, put the delivery directly onto the peer link
//
qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg);

//
// Copy the delivery tag. For link-routing, the delivery tag must be preserved.
//
peer->tag_length = action->args.delivery.tag_length;
memcpy(peer->tag, action->args.delivery.tag, peer->tag_length);

qdr_forward_deliver_CT(core, link->connected_link, peer);

if (!dlv->settled) {
DEQ_INSERT_TAIL(link->unsettled, dlv);
dlv->where = QDR_DELIVERY_IN_UNSETTLED;
qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG,
DLV_FMT " Delivery transfer: qdr_link_deliver_CT: action-list -> unsettled-list", DLV_ARGS(dlv));
} else {
//
// If the delivery is settled, decrement the ref_count on the delivery.
// This count was the owned-by-action count.
//
qdr_delivery_decref_CT(core, dlv, "qdr_link_deliver_CT - removed from action");
}
return;
}

//
// NOTE: The link->undelivered list does not need to be protected by the
// connection's work lock for incoming links. This protection is only
Expand Down

0 comments on commit b14c359

Please sign in to comment.