Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #1700: refactor the AMQP link lifecycle #1701

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,6 @@ void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t *
******************************************************************************
*/

typedef enum {
QD_DETACHED, // Protocol detach
QD_CLOSED, // Protocol close
QD_LOST // Connection or session closed
} qd_detach_type_t;

/**
* qdr_link_set_context
*
Expand Down Expand Up @@ -810,15 +804,32 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);

/**
* qdr_link_detach
* qdr_link_detach_received
*
* This function is invoked when a link detach arrives.
* This function is invoked when a link detach performative arrives from the remote peer. This may the first detach
* (peer-initiated link detach) or in response to a detach sent by the router (second detach).
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param dt The type of detach that occurred.
* @param error The link error from the detach frame or 0 if none.
*/
void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like renaming the function qdr_link_detach() to qdr_link_detach_received(). The new function name makes it clear that a detach (first or second) has been received from the remote peer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!



/**
* qdr_link_closed
*
* This function is invoked by the adaptor when the link has fully closed. This will be the last call made by the
* adaptor for this link. This may be called as a result of a successful detach handshake or due to link loss. This will
* also be called during adaptor shutdown on any outstanding links.
*
* The core may free the qdr_link_t by this call. The adaptor MUST NOT reference the qdr_link_t on return from this
* call.
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param forced True if the link was closed due to failure or shutdown. False if closed by clean detach handshake.
*/
void qdr_link_closed(qdr_link_t *link, bool forced);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked for the usage of this function in the tcp adaptor, say in close_connection_XSIDE_IO() -

    if (!!conn->outbound_link) {
        qdr_link_closed(conn->outbound_link, true);
    }

In the above usage, we are calling the qdr_link_closed() function to close a link. Should the function be called qdr_link_close() or qd_close_link() instead ? closed seems to mean that the link has already been closed

Copy link
Contributor Author

@kgiusti kgiusti Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

closed seems to mean that the link has already been closed

Indeed that's the idea: the adaptor has detected the link is closed and needs to notify the core that the link no longer exists (hence qdr_link_closed rather than qd_link_closed: it's a Core API call not an adaptor call).

I chose qdr_link_closed because the function performs essentially the same thing as the exising core function qdr_connection_closed except the new one deals with qdr_link_t's not qdr_connection_t's).

You'll see that both adaptors call qdr_connection_close() right before they clean up their connection state objects (qd_tcp_connection_t or qd_connection_t - depending on the adaptor). The TCP adaptor used to call qdr_link_detach() even though there's not such thing as a detach performative for TCP! For TCP we basically simulate a network connection drop since there's no AMQP close handshake going on.

See here and here

What do you think?



/**
* qdr_link_deliver
Expand Down
100 changes: 50 additions & 50 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,6 @@ static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv)
return ref ? (qdr_delivery_t*) ref->ref : 0;
}

// clean up all qdr_delivery/pn_delivery bindings for the link
//
void qd_link_abandoned_deliveries_handler(qd_router_t *router, qd_link_t *link)
{
qd_link_ref_list_t *list = qd_link_get_ref_list(link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// this will remove and release the ref
qdr_node_disconnect_deliveries(router->router_core, link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}
}


// read the delivery-state set by the remote endpoint
//
Expand Down Expand Up @@ -1223,10 +1205,9 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
/**
* Link Detached Handler
*/
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_detach_type_t dt)
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
{
if (!link)
return 0;
assert(link);

pn_link_t *pn_link = qd_link_pn(link);
if (!pn_link)
Expand Down Expand Up @@ -1257,29 +1238,59 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det
}
}

qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
// Notify the core that a detach has been received.

qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(link);
if (rlink) {
//
// If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent
// connection/session loss then this is the last proton event that will be generated for this link. The qd_link
// will be freed on return from this call so remove the cross linkage between it and the qdr_link peer.

if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) {
// note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is
// the second detach!
qd_link_set_context(link, 0);
qdr_link_set_context(rlink, 0);
}

qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach(rlink, dt, error);
pn_condition_t *cond = pn_link_remote_condition(pn_link);
qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach_received(rlink, error);
} else if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) {
// Normally the core would be responsible for sending the response detach to close the link (via
// CORE_link_detach) but since there is no core link that will not happen.
pn_link_close(pn_link);
}

return 0;
}


/**
* Link closed handler
*
* This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the
* link has not properly closed (detach handshake completed).
*/
static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if the AMQP_ prefix should be used only for functions that are called directly as a response to proton AMQP events. This function can be called qd_link_close() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be... but I'm not sure if other devs would be ok with that new naming concept. @ssorj @ted-ross and others - opinions?

{
assert(qd_link);

// Clean up all qdr_delivery/pn_delivery bindings for the link.

qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call!
qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}

qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link);
if (qdr_link) {
// Notify core that this link no longer exists
qdr_link_set_context(qdr_link, 0);
qd_link_set_context(qd_link, 0);
qdr_link_closed(qdr_link, forced);
// This will cause the core to free qdr_link at some point so:
qdr_link = 0;
}
}

static void bind_connection_context(qdr_connection_t *qdrc, void* token)
{
qd_connection_t *conn = (qd_connection_t*) token;
Expand Down Expand Up @@ -1776,8 +1787,8 @@ static const qd_node_type_t router_node = {"router", 0,
AMQP_outgoing_link_handler,
AMQP_conn_wake_handler,
AMQP_link_detach_handler,
AMQP_link_closed_handler,
AMQP_link_attach_handler,
qd_link_abandoned_deliveries_handler,
AMQP_link_flow_handler,
0, // node_created_handler
0, // node_destroyed_handler
Expand Down Expand Up @@ -1920,7 +1931,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
return;

pn_link_t *pn_link = qd_link_pn(qlink);
if (!pn_link)
if (!pn_link || !!(pn_link_state(pn_link) & PN_LOCAL_CLOSED)) // already detached
return;

if (error) {
Expand All @@ -1945,17 +1956,6 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
}
}

//
// This is the last event for this link that the core is going to send into Proton so remove the core => adaptor
// linkage. If this is the response attach then there will be no further proton link events to send to the core so
// remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so
// we can notify the core when the second (response) detach arrives
//
qdr_link_set_context(link, 0);
if (!first) {
qd_link_set_context(qlink, 0);
}

qd_link_close(qlink);
}

Expand Down
56 changes: 32 additions & 24 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ struct qd_link_t {
ALLOC_DEFINE_SAFE(qd_link_t);
ALLOC_DEFINE(qd_link_ref_t);

static void qd_link_free(qd_link_t *);


/** Encapsulates a proton session */
struct qd_session_t {
DEQ_LINKS(qd_session_t);
Expand Down Expand Up @@ -277,7 +280,8 @@ static void notify_closed(qd_container_t *container, qd_connection_t *conn, void


// The given connection has dropped. There will be no further link events for this connection so manually clean up all
// links
// links. Note that we do not free the pn_link_t - proton will free all links when the parent connection is freed.
//
static void close_links(qd_container_t *container, pn_connection_t *conn, bool print_log)
{
pn_link_t *pn_link = pn_link_head(conn, 0);
Expand All @@ -289,7 +293,7 @@ static void close_links(qd_container_t *container, pn_connection_t *conn, bool p
if (print_log)
qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent connection end",
pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true); // true == forced
qd_link_free(qd_link);
}

Expand Down Expand Up @@ -318,6 +322,7 @@ static void cleanup_link(qd_link_t *link)
// cleanup any inbound message that has not been forwarded
qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg);
if (msg) {
qd_nullify_safe_ptr(&link->incoming_msg);
qd_message_free(msg);
}
}
Expand All @@ -326,8 +331,7 @@ static void cleanup_link(qd_link_t *link)
static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn)
{
//
// Close all links, passing QD_LOST as the reason. These links are not
// being properly 'detached'. They are being orphaned.
// Close all links. These links are not being properly 'detached'. They are being orphaned.
//
if (qd_conn)
qd_conn->closed = true;
Expand Down Expand Up @@ -508,9 +512,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
// Remote has nuked our session. Check for any links that were
// left open and forcibly detach them, since no detaches will
// arrive on this session.
// Remote has closed the session. Check for any child links and forcibly close them since there will be
// no detach performatives arriving for these links. Note that we do not free the pn_link_t since proton
// will free all child pn_link_t when it frees the session.
pn_link = pn_link_head(conn, 0);
while (pn_link) {
pn_link_t *next_link = pn_link_next(pn_link, 0);
Expand All @@ -529,7 +533,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
qd_log(LOG_CONTAINER, QD_LOG_DEBUG,
"Aborting link '%s' due to parent session end", pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true);
qd_link_free(qd_link);
}
}
Expand Down Expand Up @@ -590,10 +594,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
pn_link = pn_event_link(event);
qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED;
if (qd_link->pn_link == pn_link) {
pn_link_close(pn_link);
}
if (qd_link->policy_counted) {
qd_link->policy_counted = false;
if (pn_link_is_sender(pn_link)) {
Expand All @@ -609,25 +609,35 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
}

container->ntype->link_detach_handler(container->qd_router, qd_link, dt);
// notify arrival of inbound detach
container->ntype->link_detach_handler(container->qd_router, qd_link);

if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
// link fully closed
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
} else { // no qd_link, manually detach or free
if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) {
pn_link_close(pn_link);
} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}

} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
}
break;

case PN_LINK_LOCAL_CLOSE:
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
qd_link_free((qd_link_t *) pn_link_get_context(pn_link));
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
}
add_link_to_free_list(&qd_conn->free_link_list, pn_link); // why???
}
break;

Expand Down Expand Up @@ -775,16 +785,14 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name,
}


void qd_link_free(qd_link_t *link)
static void qd_link_free(qd_link_t *link)
{
if (!link) return;

sys_mutex_lock(&amqp_adaptor.container->lock);
DEQ_REMOVE(amqp_adaptor.container->links, link);
sys_mutex_unlock(&amqp_adaptor.container->lock);

amqp_adaptor.container->ntype->link_abandoned_deliveries_handler(amqp_adaptor.container->qd_router, link);

cleanup_link(link);
free_qd_link_t(link);
}
Expand Down
2 changes: 0 additions & 2 deletions src/adaptors/amqp/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ qd_container_t *qd_container(qd_router_t *router, const qd_node_type_t *node_typ
void qd_container_free(qd_container_t *container);

qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char *name, qd_session_class_t);
void qd_link_free(qd_link_t *link);

/**
* List of reference in the qd_link used to track abandoned deliveries
Expand Down Expand Up @@ -98,7 +97,6 @@ pn_terminus_t *qd_link_target(qd_link_t *link);
pn_terminus_t *qd_link_remote_source(qd_link_t *link);
pn_terminus_t *qd_link_remote_target(qd_link_t *link);
void qd_link_close(qd_link_t *link);
void qd_link_free(qd_link_t *link);
void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context);
void qd_link_q3_block(qd_link_t *link);
void qd_link_q3_unblock(qd_link_t *link);
Expand Down
15 changes: 10 additions & 5 deletions src/adaptors/amqp/node_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ typedef struct qd_router_t qd_router_t;
typedef bool (*qd_container_delivery_handler_t) (qd_router_t *, qd_link_t *link);
typedef void (*qd_container_disposition_handler_t) (qd_router_t *, qd_link_t *link, pn_delivery_t *pnd);
typedef int (*qd_container_link_handler_t) (qd_router_t *, qd_link_t *link);
typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link, qd_detach_type_t dt);
typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link);
typedef void (*qd_container_link_closed_handler_t) (qd_router_t *, qd_link_t *link, bool forced);
typedef void (*qd_container_node_handler_t) (qd_router_t *);
typedef int (*qd_container_conn_handler_t) (qd_router_t *, qd_connection_t *conn, void *context);
typedef void (*qd_container_link_abandoned_deliveries_handler_t) (qd_router_t *, qd_link_t *link);

/**
* A set of Node handlers for deliveries, links and container events.
Expand Down Expand Up @@ -57,15 +57,20 @@ struct qd_node_type_t {
/** Invoked when an activated connection is available for writing. */
qd_container_conn_handler_t writable_handler;

/** Invoked when a link is detached. */
/** Invoked when link detached is received. */
qd_container_link_detach_handler_t link_detach_handler;

/** The last callback issued for the given qd_link_t. The adaptor must clean up all state related to the qd_link_t
* as it will be freed on return from this call. The forced flag is set to true if the link is being forced closed
* due to the parent connection/session closing or on shutdown.
*/
qd_container_link_closed_handler_t link_closed_handler;

///@}

/** Invoked when a link we created was opened by the peer */
qd_container_link_handler_t link_attach_handler;

qd_container_link_abandoned_deliveries_handler_t link_abandoned_deliveries_handler;

/** Invoked when a link receives a flow event */
qd_container_link_handler_t link_flow_handler;

Expand Down
Loading
Loading