diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 1b9f90bbe..48e38c11c 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1020,7 +1020,7 @@ static void deferred_AMQP_rx_handler(void *context, bool discard) if (!discard) { qd_link_t *qdl = safe_deref_qd_link_t(*safe_qdl); - if (!!qdl) { + if (!!qdl && !!qd_link_pn(qdl)) { assert(qd_link_direction(qdl) == QD_INCOMING); while (true) { if (!AMQP_rx_handler(amqp_adaptor.router, qdl)) @@ -1169,70 +1169,66 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link) } // check if Q3 can be unblocked - pn_session_t *pn_ssn = pn_link_session(pnlink); - if (pn_ssn) { - qd_session_t *qd_ssn = qd_session_from_pn(pn_ssn); - if (qd_ssn && qd_session_is_q3_blocked(qd_ssn)) { - // Q3 blocked - have we drained enough outgoing bytes? - const size_t q3_lower = QD_BUFFER_SIZE * QD_QLIMIT_Q3_LOWER; - if (pn_session_outgoing_bytes(pn_ssn) < q3_lower) { - // yes. We must now unblock all links that have been blocked by Q3 - - qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn); - qd_link_t *blink = DEQ_HEAD(*blinks); - qd_connection_t *conn = qd_link_connection(blink); - - while (blink) { - qd_link_q3_unblock(blink); // removes from blinks list! - pnlink = qd_link_pn(blink); - if (blink != link) { // already flowed this link - rlink = (qdr_link_t *) qd_link_get_context(blink); - if (rlink) { - // signalling flow to the core causes the link to be re-activated - qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink)); - } + qd_session_t *qd_ssn = qd_link_get_session(link); + if (qd_session_is_q3_blocked(qd_ssn)) { + // Q3 blocked - have we drained enough outgoing bytes? + if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_get_outgoing_threshold(qd_ssn)) { + // yes. We must now unblock all links that have been blocked by Q3 + + qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn); + qd_link_t *blink = DEQ_HEAD(*blinks); + qd_connection_t *conn = qd_link_connection(blink); + + while (blink) { + qd_link_q3_unblock(blink); // removes from blinks list! + pnlink = qd_link_pn(blink); + if (blink != link) { // already flowed this link + rlink = (qdr_link_t *) qd_link_get_context(blink); + if (rlink) { + // signalling flow to the core causes the link to be re-activated + qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink)); } + } - pn_delivery_t *pdlv = pn_link_current(pnlink); - if (!!pdlv) { - qdr_delivery_t *qdlv = qdr_node_delivery_qdr_from_pn(pdlv); - // - //https://github.com/skupperproject/skupper-router/issues/1221 - // Add the delivery/delivery_ref to the outbound_cutthrough_worklist - // only if the delivery is a cut-through delivery. - // Pure all-AMQP deliveries/delivery_refs will never be cut-through and hence will never be placed - // on the conn->outbound_cutthrough_worklist. - // - if (qdr_delivery_is_unicast_cutthrough(qdlv)) { - qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t(); - bool used = false; - - sys_spinlock_lock(&conn->outbound_cutthrough_spinlock); - if (!qdlv->cutthrough_list_ref) { - DEQ_ITEM_INIT(dref); - dref->dlv = qdlv; - qdlv->cutthrough_list_ref = dref; - DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref); - qdr_delivery_incref(qdlv, "Recover from Q3 stall"); - used = true; - } - sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock); + pn_delivery_t *pdlv = pn_link_current(pnlink); + if (!!pdlv) { + qdr_delivery_t *qdlv = qdr_node_delivery_qdr_from_pn(pdlv); + // + //https://github.com/skupperproject/skupper-router/issues/1221 + // Add the delivery/delivery_ref to the outbound_cutthrough_worklist + // only if the delivery is a cut-through delivery. + // Pure all-AMQP deliveries/delivery_refs will never be cut-through and hence will never be placed + // on the conn->outbound_cutthrough_worklist. + // + if (qdr_delivery_is_unicast_cutthrough(qdlv)) { + qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t(); + bool used = false; + + sys_spinlock_lock(&conn->outbound_cutthrough_spinlock); + if (!qdlv->cutthrough_list_ref) { + DEQ_ITEM_INIT(dref); + dref->dlv = qdlv; + qdlv->cutthrough_list_ref = dref; + DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref); + qdr_delivery_incref(qdlv, "Recover from Q3 stall"); + used = true; + } + sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock); - if (!used) { - free_qdr_delivery_ref_t(dref); - } + if (!used) { + free_qdr_delivery_ref_t(dref); } } - - blink = DEQ_HEAD(*blinks); } - // - // Wake the connection for outgoing cut-through - // - SET_ATOMIC_FLAG(&conn->wake_cutthrough_outbound); - AMQP_conn_wake_handler(router, conn, 0); + blink = DEQ_HEAD(*blinks); } + + // + // Wake the connection for outgoing cut-through + // + SET_ATOMIC_FLAG(&conn->wake_cutthrough_outbound); + AMQP_conn_wake_handler(router, conn, 0); } } return 0; @@ -2386,7 +2382,6 @@ static void qd_amqp_adaptor_final(void *adaptor_context) pn_transport_t *tport = pn_connection_transport(ctx->pn_conn); if (tport) pn_transport_set_context(tport, 0); /* for transport_tracer */ - qd_session_cleanup(ctx); pn_connection_set_context(ctx->pn_conn, 0); } qd_connection_invoke_deferred_calls(ctx, true); // Discard any pending deferred calls @@ -2404,6 +2399,12 @@ static void qd_amqp_adaptor_final(void *adaptor_context) qd_listener_remove_connection(ctx->listener, ctx); ctx->listener = 0; } + + // free up any still active sessions + for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) + qd_session_decref(ctx->qd_sessions[i]); + qd_connection_release_sessions(ctx); + qd_tls_session_free(ctx->ssl); sys_atomic_destroy(&ctx->wake_core); sys_atomic_destroy(&ctx->wake_cutthrough_inbound); diff --git a/src/adaptors/amqp/container.c b/src/adaptors/amqp/container.c index 4637229a0..76eee5ccf 100644 --- a/src/adaptors/amqp/container.c +++ b/src/adaptors/amqp/container.c @@ -47,8 +47,8 @@ /** Encapsulates a proton link for sending and receiving messages */ struct qd_link_t { DEQ_LINKS(qd_link_t); - pn_session_t *pn_sess; pn_link_t *pn_link; + qd_session_t *qd_session; qd_direction_t direction; void *context; qd_alloc_safe_ptr_t incoming_msg; // DISPATCH-1690: for cleanup @@ -66,34 +66,27 @@ ALLOC_DEFINE(qd_link_ref_t); /** Encapsulates a proton session */ struct qd_session_t { DEQ_LINKS(qd_session_t); - pn_session_t *pn_sess; + sys_atomic_t ref_count; + pn_session_t *pn_session; qd_link_list_t q3_blocked_links; ///< Q3 blocked if !empty + + // For outgoing session flow control. Never buffer more than out_window_limit bytes of data on the session before + // returning control to the proactor. This prevents memory bloat and allows proactor to send buffered data in a + // timely manner. The low watermark is used to unblock the session - do not resume writing to the session until the + // amount of available capacity has grown to at least the low watermark. + + size_t out_window_limit; + size_t out_window_low_watermark; }; -// Bug workaround to free links/session when we hope they are no longer used! +// Bug workaround to free Proton links when we hope they are no longer used! // Fingers crossed! :| // -struct qd_pn_free_link_session_t { - DEQ_LINKS(qd_pn_free_link_session_t); - pn_session_t *pn_session; +struct qd_pn_free_link_t { + DEQ_LINKS(qd_pn_free_link_t); pn_link_t *pn_link; }; -ALLOC_DEFINE(qd_pn_free_link_session_t); - - -// macros to work around PROTON-2184 (see container.h) -// -static inline void qd_session_link_pn(qd_session_t *qd_ssn, pn_session_t *pn_ssn) -{ - assert(qd_ssn && pn_ssn); - pn_session_set_context(pn_ssn, qd_ssn); -} - -static inline void qd_session_unlink_pn(qd_session_t *qd_ssn, pn_session_t *pn_ssn) -{ - assert(qd_ssn); - pn_session_set_context(pn_ssn, 0); -} +ALLOC_DEFINE(qd_pn_free_link_t); ALLOC_DECLARE(qd_session_t); ALLOC_DEFINE(qd_session_t); @@ -112,6 +105,12 @@ struct qd_container_t { qd_link_list_t links; }; +qd_session_t *qd_session(pn_session_t *pn_ssn); + +static inline qd_session_t *qd_session_from_pn(pn_session_t *pn_ssn) +{ + return (qd_session_t *)pn_session_get_context(pn_ssn); +} static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) { @@ -128,12 +127,15 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) sys_mutex_lock(&container->lock); DEQ_INSERT_TAIL(container->links, link); sys_mutex_unlock(&container->lock); - link->pn_sess = pn_link_session(pn_link); link->pn_link = pn_link; link->direction = QD_OUTGOING; link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link); + link->qd_session = qd_session_from_pn(pn_link_session(pn_link)); + assert(link->qd_session); + qd_session_incref(link->qd_session); + pn_link_set_context(pn_link, link); container->ntype->outgoing_handler(container->qd_router, link); } @@ -154,11 +156,14 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, u sys_mutex_lock(&container->lock); DEQ_INSERT_TAIL(container->links, link); sys_mutex_unlock(&container->lock); - link->pn_sess = pn_link_session(pn_link); link->pn_link = pn_link; link->direction = QD_INCOMING; link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link); + link->qd_session = qd_session_from_pn(pn_link_session(pn_link)); + assert(link->qd_session); + qd_session_incref(link->qd_session); + if (max_size) { pn_link_set_max_message_size(pn_link, max_size); } @@ -260,11 +265,14 @@ static void cleanup_link(qd_link_t *link) if (link) { if (link->q3_blocked) qd_link_q3_unblock(link); + + qd_session_decref(link->qd_session); + link->qd_session = 0; + if (link->pn_link) { pn_link_set_context(link->pn_link, NULL); link->pn_link = 0; } - link->pn_sess = 0; // cleanup any inbound message that has not been forwarded qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg); @@ -283,7 +291,7 @@ static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_co if (qd_conn) qd_conn->closed = true; close_links(container, conn, true); - qd_session_cleanup(qd_conn); + qd_connection_release_sessions(qd_conn); if (qd_conn) notify_closed(container, qd_conn, qd_connection_get_context(qd_conn)); return 0; @@ -299,9 +307,9 @@ static void writable_handler(qd_container_t *container, pn_connection_t *conn, q /** * Returns true if the free_link already exists in free_link_list, false otherwise */ -static bool link_exists(qd_pn_free_link_session_list_t *free_list, pn_link_t *free_link) +static bool link_exists(qd_pn_free_link_list_t *free_list, pn_link_t *free_link) { - qd_pn_free_link_session_t *free_item = DEQ_HEAD(*free_list); + qd_pn_free_link_t *free_item = DEQ_HEAD(*free_list); while(free_item) { if (free_item->pn_link == free_link) return true; @@ -310,39 +318,13 @@ static bool link_exists(qd_pn_free_link_session_list_t *free_list, pn_link_t *fr return false; } -/** - * Returns true if the free_session already exists in free_session_list, false otherwise -*/ -static bool session_exists(qd_pn_free_link_session_list_t *free_list, pn_session_t *free_session) -{ - qd_pn_free_link_session_t *free_item = DEQ_HEAD(*free_list); - while(free_item) { - if (free_item->pn_session == free_session) - return true; - free_item = DEQ_NEXT(free_item); - } - return false; -} - -static void add_session_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_session_t *ssn) +static void add_link_to_free_list(qd_pn_free_link_list_t *free_link_list, pn_link_t *pn_link) { - if (!session_exists(free_link_session_list, ssn)) { - qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t(); - DEQ_ITEM_INIT(to_free); - to_free->pn_session = ssn; - to_free->pn_link = 0; - DEQ_INSERT_TAIL(*free_link_session_list, to_free); - } -} - -static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_link_t *pn_link) -{ - if (!link_exists(free_link_session_list, pn_link)) { - qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t(); + if (!link_exists(free_link_list, pn_link)) { + qd_pn_free_link_t *to_free = new_qd_pn_free_link_t(); DEQ_ITEM_INIT(to_free); to_free->pn_link = pn_link; - to_free->pn_session = 0; - DEQ_INSERT_TAIL(*free_link_session_list, to_free); + DEQ_INSERT_TAIL(*free_link_list, to_free); } } @@ -355,7 +337,7 @@ static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_sess */ void qd_conn_event_batch_complete(qd_container_t *container, qd_connection_t *qd_conn, bool conn_closed) { - qd_pn_free_link_session_t *to_free = DEQ_HEAD(qd_conn->free_link_session_list); + qd_pn_free_link_t *to_free = DEQ_HEAD(qd_conn->free_link_list); while(to_free) { if (!conn_closed) { @@ -367,12 +349,10 @@ void qd_conn_event_batch_complete(qd_container_t *container, qd_connection_t *qd pn_link_set_context(to_free->pn_link, 0); pn_link_free(to_free->pn_link); } - if (to_free->pn_session) - pn_session_free(to_free->pn_session); } - DEQ_REMOVE_HEAD(qd_conn->free_link_session_list); - free_qd_pn_free_link_session_t(to_free); - to_free = DEQ_HEAD(qd_conn->free_link_session_list); + DEQ_REMOVE_HEAD(qd_conn->free_link_list); + free_qd_pn_free_link_t(to_free); + to_free = DEQ_HEAD(qd_conn->free_link_list); } @@ -420,8 +400,8 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, qd_conn->closed = true; if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) { close_links(container, conn, false); - if(qd_conn) { - qd_session_cleanup(qd_conn); + if (qd_conn) { + qd_connection_release_sessions(qd_conn); } pn_connection_close(conn); if(qd_conn) { @@ -429,8 +409,8 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } } else if (pn_connection_state(conn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { close_links(container, conn, false); - if(qd_conn) { - qd_session_cleanup(qd_conn); + if (qd_conn) { + qd_connection_release_sessions(qd_conn); notify_closed(container, qd_conn, qd_connection_get_context(qd_conn)); qd_conn_event_batch_complete(container, qd_conn, true); } @@ -442,7 +422,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, ssn = pn_event_session(event); if (pn_session_state(ssn) & PN_LOCAL_UNINIT) { // remote created new session - assert(qd_session_from_pn(ssn) == 0); + assert(qd_session_from_pn(ssn) == 0); // expect not already set up qd_session_t *qd_ssn = qd_session(ssn); if (!qd_ssn) { pn_condition_t *cond = pn_session_condition(ssn); @@ -453,14 +433,14 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } if (qd_conn->policy_settings) { if (!qd_policy_approve_amqp_session(ssn, qd_conn)) { - qd_session_free(qd_ssn); + qd_session_decref(qd_ssn); break; } qd_conn->n_sessions++; } - qd_session_link_pn(qd_ssn, ssn); - qd_policy_apply_session_settings(ssn, qd_conn); - pn_session_open(ssn); + DEQ_INSERT_TAIL(qd_conn->child_sessions, qd_ssn); + qd_policy_apply_session_settings(qd_ssn->pn_session, qd_conn); + pn_session_open(qd_ssn->pn_session); } } break; @@ -468,8 +448,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, case PN_SESSION_LOCAL_CLOSE : ssn = pn_event_session(event); for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) { - if (ssn == qd_conn->pn_sessions[i]) { - qd_conn->pn_sessions[i] = 0; + if (qd_conn->qd_sessions[i] && ssn == qd_conn->qd_sessions[i]->pn_session) { + qd_session_decref(qd_conn->qd_sessions[i]); + qd_conn->qd_sessions[i] = 0; break; } } @@ -485,16 +466,19 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { qd_session_t *qd_ssn = qd_session_from_pn(ssn); - qd_session_free(qd_ssn); - add_session_to_free_list(&qd_conn->free_link_session_list, ssn); + if (qd_ssn) { + DEQ_REMOVE(qd_conn->child_sessions, qd_ssn); + qd_session_decref(qd_ssn); + } } break; case PN_SESSION_REMOTE_CLOSE : ssn = pn_event_session(event); for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) { - if (ssn == qd_conn->pn_sessions[i]) { - qd_conn->pn_sessions[i] = 0; + if (qd_conn->qd_sessions[i] && ssn == qd_conn->qd_sessions[i]->pn_session) { + qd_session_decref(qd_conn->qd_sessions[i]); + qd_conn->qd_sessions[i] = 0; break; } } @@ -545,8 +529,10 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { qd_session_t *qd_ssn = qd_session_from_pn(ssn); - qd_session_free(qd_ssn); - add_session_to_free_list(&qd_conn->free_link_session_list, ssn); + if (qd_ssn) { + DEQ_REMOVE(qd_conn->child_sessions, qd_ssn); + qd_session_decref(qd_ssn); + } } } break; @@ -602,11 +588,11 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) { - add_link_to_free_list(&qd_conn->free_link_session_list, pn_link); + add_link_to_free_list(&qd_conn->free_link_list, pn_link); } container->ntype->link_detach_handler(container->qd_router, qd_link, dt); } else { - add_link_to_free_list(&qd_conn->free_link_session_list, pn_link); + add_link_to_free_list(&qd_conn->free_link_list, pn_link); } } break; @@ -615,7 +601,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, case PN_LINK_LOCAL_CLOSE: pn_link = pn_event_link(event); if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - add_link_to_free_list(&qd_conn->free_link_session_list, pn_link); + add_link_to_free_list(&qd_conn->free_link_list, pn_link); } break; @@ -698,21 +684,24 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name, { const qd_server_config_t * cf = qd_connection_config(conn); - pn_session_t *pn_ssn = conn->pn_sessions[ssn_class]; - if (!pn_ssn) { - pn_ssn = pn_session(qd_connection_pn(conn)); + qd_session_t *qd_ssn = conn->qd_sessions[ssn_class]; + if (!qd_ssn) { + // need to create a new AMQP session + pn_session_t *pn_ssn = pn_session(qd_connection_pn(conn)); if (!pn_ssn) { return NULL; } - qd_session_t *qd_ssn = qd_session(pn_ssn); + qd_ssn = qd_session(pn_ssn); if (!qd_ssn) { pn_session_free(pn_ssn); return NULL; // PN_LOCAL_CLOSE event will free pn_ssn } - conn->pn_sessions[ssn_class] = pn_ssn; - pn_session_set_incoming_capacity(pn_ssn, cf->incoming_capacity); - pn_session_open(pn_ssn); + DEQ_INSERT_TAIL(conn->child_sessions, qd_ssn); + conn->qd_sessions[ssn_class] = qd_ssn; + qd_session_incref(qd_ssn); + pn_session_set_incoming_capacity(qd_ssn->pn_session, cf->incoming_capacity); + pn_session_open(qd_ssn->pn_session); } qd_link_t *link = new_qd_link_t(); @@ -725,12 +714,13 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name, DEQ_INSERT_TAIL(amqp_adaptor.container->links, link); sys_mutex_unlock(&amqp_adaptor.container->lock); - link->pn_sess = pn_ssn; + link->qd_session = qd_ssn; + qd_session_incref(qd_ssn); if (dir == QD_OUTGOING) - link->pn_link = pn_sender(link->pn_sess, name); + link->pn_link = pn_sender(link->qd_session->pn_session, name); else - link->pn_link = pn_receiver(link->pn_sess, name); + link->pn_link = pn_receiver(link->qd_session->pn_session, name); link->direction = dir; link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(link->pn_link); @@ -778,9 +768,9 @@ pn_link_t *qd_link_pn(const qd_link_t *link) return link->pn_link; } -pn_session_t *qd_link_pn_session(qd_link_t *link) +qd_session_t *qd_link_session(qd_link_t *link) { - return link->pn_sess; + return link->qd_session; } @@ -872,11 +862,10 @@ void qd_link_detach(qd_link_t *link) void qd_link_q3_block(qd_link_t *link) { assert(link); - if (!link->q3_blocked && link->pn_sess) { - qd_session_t *qd_ssn = qd_session_from_pn(link->pn_sess); - assert(qd_ssn); + if (!link->q3_blocked) { + assert(link->qd_session); link->q3_blocked = true; - DEQ_INSERT_TAIL_N(Q3, qd_ssn->q3_blocked_links, link); + DEQ_INSERT_TAIL_N(Q3, link->qd_session->q3_blocked_links, link); } } @@ -885,9 +874,8 @@ void qd_link_q3_unblock(qd_link_t *link) { assert(link); if (link->q3_blocked) { - qd_session_t *qd_ssn = qd_session_from_pn(link->pn_sess); - assert(qd_ssn); - DEQ_REMOVE_N(Q3, qd_ssn->q3_blocked_links, link); + assert(link->qd_session); + DEQ_REMOVE_N(Q3, link->qd_session->q3_blocked_links, link); link->q3_blocked = false; } } @@ -905,66 +893,129 @@ void qd_link_set_link_id(qd_link_t *link, uint64_t link_id) } +qd_session_t *qd_link_get_session(const qd_link_t *link) +{ + assert(link); + return link->qd_session; +} + + qd_session_t *qd_session(pn_session_t *pn_ssn) { - assert(pn_ssn); - qd_session_t *qd_ssn = qd_session_from_pn(pn_ssn); - if (!qd_ssn) { - qd_ssn = new_qd_session_t(); - if (qd_ssn) { - ZERO(qd_ssn); - qd_ssn->pn_sess = pn_ssn; - DEQ_INIT(qd_ssn->q3_blocked_links); - qd_session_link_pn(qd_ssn, pn_ssn); - } + assert(pn_ssn && qd_session_from_pn(pn_ssn) == 0); + qd_session_t *qd_ssn = new_qd_session_t(); + if (qd_ssn) { + ZERO(qd_ssn); + DEQ_ITEM_INIT(qd_ssn); + sys_atomic_init(&qd_ssn->ref_count, 1); + qd_ssn->pn_session = pn_ssn; + DEQ_INIT(qd_ssn->q3_blocked_links); + pn_session_set_context(pn_ssn, qd_ssn); + + // @TODO(kgiusti) make these dependent on connection role + qd_ssn->out_window_limit = QD_QLIMIT_Q3_UPPER * QD_BUFFER_SIZE; + qd_ssn->out_window_low_watermark = QD_QLIMIT_Q3_LOWER * QD_BUFFER_SIZE; } return qd_ssn; } -void qd_session_free(qd_session_t *qd_ssn) +void qd_session_incref(qd_session_t *qd_ssn) +{ + assert(qd_ssn); + uint32_t rc = sys_atomic_inc(&qd_ssn->ref_count); + (void) rc; + assert(rc != 0); // caller did not have a valid ref +} + + +void qd_session_decref(qd_session_t *qd_ssn) { if (qd_ssn) { - qd_link_t *link = DEQ_HEAD(qd_ssn->q3_blocked_links); - while (link) { - qd_link_q3_unblock(link); // removes link from list - link = DEQ_HEAD(qd_ssn->q3_blocked_links); - } - if (qd_ssn->pn_sess) { - qd_session_unlink_pn(qd_ssn, qd_ssn->pn_sess); + uint32_t rc = sys_atomic_dec(&qd_ssn->ref_count); + assert(rc != 0); // underflow + if (rc == 1) { + qd_link_t *link = DEQ_HEAD(qd_ssn->q3_blocked_links); + while (link) { + qd_link_q3_unblock(link); // removes link from list + link = DEQ_HEAD(qd_ssn->q3_blocked_links); + } + if (qd_ssn->pn_session) { + assert(qd_session_from_pn(qd_ssn->pn_session) == qd_ssn); + pn_session_set_context(qd_ssn->pn_session, 0); + pn_session_free(qd_ssn->pn_session); + qd_ssn->pn_session = 0; + } + sys_atomic_destroy(&qd_ssn->ref_count); + free_qd_session_t(qd_ssn); } - free_qd_session_t(qd_ssn); } } qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn) { - return qd_ssn ? &qd_ssn->q3_blocked_links : 0; + assert(qd_ssn); + return &qd_ssn->q3_blocked_links; } bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn) { - return qd_ssn && !DEQ_IS_EMPTY(qd_ssn->q3_blocked_links); + assert(qd_ssn); + return !DEQ_IS_EMPTY(qd_ssn->q3_blocked_links); +} + + +/** Get outgoing window capacity in bytes + * + * Returns the available outgoing data capacity for the session. This capacity must be shared by all sending links on + * this session. + */ +size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn) +{ + assert(qd_ssn && qd_ssn->pn_session); + + size_t buffered = pn_session_outgoing_bytes(qd_ssn->pn_session); + if (buffered < qd_ssn->out_window_limit) { + return qd_ssn->out_window_limit - buffered; + } + return 0; } +/** Return the session outgoing window low water mark + * + * Blocked session can resume output once the available outgoing capacity reaches at least this value + */ +size_t qd_session_get_outgoing_threshold(const qd_session_t *qd_ssn) +{ + assert(qd_ssn); + return qd_ssn->out_window_low_watermark; +} + -/** release all qd_session_t instances for the connection - * called prior to releasing the qd_connection_t +/** Release all of the connections sessions + * + * The connection is about to be freed. When this happens proton will free all the pn_session_t automagically. So clear + * the session pointers and drop the reference. It is possible that a qd_link_t is still referencing a session but if it + * accesses the pn_session_t after this point that's a bug. */ -void qd_session_cleanup(qd_connection_t *qd_conn) +void qd_connection_release_sessions(qd_connection_t *qd_conn) { pn_connection_t *pn_conn = (qd_conn) ? qd_conn->pn_conn : 0; if (!pn_conn) return; - pn_session_t *pn_ssn = pn_session_head(pn_conn, 0); - while (pn_ssn) { - qd_session_t *qd_ssn = qd_session_from_pn(pn_ssn); - qd_session_free(qd_ssn); - pn_ssn = pn_session_next(pn_ssn, 0); + qd_session_t *qd_ssn = DEQ_HEAD(qd_conn->child_sessions); + while (qd_ssn) { + DEQ_REMOVE_HEAD(qd_conn->child_sessions); + if (qd_ssn->pn_session) { + pn_session_set_context(qd_ssn->pn_session, 0); + qd_ssn->pn_session = 0; + } + qd_session_decref(qd_ssn); + qd_ssn = DEQ_HEAD(qd_conn->child_sessions); } } diff --git a/src/adaptors/amqp/container.h b/src/adaptors/amqp/container.h index da1950146..757d71846 100644 --- a/src/adaptors/amqp/container.h +++ b/src/adaptors/amqp/container.h @@ -93,7 +93,6 @@ void policy_notify_opened(void *container, qd_connection_t *conn, void *context) qd_direction_t qd_link_direction(const qd_link_t *link); void qd_link_set_q2_limit_unbounded(qd_link_t *link, bool q2_limit_unbounded); pn_snd_settle_mode_t qd_link_remote_snd_settle_mode(const qd_link_t *link); -pn_session_t *qd_link_pn_session(qd_link_t *link); pn_terminus_t *qd_link_source(qd_link_t *link); pn_terminus_t *qd_link_target(qd_link_t *link); pn_terminus_t *qd_link_remote_source(qd_link_t *link); @@ -108,17 +107,16 @@ uint64_t qd_link_link_id(const qd_link_t *link); void qd_link_set_link_id(qd_link_t *link, uint64_t link_id); struct qd_message_t; void qd_link_set_incoming_msg(qd_link_t *link, struct qd_message_t *msg); +qd_session_t *qd_link_get_session(const qd_link_t *link); -qd_session_t *qd_session(pn_session_t *pn_ssn); -void qd_session_cleanup(qd_connection_t *qd_conn); -void qd_session_free(qd_session_t *qd_ssn); +void qd_session_incref(qd_session_t *qd_ssn); +void qd_session_decref(qd_session_t *qd_ssn); bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn); qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn); +size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn); +size_t qd_session_get_outgoing_threshold(const qd_session_t *qd_ssn); -static inline qd_session_t *qd_session_from_pn(pn_session_t *pn_ssn) -{ - return (qd_session_t *)pn_session_get_context(pn_ssn); -} +void qd_connection_release_sessions(qd_connection_t *qd_conn); ///@} #endif diff --git a/src/adaptors/amqp/qd_connection.c b/src/adaptors/amqp/qd_connection.c index acbaeec28..b7088338e 100644 --- a/src/adaptors/amqp/qd_connection.c +++ b/src/adaptors/amqp/qd_connection.c @@ -22,6 +22,7 @@ #include "qd_connector.h" #include "qd_listener.h" #include "policy.h" +#include "container.h" #include "qpid/dispatch/dispatch.h" #include "qpid/dispatch/proton_utils.h" @@ -258,7 +259,8 @@ void qd_connection_init(qd_connection_t *ctx, qd_server_t *server, qd_server_con pn_connection_set_context(ctx->pn_conn, ctx); DEQ_ITEM_INIT(ctx); DEQ_INIT(ctx->deferred_calls); - DEQ_INIT(ctx->free_link_session_list); + DEQ_INIT(ctx->free_link_list); + DEQ_INIT(ctx->child_sessions); // note: setup connector or listener before decorating the connection since // decoration involves accessing the connection's parent. @@ -381,6 +383,9 @@ static void qd_connection_free(qd_connection_t *qd_conn, const char *condition_n if (qd_conn->timer) qd_timer_free(qd_conn->timer); free(qd_conn->name); free(qd_conn->role); + for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) + qd_session_decref(qd_conn->qd_sessions[i]); + qd_connection_release_sessions(qd_conn); qd_tls_session_free(qd_conn->ssl); sys_atomic_destroy(&qd_conn->wake_core); sys_atomic_destroy(&qd_conn->wake_cutthrough_inbound); diff --git a/src/adaptors/amqp/qd_connection.h b/src/adaptors/amqp/qd_connection.h index 862d424f7..658c633e3 100644 --- a/src/adaptors/amqp/qd_connection.h +++ b/src/adaptors/amqp/qd_connection.h @@ -43,7 +43,7 @@ typedef struct qd_listener_t qd_listener_t; typedef struct qd_connector_t qd_connector_t; typedef struct qd_policy_settings_t qd_policy_settings_t; typedef struct pn_connection_t pn_connection_t; -typedef struct pn_session_t pn_session_t; +typedef struct qd_session_t qd_session_t; typedef struct qd_timer_t qd_timer_t; typedef struct qd_tls_session_t qd_tls_session_t; @@ -60,8 +60,10 @@ typedef struct qd_deferred_call_t { DEQ_DECLARE(qd_deferred_call_t, qd_deferred_call_list_t); -typedef struct qd_pn_free_link_session_t qd_pn_free_link_session_t; -DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t); +typedef struct qd_pn_free_link_t qd_pn_free_link_t; +DEQ_DECLARE(qd_pn_free_link_t, qd_pn_free_link_list_t); + +DEQ_DECLARE(qd_session_t, qd_session_list_t); /** @@ -78,7 +80,8 @@ struct qd_connection_t { int enqueued; qd_timer_t *timer; // Timer for initial-setup pn_connection_t *pn_conn; - pn_session_t *pn_sessions[QD_SSN_CLASS_COUNT]; + qd_session_t *qd_sessions[QD_SSN_CLASS_COUNT]; // long lived inter-router sessions + qd_session_list_t child_sessions; // all active sessions qd_tls_session_t *ssl; qd_listener_t *listener; qd_connector_t *connector; @@ -97,7 +100,7 @@ struct qd_connection_t { bool policy_counted; char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc. char group_correlator[QD_DISCRIMINATOR_SIZE]; - qd_pn_free_link_session_list_t free_link_session_list; + qd_pn_free_link_list_t free_link_list; bool strip_annotations_in; bool strip_annotations_out; void (*wake)(qd_connection_t*); /* Wake method, different for libwebsockets vs. proactor */