From bb2de31225a320f8acfab38d840bea9af9051952 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Tue, 12 Nov 2024 16:36:51 -0500 Subject: [PATCH] fixup: adjust limits based on peer window, review input --- include/qpid/dispatch/amqp_adaptor.h | 5 -- src/adaptors/amqp/amqp_adaptor.c | 2 +- src/adaptors/amqp/container.c | 105 +++++++++++++++++++++++---- src/adaptors/amqp/container.h | 6 +- src/adaptors/amqp/server_config.c | 6 +- 5 files changed, 98 insertions(+), 26 deletions(-) diff --git a/include/qpid/dispatch/amqp_adaptor.h b/include/qpid/dispatch/amqp_adaptor.h index 2f9ee64a3..5e6fad9f8 100644 --- a/include/qpid/dispatch/amqp_adaptor.h +++ b/include/qpid/dispatch/amqp_adaptor.h @@ -36,11 +36,6 @@ typedef struct qd_connection_t qd_connection_t; typedef struct qd_link_t qd_link_t; typedef struct qd_session_t qd_session_t; -// Session windowing limits -extern const size_t qd_session_max_in_win_user; // incoming window byte limit for user connections -extern const size_t qd_session_max_in_win_trunk; // incoming window byte limit for inter-router connections -extern const size_t qd_session_max_outgoing_bytes; // limit to outgoing buffered data -extern const size_t qd_session_low_outgoing_bytes; // low water mark to resume buffering outgoing data // For use by message.c diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index afb256247..9a206f74b 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1172,7 +1172,7 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link) qd_session_t *qd_ssn = qd_link_get_session(link); if (qd_session_is_q3_blocked(qd_ssn)) { // Q3 blocked - have we drained enough outgoing bytes? - if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_low_outgoing_bytes) { + if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_get_outgoing_capacity_low_threshold(qd_ssn)) { // yes. We must now unblock all links that have been blocked by Q3 qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn); diff --git a/src/adaptors/amqp/container.c b/src/adaptors/amqp/container.c index 38c73e571..14bef638e 100644 --- a/src/adaptors/amqp/container.c +++ b/src/adaptors/amqp/container.c @@ -69,25 +69,36 @@ struct qd_session_t { sys_atomic_t ref_count; pn_session_t *pn_session; qd_link_list_t q3_blocked_links; ///< Q3 blocked if !empty + + // remotes maximum incoming frame size in bytes (see AMQP 1.0 Open Performative) uint32_t remote_max_frame; + + // remotes incoming window size in frames (see AMQP 1.0 Begin Performative) + uint32_t remote_max_incoming_window; + + // Session outgoing flow control: Stop writing outgoing data (calling pn_link_send()) to the session when the total + // number of buffered bytes has exceeded the high threshold (see Proton pn_session_outgoing_bytes()). Resume writing + // data when the session has sent enough data to reduce the number of buffered output bytes to below the low + // threshold. This prevents the router from buffering too much output data before allowing Proton to write it out. + // See qd_session_get_outgoing_capacity() for details. + size_t outgoing_bytes_high_threshold; + size_t outgoing_bytes_low_threshold; }; -// Session window limits + +// Session window limits (See Section 2.5.6 Session Flow Control in AMQP V1.0 Specification) // // A session incoming window determines how many incoming frames the session will accept across all incoming links. This // places a limit on the number of incoming data bytes that have to be buffered on the session (connection max-frame * // max incoming window frames). The local session incoming window configuration is determined by the maxFrameSize and // maxSessionFrames configuration attributes of an AMQP listener/connector. // -// The remote peers session window must be honored when writing output to a sending link. In addition we limit the -// amount of outgoing data that can be buffered on a session before control is returned to Proton. This is necessary to -// improve latency and allow capacity sharing among all links on the session. +// The remote peers session window must be honored when writing output to a sending link: we must not send more data +// than the window allows // -const size_t qd_session_max_outgoing_bytes = 1048576; // max buffered bytes on a session -const size_t qd_session_low_outgoing_bytes = 524288; // low watermark for max buffered bytes - -const size_t qd_session_max_in_win_user = (size_t) 8388608; // AMQP application in window max bytes 8MB -const size_t qd_session_max_in_win_trunk = (size_t) 134217728; // inter-router in window max bytes 128MB +// Default window settings (in bytes not frames). Give inter-router connections a larger window for better performance. +const size_t qd_session_incoming_window_normal = (size_t) 8388608; // window for role=normal connections (8MB) +const size_t qd_session_incoming_window_router = (size_t) 134217728; // window for inter-router connections (128MB) // Can we leverage the new Proton Session Window API? @@ -125,6 +136,9 @@ struct qd_container_t { }; qd_session_t *qd_session(pn_session_t *pn_ssn); +static void qd_session_configure_incoming_window(qd_session_t *qd_ssn, uint32_t in_window); +static void qd_session_set_remote_incoming_window(qd_session_t *qd_ssn, uint32_t in_window); + static inline qd_session_t *qd_session_from_pn(pn_session_t *pn_ssn) { @@ -460,9 +474,16 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, DEQ_INSERT_TAIL(qd_conn->child_sessions, qd_ssn); uint32_t in_window; qd_policy_get_session_settings(qd_conn, &in_window); - qd_session_set_max_in_window(qd_ssn, in_window); + qd_session_configure_incoming_window(qd_ssn, in_window); pn_session_open(qd_ssn->pn_session); } +#if USE_PN_SESSION_WINDOWS + // Remote has opened, now the remote incoming window is available + qd_session_t *qd_ssn = qd_session_from_pn(ssn); + assert(qd_ssn); + qd_session_set_remote_incoming_window(qd_ssn, + pn_session_remote_incoming_window(ssn)); +#endif } break; @@ -721,7 +742,7 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name, DEQ_INSERT_TAIL(conn->child_sessions, qd_ssn); conn->qd_sessions[ssn_class] = qd_ssn; qd_session_incref(qd_ssn); - qd_session_set_max_in_window(qd_ssn, cf->session_max_in_window); + qd_session_configure_incoming_window(qd_ssn, cf->session_max_in_window); pn_session_open(qd_ssn->pn_session); } @@ -931,6 +952,10 @@ qd_session_t *qd_session(pn_session_t *pn_ssn) pn_session_set_context(pn_ssn, qd_ssn); qd_ssn->remote_max_frame = pn_transport_get_remote_max_frame(pn_tport); assert(qd_ssn->remote_max_frame != 0); + + // These thresholds come from the old Q3 session byte limits + qd_ssn->outgoing_bytes_high_threshold = 1048576; + qd_ssn->outgoing_bytes_low_threshold = 524288; } return qd_ssn; } @@ -987,6 +1012,15 @@ bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn) * * Returns the available outgoing data capacity for the session. This capacity must be shared by all sending links on * this session. + * + * The capacity is determined by the remotes current incoming window minus any outgoing bytes already written to the + * session. In other words: + * + * capacity = pn_remote_incoming_window(session) - pn_session_outgoing_bytes(session) + * + * However we must also prevent the router from buffering too much outgoing data at once. This is especially a problem when + * the remote uses an unlimited incoming window (default proton behavior). To prevent this we set an additional limit to + * the maximum amount of outgoing data that can be buffered in the session. */ size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn) { @@ -994,12 +1028,12 @@ size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn) // discount any data already written but not yet sent size_t buffered = pn_session_outgoing_bytes(qd_ssn->pn_session); - if (buffered >= qd_session_max_outgoing_bytes) + if (buffered >= qd_ssn->outgoing_bytes_high_threshold) return 0; // exceeded maximum buffered limit - size_t avail = qd_session_max_outgoing_bytes - buffered; + size_t avail = qd_ssn->outgoing_bytes_high_threshold - buffered; #if USE_PN_SESSION_WINDOWS - // never exceed the remaining in window of the peer + // never exceed the remaining incoming window capacity of the peer size_t limit = pn_session_remote_incoming_window(qd_ssn->pn_session); limit *= qd_ssn->remote_max_frame; return MIN(avail, limit); @@ -1009,12 +1043,26 @@ size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn) } -/** Configure the sessions incoming window limit +/** Get the sessions current outgoing capacity low threshold + * + * Returns the lower threshold for the sessions outgoing capacity. This threshold is used for resuming blocked output on + * the session. Output can resume once the available outgoing capacity increases beyond this threshold. + */ +size_t qd_session_get_outgoing_capacity_low_threshold(const qd_session_t *qd_ssn) +{ + return qd_ssn->outgoing_bytes_low_threshold; +} + + +/** Configure the sessions local incoming window limit. + * + * This sets the value of the incoming window for the session. This value is sent to the remote peer in the Begin + * Performative. * * @param qd_ssn Session to configure * @param in_window maximum incoming window in frames */ -void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window) +static void qd_session_configure_incoming_window(qd_session_t *qd_ssn, uint32_t in_window) { // older proton session windowing would stall so do not enable it #if USE_PN_SESSION_WINDOWS @@ -1026,6 +1074,31 @@ void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window) #endif } +/** Set the session incoming window that was advertised by the remote + * + * This is the value for the remotes incoming session window. It arrives in the Begin Performative and may be updated by + * arriving Flow Performatives. + * + * @param qd_ssn Session to update + * @param in_window the incoming window as given by the remote. + */ +static void qd_session_set_remote_incoming_window(qd_session_t *qd_ssn, uint32_t in_window) +{ + assert(in_window != 0); + + // Record the largest window advertised by the remote. + if (in_window > qd_ssn->remote_max_incoming_window) { + qd_ssn->remote_max_incoming_window = in_window; + // if the remotes max window is smaller than the default outgoing bytes limit then adjust the limits down + // otherwise we may never resume sending on blocked links (stall) since the low limit will never be exceeded. + size_t window_bytes = in_window * qd_ssn->remote_max_frame; + if (window_bytes < qd_ssn->outgoing_bytes_high_threshold) { + qd_ssn->outgoing_bytes_high_threshold = window_bytes; + qd_ssn->outgoing_bytes_low_threshold = window_bytes / 2; + } + } +} + /** Release all of the connections sessions * diff --git a/src/adaptors/amqp/container.h b/src/adaptors/amqp/container.h index e7938c43b..4766b7fe5 100644 --- a/src/adaptors/amqp/container.h +++ b/src/adaptors/amqp/container.h @@ -112,9 +112,13 @@ void qd_session_incref(qd_session_t *qd_ssn); void qd_session_decref(qd_session_t *qd_ssn); bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn); qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn); -void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window); +size_t qd_session_get_outgoing_capacity_low_threshold(const qd_session_t *qd_ssn); void qd_connection_release_sessions(qd_connection_t *qd_conn); +// Defaults for session incoming window size +extern const size_t qd_session_incoming_window_normal; +extern const size_t qd_session_incoming_window_router; + ///@} #endif diff --git a/src/adaptors/amqp/server_config.c b/src/adaptors/amqp/server_config.c index b3c09950f..0856b6d07 100644 --- a/src/adaptors/amqp/server_config.c +++ b/src/adaptors/amqp/server_config.c @@ -20,7 +20,7 @@ /* * Configuration record for listener and connector entities */ - +#include "container.h" #include "server_config.h" #include "dispatch_private.h" #include "entity.h" @@ -219,9 +219,9 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, if (value == 0) { // Use a sane default. Allow router to router links more capacity than AMQP application links if (strcmp(config->role, "normal") == 0) { - value = qd_session_max_in_win_user / config->max_frame_size; + value = qd_session_incoming_window_normal / config->max_frame_size; } else { - value = qd_session_max_in_win_trunk / config->max_frame_size; + value = qd_session_incoming_window_router / config->max_frame_size; } // Ensure the window is at least 2 frames to allow a non-zero low water mark value = MAX(value, 2);