Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #1107 - Use a tiered list of read-grant amounts based on percen… #1110

Merged
merged 5 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/qpid/dispatch/alloc_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ static inline void *qd_alloc_deref_safe_ptr(const qd_alloc_safe_ptr_t *sp)
void free_##T(T *p); \
typedef qd_alloc_safe_ptr_t T##_sp; \
void set_safe_ptr_##T(T *p, T##_sp *sp); \
T *safe_deref_##T(T##_sp sp)
T *safe_deref_##T(T##_sp sp); \
qd_alloc_stats_t *alloc_stats_##T(void)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could call this get_alloc_stats_##T instead of alloc_stats_##T

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this a separate issue. It was already there, I just added it to the declaration.


/**
* Define allocator configuration.
Expand Down
112 changes: 106 additions & 6 deletions src/adaptors/adaptor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
* under the License.
*/
#include "adaptor_common.h"
#include "adaptor_buffer.h"

#include "qpid/dispatch/amqp.h"
#include "qpid/dispatch/connection_manager.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/log.h"

#include <proton/netaddr.h>

Expand All @@ -29,6 +31,11 @@

ALLOC_DEFINE(qd_adaptor_config_t);

static uint64_t buffer_ceiling = 0;
static uint64_t buffer_threshold_50;
static uint64_t buffer_threshold_75;
static uint64_t buffer_threshold_85;

void qd_free_adaptor_config(qd_adaptor_config_t *config)
{
if (!config)
Expand Down Expand Up @@ -72,28 +79,121 @@ qd_error_t qd_load_adaptor_config(qd_adaptor_config_t *config, qd_entity_t *enti
return qd_error_code();
}

void qd_adaptor_common_init(void)
{
if (buffer_ceiling > 0) {
return;
}

char *ceiling_string = getenv("SKUPPER_ROUTER_MEMORY_CEILING");
uint64_t memory_ceiling = (uint64_t) 4 * (uint64_t) 1024 * (uint64_t) 1024 * (uint64_t) 1024; // 4 Gig default

if (!!ceiling_string) {
long long convert = atoll(ceiling_string);
if (convert > 0) {
memory_ceiling = (uint64_t) convert;
}
}

buffer_ceiling = MAX(memory_ceiling / QD_ADAPTOR_MAX_BUFFER_SIZE, 100);
buffer_threshold_50 = buffer_ceiling / 2;
buffer_threshold_75 = (buffer_ceiling / 20) * 15;
buffer_threshold_85 = (buffer_ceiling / 20) * 17;

qd_log(LOG_ROUTER, QD_LOG_INFO, "Adaptor buffer memory ceiling: %"PRIu64" (%"PRIu64" buffers)", memory_ceiling, buffer_ceiling);
}

int qd_raw_connection_grant_read_buffers(pn_raw_connection_t *pn_raw_conn)
{
//
// Define the allocation tiers. The tier values are the number of read buffers to be granted
// to raw connections based on the percentage of usage of the router-wide buffer ceiling.
//
#define TIER_1 8 // [0% .. 50%)
#define TIER_2 4 // [50% .. 75%)
#define TIER_3 2 // [75% .. 85%)
#define TIER_4 2 // [85% .. 100%]

assert(pn_raw_conn);
pn_raw_buffer_t raw_buffers[RAW_BUFFER_BATCH];
size_t desired = pn_raw_connection_read_buffers_capacity(pn_raw_conn);
const size_t granted = desired;

while (desired) {
//
// Get the read-buffer capacity for the connection.
//
size_t capacity = pn_raw_connection_read_buffers_capacity(pn_raw_conn);

//
// If there's no capacity, exit now before doing any further wasted work.
//
if (capacity == 0) {
return 0;
}

//
// Since we can't query Proton for the maximum read-buffer capacity, we will infer it from
// calls to pn_raw_connection_read_buffers_capacity.
//
static size_t max_capacity = 0;
if (capacity > max_capacity) {
max_capacity = capacity;
}

//
// Get the "held_by_threads" stats for adaptor buffers as an approximation of how many
// buffers are in-use. This is an approximation since it also counts free buffers held
// in the per-thread free-pools. Since we will be dealing with large numbers here, the
// number of buffers in free-pools will not be significant.
//
// Note that there is a thread race on the access of this value. There's no danger associated
// with getting a partial or corrupted value from time to time.
//
// Note also that the stats pointer may be NULL if no buffers have yet been allocated.
//
qd_alloc_stats_t *stats = alloc_stats_qd_adaptor_buffer_t();
uint64_t buffers_in_use = !!stats ? stats->held_by_threads : 0;

//
// Choose the grant-allocation tier based on the number of buffers in use.
//
size_t desired = TIER_4;
if (buffers_in_use < buffer_threshold_50) {
desired = TIER_1;
} else if (buffers_in_use < buffer_threshold_75) {
desired = TIER_2;
} else if (buffers_in_use < buffer_threshold_85) {
desired = TIER_3;
}

//
// Determine how many of the desired buffers are already granted. This will always be a
// non-negative value.
//
size_t already_granted = max_capacity - capacity;

//
// If we desire to grant additional buffers, calculate the number to grant now.
//
const size_t to_grant = desired > already_granted ? desired - already_granted : 0;
size_t count = to_grant;

//
// Grant the buffers in batches.
//
while (count) {
int i;
for (i = 0; i < desired && i < RAW_BUFFER_BATCH; ++i) {
for (i = 0; i < count && i < RAW_BUFFER_BATCH; ++i) {
qd_adaptor_buffer_t *buf = qd_adaptor_buffer();
raw_buffers[i].bytes = (char *) qd_adaptor_buffer_base(buf);
raw_buffers[i].capacity = qd_adaptor_buffer_capacity(buf);
raw_buffers[i].size = 0;
raw_buffers[i].offset = 0;
raw_buffers[i].context = (uintptr_t) buf;
}
desired -= i;
count -= i;
pn_raw_connection_give_read_buffers(pn_raw_conn, raw_buffers, i);
}

return granted;
return to_grant;
}

int qd_raw_connection_write_buffers(pn_raw_connection_t *pn_raw_conn, qd_adaptor_buffer_list_t *blist)
Expand Down
5 changes: 5 additions & 0 deletions src/adaptors/adaptor_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ ALLOC_DECLARE(qd_adaptor_config_t);
qd_error_t qd_load_adaptor_config(qd_adaptor_config_t *config, qd_entity_t *entity);
void qd_free_adaptor_config(qd_adaptor_config_t *config);

/**
* Perform router-startup actions used in the common module.
*/
void qd_adaptor_common_init(void);

/**
* Grants as many read qd_adaptor buffers as returned by pn_raw_connection_read_buffers_capacity().
* Maximum read capacity is set to 16 in proton raw api.
Expand Down
1 change: 1 addition & 0 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -2234,6 +2234,7 @@ static void qdr_tcp_activate_CT(void *notused, qdr_connection_t *c)
*/
static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
{
qd_adaptor_common_init();
qdr_tcp_adaptor_t *adaptor = NEW(qdr_tcp_adaptor_t);
adaptor->core = core;
adaptor->adaptor = qdr_protocol_adaptor(core,
Expand Down
3 changes: 3 additions & 0 deletions tests/tsan.supp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ race:^__lws_logv$
# ISSUE-537 - Suppress the race in qd_entity_refresh_connector for now.
race:^qd_entity_refresh_connector$

# #1107 - Suppress warnings on benign race in accessing memory stats
race:^qd_raw_connection_grant_read_buffers$


#
# External libraries
Expand Down