Skip to content

Commit

Permalink
checkpoint: use state flags
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Dec 17, 2024
1 parent f2b8092 commit 3f59387
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 241 deletions.
31 changes: 21 additions & 10 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,6 @@ void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t *
******************************************************************************
*/

typedef enum {
QD_DETACHED, // Protocol detach
QD_CLOSED, // Protocol close
QD_LOST // Connection or session closed
} qd_detach_type_t;

/**
* qdr_link_set_context
*
Expand Down Expand Up @@ -810,15 +804,32 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);

/**
* qdr_link_detach
* qdr_link_detach_received
*
* This function is invoked when a link detach arrives.
* This function is invoked when a link detach performative arrives from the remote peer. This may the first detach
* (peer-initiated link detach) or in response to a detach sent by the router (second detach).
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param dt The type of detach that occurred.
* @param error The link error from the detach frame or 0 if none.
*/
void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error);


/**
* qdr_link_closed
*
* This function is invoked by the adaptor when the link has fully closed. This will be the last call made by the
* adaptor for this link. This may be called as a result of a successful detach handshake or due to link loss. This will
* also be called during adaptor shutdown on any outstanding links.
*
* The core may free the qdr_link_t by this call. The adaptor MUST NOT reference the qdr_link_t on return from this
* call.
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param forced True if the link was closed due to failure or shutdown. False if closed by clean detach handshake.
*/
void qdr_link_closed(qdr_link_t *link, bool forced);


/**
* qdr_link_deliver
Expand Down
96 changes: 46 additions & 50 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,6 @@ static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv)
return ref ? (qdr_delivery_t*) ref->ref : 0;
}

// clean up all qdr_delivery/pn_delivery bindings for the link
//
void qd_link_abandoned_deliveries_handler(qd_router_t *router, qd_link_t *link)
{
qd_link_ref_list_t *list = qd_link_get_ref_list(link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// this will remove and release the ref
qdr_node_disconnect_deliveries(router->router_core, link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}
}


// read the delivery-state set by the remote endpoint
//
Expand Down Expand Up @@ -1223,10 +1205,9 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
/**
* Link Detached Handler
*/
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_detach_type_t dt)
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
{
if (!link)
return 0;
assert(link);

pn_link_t *pn_link = qd_link_pn(link);
if (!pn_link)
Expand Down Expand Up @@ -1257,29 +1238,55 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det
}
}

qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
// Notify the core that a detach has been received

qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(link);
if (rlink) {
//
// If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent
// connection/session loss then this is the last proton event that will be generated for this link. The qd_link
// will be freed on return from this call so remove the cross linkage between it and the qdr_link peer.

if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) {
// note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is
// the second detach!
qd_link_set_context(link, 0);
qdr_link_set_context(rlink, 0);
}

qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach(rlink, dt, error);
pn_condition_t *cond = pn_link_remote_condition(pn_link);
qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach_received(rlink, error);
}

return 0;
}


/**
* Link closed handler
*
* This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the
* link has not properly closed (detach handshake completed).
*/
static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced)
{
assert(qd_link);

// Clean up all qdr_delivery/pn_delivery bindings for the link.

qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call!
qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}

qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link);
if (qdr_link) {
// Notify core that this link no longer exists
qdr_link_set_context(qdr_link, 0);
qd_link_set_context(qd_link, 0);
qdr_link_closed(qdr_link, forced);
// This will cause the core to free qdr_link at some point so:
qdr_link = 0;
}
}

static void bind_connection_context(qdr_connection_t *qdrc, void* token)
{
qd_connection_t *conn = (qd_connection_t*) token;
Expand Down Expand Up @@ -1776,8 +1783,8 @@ static const qd_node_type_t router_node = {"router", 0,
AMQP_outgoing_link_handler,
AMQP_conn_wake_handler,
AMQP_link_detach_handler,
AMQP_link_closed_handler,
AMQP_link_attach_handler,
qd_link_abandoned_deliveries_handler,
AMQP_link_flow_handler,
0, // node_created_handler
0, // node_destroyed_handler
Expand Down Expand Up @@ -1920,7 +1927,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
return;

pn_link_t *pn_link = qd_link_pn(qlink);
if (!pn_link)
if (!pn_link || !!(pn_link_state(pn_link) & PN_LOCAL_CLOSED)) // already detached
return;

if (error) {
Expand All @@ -1945,17 +1952,6 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
}
}

//
// This is the last event for this link that the core is going to send into Proton so remove the core => adaptor
// linkage. If this is the response attach then there will be no further proton link events to send to the core so
// remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so
// we can notify the core when the second (response) detach arrives
//
qdr_link_set_context(link, 0);
if (!first) {
qd_link_set_context(qlink, 0);
}

qd_link_close(qlink);
}

Expand Down
56 changes: 32 additions & 24 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ struct qd_link_t {
ALLOC_DEFINE_SAFE(qd_link_t);
ALLOC_DEFINE(qd_link_ref_t);

static void qd_link_free(qd_link_t *);


/** Encapsulates a proton session */
struct qd_session_t {
DEQ_LINKS(qd_session_t);
Expand Down Expand Up @@ -277,7 +280,8 @@ static void notify_closed(qd_container_t *container, qd_connection_t *conn, void


// The given connection has dropped. There will be no further link events for this connection so manually clean up all
// links
// links. Note that we do not free the pn_link_t - proton will free all links when the parent connection is freed.
//
static void close_links(qd_container_t *container, pn_connection_t *conn, bool print_log)
{
pn_link_t *pn_link = pn_link_head(conn, 0);
Expand All @@ -289,7 +293,7 @@ static void close_links(qd_container_t *container, pn_connection_t *conn, bool p
if (print_log)
qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent connection end",
pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true); // true == forced
qd_link_free(qd_link);
}

Expand Down Expand Up @@ -318,6 +322,7 @@ static void cleanup_link(qd_link_t *link)
// cleanup any inbound message that has not been forwarded
qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg);
if (msg) {
qd_nullify_safe_ptr(&link->incoming_msg);
qd_message_free(msg);
}
}
Expand All @@ -326,8 +331,7 @@ static void cleanup_link(qd_link_t *link)
static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn)
{
//
// Close all links, passing QD_LOST as the reason. These links are not
// being properly 'detached'. They are being orphaned.
// Close all links. These links are not being properly 'detached'. They are being orphaned.
//
if (qd_conn)
qd_conn->closed = true;
Expand Down Expand Up @@ -508,9 +512,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
// Remote has nuked our session. Check for any links that were
// left open and forcibly detach them, since no detaches will
// arrive on this session.
// Remote has closed the session. Check for any child links and forcibly close them since there will be
// no detach performatives arriving for these links. Note that we do not free the pn_link_t since proton
// will free all child pn_link_t when it frees the session.
pn_link = pn_link_head(conn, 0);
while (pn_link) {
pn_link_t *next_link = pn_link_next(pn_link, 0);
Expand All @@ -529,7 +533,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
qd_log(LOG_CONTAINER, QD_LOG_DEBUG,
"Aborting link '%s' due to parent session end", pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true);
qd_link_free(qd_link);
}
}
Expand Down Expand Up @@ -590,10 +594,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
pn_link = pn_event_link(event);
qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED;
if (qd_link->pn_link == pn_link) {
pn_link_close(pn_link);
}
if (qd_link->policy_counted) {
qd_link->policy_counted = false;
if (pn_link_is_sender(pn_link)) {
Expand All @@ -609,25 +609,35 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
}

container->ntype->link_detach_handler(container->qd_router, qd_link, dt);
// notify arrival of inbound detach
container->ntype->link_detach_handler(container->qd_router, qd_link);

if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
// link fully closed
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
} else { // no qd_link, manually detach or free
if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) {
pn_link_close(pn_link);
} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}

} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
}
break;

case PN_LINK_LOCAL_CLOSE:
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
qd_link_free((qd_link_t *) pn_link_get_context(pn_link));
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
}
add_link_to_free_list(&qd_conn->free_link_list, pn_link); // why???
}
break;

Expand Down Expand Up @@ -775,16 +785,14 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name,
}


void qd_link_free(qd_link_t *link)
static void qd_link_free(qd_link_t *link)
{
if (!link) return;

sys_mutex_lock(&amqp_adaptor.container->lock);
DEQ_REMOVE(amqp_adaptor.container->links, link);
sys_mutex_unlock(&amqp_adaptor.container->lock);

amqp_adaptor.container->ntype->link_abandoned_deliveries_handler(amqp_adaptor.container->qd_router, link);

cleanup_link(link);
free_qd_link_t(link);
}
Expand Down
2 changes: 0 additions & 2 deletions src/adaptors/amqp/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ qd_container_t *qd_container(qd_router_t *router, const qd_node_type_t *node_typ
void qd_container_free(qd_container_t *container);

qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char *name, qd_session_class_t);
void qd_link_free(qd_link_t *link);

/**
* List of reference in the qd_link used to track abandoned deliveries
Expand Down Expand Up @@ -98,7 +97,6 @@ pn_terminus_t *qd_link_target(qd_link_t *link);
pn_terminus_t *qd_link_remote_source(qd_link_t *link);
pn_terminus_t *qd_link_remote_target(qd_link_t *link);
void qd_link_close(qd_link_t *link);
void qd_link_free(qd_link_t *link);
void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context);
void qd_link_q3_block(qd_link_t *link);
void qd_link_q3_unblock(qd_link_t *link);
Expand Down
15 changes: 10 additions & 5 deletions src/adaptors/amqp/node_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ typedef struct qd_router_t qd_router_t;
typedef bool (*qd_container_delivery_handler_t) (qd_router_t *, qd_link_t *link);
typedef void (*qd_container_disposition_handler_t) (qd_router_t *, qd_link_t *link, pn_delivery_t *pnd);
typedef int (*qd_container_link_handler_t) (qd_router_t *, qd_link_t *link);
typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link, qd_detach_type_t dt);
typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link);
typedef void (*qd_container_link_closed_handler_t) (qd_router_t *, qd_link_t *link, bool forced);
typedef void (*qd_container_node_handler_t) (qd_router_t *);
typedef int (*qd_container_conn_handler_t) (qd_router_t *, qd_connection_t *conn, void *context);
typedef void (*qd_container_link_abandoned_deliveries_handler_t) (qd_router_t *, qd_link_t *link);

/**
* A set of Node handlers for deliveries, links and container events.
Expand Down Expand Up @@ -57,15 +57,20 @@ struct qd_node_type_t {
/** Invoked when an activated connection is available for writing. */
qd_container_conn_handler_t writable_handler;

/** Invoked when a link is detached. */
/** Invoked when link detached is received. */
qd_container_link_detach_handler_t link_detach_handler;

/** The last callback issued for the given qd_link_t. The adaptor must clean up all state related to the qd_link_t
* as it will be freed on return from this call. The forced flag is set to true if the link is being forced closed
* due to the parent connection/session closing or on shutdown.
*/
qd_container_link_closed_handler_t link_closed_handler;

///@}

/** Invoked when a link we created was opened by the peer */
qd_container_link_handler_t link_attach_handler;

qd_container_link_abandoned_deliveries_handler_t link_abandoned_deliveries_handler;

/** Invoked when a link receives a flow event */
qd_container_link_handler_t link_flow_handler;

Expand Down
Loading

0 comments on commit 3f59387

Please sign in to comment.