diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 60ad0be03..5cc099ab8 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -720,14 +720,15 @@ const char *qdr_link_internal_address(const qdr_link_t *link); bool qdr_link_is_anonymous(const qdr_link_t *link); /** - * qdr_link_is_routed + * qdr_link_is_core_endpoint * - * Indicate whether the link is link-routed. + * Indicate whether the link is terminated in the router core. These links are used by the core to send and receive + * messages. * * @param link Link object - * @return True if the link is link-routed. + * @return True if the link is terminated in the core */ -bool qdr_link_is_routed(const qdr_link_t *link); +bool qdr_link_is_core_endpoint(const qdr_link_t *link); /** * qdr_link_strip_annotations_in @@ -851,10 +852,9 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, bool settled, qd_bitmask_t *link_exclusion, int ingress_index, uint64_t remote_disposition, qd_delivery_state_t *remote_state); -qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled, - const uint8_t *tag, int tag_length, - uint64_t remote_disposition, - qd_delivery_state_t *remote_state); +qdr_delivery_t *qdr_link_deliver_to_core(qdr_link_t *link, qd_message_t *msg, bool settled, + uint64_t remote_disposition, + qd_delivery_state_t *remote_state); /** * qdr_link_process_deliveries diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 9a206f74b..25fc16a59 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -770,35 +770,17 @@ static bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link) break; } - // Handle the link-routed case + // Handle deliveries destined to a core endpoint. // - if (qdr_link_is_routed(rlink)) { - pn_delivery_tag_t dtag = pn_delivery_tag(pnd); - - if (dtag.size > QDR_DELIVERY_TAG_MAX) { - qd_log(LOG_ROUTER, QD_LOG_DEBUG, "link route delivery failure: msg tag size exceeded %zd (max=%d)", - dtag.size, QDR_DELIVERY_TAG_MAX); - qd_message_set_discard(msg, true); - pn_link_flow(pn_link, 1); - _reject_delivery(pnd, QD_AMQP_COND_INVALID_FIELD, "delivery tag length exceeded"); - if (receive_complete) { - pn_delivery_settle(pnd); - qd_message_free(msg); - } - return next_delivery; - } - + if (qdr_link_is_core_endpoint(rlink)) { log_link_message(conn, pn_link, msg); - delivery = qdr_link_deliver_to_routed_link(rlink, - msg, - pn_delivery_settled(pnd), - (uint8_t*) dtag.start, - dtag.size, - pn_delivery_remote_state(pnd), - qd_delivery_read_remote_state(pnd)); + delivery = qdr_link_deliver_to_core(rlink, msg, + pn_delivery_settled(pnd), + pn_delivery_remote_state(pnd), + qd_delivery_read_remote_state(pnd)); qd_link_set_incoming_msg(link, (qd_message_t*) 0); // msg no longer exclusive to qd_link qdr_node_connect_deliveries(link, delivery, pnd); - qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link"); + qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_core"); return next_delivery; } diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c index e40da0e81..5aeb50619 100644 --- a/src/router_core/agent_link.c +++ b/src/router_core/agent_link.c @@ -147,8 +147,6 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod case QDR_LINK_OWNING_ADDR: if(link->owning_addr) qd_compose_insert_string(body, address_key(link->owning_addr)); - else if (link->connected_link && link->connected_link->terminus_addr) - qd_compose_insert_string(body, link->connected_link->terminus_addr); else if (link->terminus_addr) qd_compose_insert_string(body, link->terminus_addr); else @@ -159,13 +157,8 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod qd_compose_insert_uint(body, link->capacity); break; - case QDR_LINK_PEER: - if (link->connected_link) { - char id[100]; - snprintf(id, 100, "%"PRId64, link->connected_link->identity); - qd_compose_insert_string(body, id); - } else - qd_compose_insert_null(body); + case QDR_LINK_PEER: // link-routing no longer supported + qd_compose_insert_null(body); break; case QDR_LINK_UNDELIVERED_COUNT: diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 7795cd711..5b68acd6f 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -611,9 +611,9 @@ bool qdr_link_is_anonymous(const qdr_link_t *link) } -bool qdr_link_is_routed(const qdr_link_t *link) +bool qdr_link_is_core_endpoint(const qdr_link_t *link) { - return link->connected_link != 0 || link->core_endpoint != 0; + return link->core_endpoint != 0; } @@ -1061,23 +1061,6 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd } -static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link) -{ - assert(link->link_direction == QD_OUTGOING); - - qdr_connection_t *conn = link->conn; - - sys_mutex_lock(&conn->work_lock); - qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered); - while (dlv) { - if (!qdr_delivery_receive_complete(dlv)) - qdr_delivery_set_aborted(dlv); - dlv = DEQ_NEXT(dlv); - } - sys_mutex_unlock(&conn->work_lock); -} - - static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, const char *log_text) { // @@ -1093,14 +1076,6 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li if (link->core_endpoint) qdrc_endpoint_do_cleanup_CT(core, link->core_endpoint); - // - // If the link has a connected peer, unlink the peer - // - if (link->connected_link) { - link->connected_link->connected_link = 0; - link->connected_link = 0; - } - // // If this link is involved in inter-router communication, remove its reference // from the core mask-bit tables @@ -2240,22 +2215,6 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac return; } - // - // Handle attach-routed links - // - if (link->connected_link) { - qdr_terminus_t *remote_terminus = link->link_direction == QD_OUTGOING ? target : source; - if (link->strip_prefix) { - qdr_terminus_strip_address_prefix(remote_terminus, link->strip_prefix); - } - if (link->insert_prefix) { - qdr_terminus_insert_address_prefix(remote_terminus, link->insert_prefix); - } - - qdr_link_outbound_second_attach_CT(core, link->connected_link, source, target); - return; - } - if (link->link_direction == QD_INCOMING) { // // Handle incoming link cases @@ -2362,34 +2321,6 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b } } - // - // For routed links, propagate the detach - // - if (link->connected_link) { - // - // If the connected link is outgoing and there is a delivery on the connected link's undelivered - // list that is not receive-complete, we must flag that delivery as aborted or it will forever - // block the propagation of the detach. - // - if (link->connected_link->link_direction == QD_OUTGOING) - qdr_link_abort_undelivered_CT(core, link->connected_link); - - if (dt != QD_LOST) - qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED); - else { - qdr_link_outbound_detach_CT(core, link->connected_link, 0, QDR_CONDITION_ROUTED_LINK_LOST, !link->terminus_survives_disconnect); - qdr_error_free(error); - } - - // - // If the link is completely detached, release its resources - // - if (link->detach_send_done) - qdr_link_cleanup_protected_CT(core, conn, link, "Link detached"); - - return; - } - // // For auto links, switch the auto link to failed state and record the error // diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 9eef5f061..3712f86e6 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -346,12 +346,11 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) TA_NO_THREAD } // - // If this is an incoming link and it is not link-routed or inter-router, issue - // one replacement credit on the link. Note that credit on inter-router links is - // issued immediately even for unsettled deliveries. + // If this is an incoming link and it is not inter-router or inter-edge, issue one replacement credit on the link. + // Note that credit on inter-router links is issued immediately even for unsettled deliveries. // if (moved && link->link_direction == QD_INCOMING && - link->link_type != QD_LINK_ROUTER && !link->edge && !link->connected_link) + link->link_type != QD_LINK_ROUTER && !link->edge) qdr_link_issue_credit_CT(core, link, 1, false); return moved; diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 3df8c0fbc..1637a423c 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -317,20 +317,6 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery out_dlv->link_work = qdr_link_work_getref(work); sys_mutex_unlock(&out_link->conn->work_lock); - // - // We are dealing here only with link routed deliveries - // If the out_link has a connected link and if the out_link is an inter-router link, increment the global deliveries_transit - // If the out_link is a route container link, add to the global deliveries_egress - // - if (out_link->connected_link) { - if (out_link->conn->role == QDR_ROLE_INTER_ROUTER) { - core->deliveries_transit++; - } - else { - core->deliveries_egress++; - } - } - // // Activate the outgoing connection for later processing. // diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index b6e7b6c73..b95c9f5e3 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -140,8 +140,6 @@ struct qdr_action_t { qdr_delivery_t *delivery; qd_delivery_state_t *dstate; uint64_t disposition; - uint8_t tag[32]; - int tag_length; bool settled; bool presettled; // true if remote settles while msg is in flight bool more; // true if there are more frames arriving, false otherwise @@ -450,7 +448,6 @@ struct qdr_link_t { int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle uint32_t open_moved_streams; ///< Number of still-open streaming deliveries that were moved from this link qdr_address_t *owning_addr; ///< [ref] Address record that owns this link - qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link qdrc_endpoint_t *core_endpoint; ///< [ref] Set if this link terminates on an in-core endpoint qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to containing reference objects qdr_auto_link_t *auto_link; ///< [ref] Auto_link that owns this link diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 9b9301f82..b44798686 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -106,12 +106,11 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, } -qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled, - const uint8_t *tag, int tag_length, - uint64_t remote_disposition, - qd_delivery_state_t* remote_state) +qdr_delivery_t *qdr_link_deliver_to_core(qdr_link_t *link, qd_message_t *msg, bool settled, + uint64_t remote_disposition, + qd_delivery_state_t* remote_state) { - qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver"); + qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver_to_core"); qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); @@ -126,19 +125,16 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t * dlv->conn_id = link->conn_id; sys_mutex_init(&dlv->dispo_lock); - qd_message_disable_router_annotations(msg); // routed links do not use router annotations + qd_message_disable_router_annotations(msg); // deliveries to the core do not use router annotations - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, DLV_FMT " Delivery created qdr_link_deliver_to_routed_link", + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, DLV_FMT " Delivery created qdr_link_deliver_to_core", DLV_ARGS(dlv)); - qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list"); - qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - protect returned value"); + qdr_delivery_incref(dlv, "qdr_link_deliver_to_core - newly created delivery, add to action list"); + qdr_delivery_incref(dlv, "qdr_link_deliver_to_core - protect returned value"); action->args.delivery.delivery = dlv; action->args.delivery.more = !qd_message_receive_complete(msg); - action->args.delivery.tag_length = tag_length; - assert(tag_length <= QDR_DELIVERY_TAG_MAX); - memcpy(action->args.delivery.tag, tag, tag_length); qdr_action_enqueue(link->core, action); return dlv; } @@ -759,35 +755,6 @@ void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard) return; } - if (link->connected_link) { - // - // If this is an attach-routed link, put the delivery directly onto the peer link - // - qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg); - - // - // Copy the delivery tag. For link-routing, the delivery tag must be preserved. - // - peer->tag_length = action->args.delivery.tag_length; - memcpy(peer->tag, action->args.delivery.tag, peer->tag_length); - - qdr_forward_deliver_CT(core, link->connected_link, peer); - - if (!dlv->settled) { - DEQ_INSERT_TAIL(link->unsettled, dlv); - dlv->where = QDR_DELIVERY_IN_UNSETTLED; - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, - DLV_FMT " Delivery transfer: qdr_link_deliver_CT: action-list -> unsettled-list", DLV_ARGS(dlv)); - } else { - // - // If the delivery is settled, decrement the ref_count on the delivery. - // This count was the owned-by-action count. - // - qdr_delivery_decref_CT(core, dlv, "qdr_link_deliver_CT - removed from action"); - } - return; - } - // // NOTE: The link->undelivered list does not need to be protected by the // connection's work lock for incoming links. This protection is only