Skip to content

Commit

Permalink
Fixes skupperproject#1259: New feature: periodic logging of alloc_poo…
Browse files Browse the repository at this point in the history
…l usage
  • Loading branch information
kgiusti committed Oct 17, 2023
1 parent 3d81fb8 commit fd33391
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 18 deletions.
5 changes: 5 additions & 0 deletions include/qpid/dispatch/alloc_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,9 @@ void qd_alloc_initialize(void);
void qd_alloc_debug_dump(const char *file);
void qd_alloc_finalize(void);
size_t qd_alloc_type_size(const qd_alloc_type_desc_t *desc); // thread safe

// control the periodic logging of alloc pool utilization
typedef struct qd_dispatch_t qd_dispatch_t;
void qd_alloc_start_monitor(qd_dispatch_t *qd);
void qd_alloc_stop_monitor(void);
#endif
4 changes: 2 additions & 2 deletions include/qpid/dispatch/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ void qd_vlog_impl(qd_log_module_t module, qd_log_level_t level, bool check_level
qd_vlog_impl(module, level, true, __FILE__, __LINE__, fmt, ap); \
} while (0)

/** Maximum length for a log message */
int qd_log_max_len(void);
/** Maximum length for a log message (including null terminator byte!) */
#define QD_LOG_TEXT_MAX 2048 // note: keep this small to allow stack-based buffers

void qd_format_string(char *buf, int buf_size, const char *fmt, ...) __attribute__((format(printf, 3, 4)));

Expand Down
6 changes: 3 additions & 3 deletions include/qpid/dispatch/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,12 @@ int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t
int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data, bool *q2_blocked);


/** Put string representation of a message suitable for logging in buffer.
/** Put string representation of a message suitable for logging in buffer. Note that log message text is limited to
* QD_LOG_TEXT_MAX bytes which includes the terminating null byte.
*
* @return buffer
*/
char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits log_message);
/** Recommended buffer length for qd_message_repr */
int qd_message_repr_len(void);

qd_log_source_t *qd_message_log_source(void);

Expand Down
121 changes: 121 additions & 0 deletions src/alloc_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
#include "entity_cache.h"
#include "http.h"
#include "qd_asan_interface.h"
#include "aprintf.h"

#include "qpid/dispatch/alloc.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/log.h"
#include "qpid/dispatch/timer.h"
#include "qpid/dispatch/platform.h"

#include "proton/version.h"

Expand Down Expand Up @@ -697,6 +700,23 @@ size_t qd_alloc_type_size(const qd_alloc_type_desc_t *desc)
return desc->total_size;
}

uint64_t qd_alloc_memory_usage(void)
{
#ifdef QD_MEMORY_DEBUG
assert(alloc_pool_ready); // need to all qd_alloc_initialize first!
#endif

uint64_t total = 0;
for (qd_alloc_type_desc_t *desc = DEQ_HEAD(desc_list); desc; desc = DEQ_NEXT(desc)) {
sys_mutex_lock(&desc->lock);
if (desc->stats.total_alloc_from_heap) {
total += desc->stats.total_alloc_from_heap - desc->stats.total_free_to_heap;
}
sys_mutex_unlock(&desc->lock);
}
return total;
}

void qd_alloc_debug_dump(const char *file) {
debug_dump = file ? strdup(file) : 0;
}
Expand All @@ -717,3 +737,104 @@ void qd_alloc_desc_init(const char *name, qd_alloc_type_desc_t *desc, size_t siz
DEQ_ITEM_INIT(desc);
DEQ_INSERT_TAIL(desc_list, desc);
}

// "Gut feel": 15 minute interval should prevent too must noise in the logs
static qd_duration_t monitor_interval = 15 * 60 * 1000;
static qd_timer_t *monitor_timer;

// timer callback to dump memory metrics to the log
//
static void on_monitor_timer(void *context)
{
ASSERT_PROACTOR_MODE(SYS_THREAD_PROACTOR_MODE_TIMER);

char log_msg[QD_LOG_TEXT_MAX];
char *begin = &log_msg[0];
char *end = &log_msg[QD_LOG_TEXT_MAX]; // past buffer ok see aprintf.h
const char *suffix = 0;

double msize = normalize_memory_size(qd_platform_memory_size(), &suffix);
int rc = aprintf(&begin, end, "ram:%.2f%s ", msize, suffix);
assert(rc == 0);

msize = normalize_memory_size(qd_router_virtual_memory_usage(), &suffix);
rc = aprintf(&begin, end, "vm:%.2f%s ", msize, suffix);
assert(rc == 0);

msize = normalize_memory_size(qd_router_rss_memory_usage(), &suffix);
rc = aprintf(&begin, end, "rss:%.2f%s ", msize, suffix);
assert(rc == 0);

msize = normalize_memory_size(qd_alloc_memory_usage(), &suffix);
rc = aprintf(&begin, end, "pool:%.2f%s ", msize, suffix);
assert(rc == 0);

const qd_alloc_type_desc_t *desc = DEQ_HEAD(desc_list);
while (desc) {
qd_alloc_stats_t stats = qd_alloc_desc_stats(desc);

#ifdef NDEBUG
// For debug builds report all for verification purposes (and test the log buffer overflow path!)
if (stats.total_alloc_from_heap == 0) { // ignore unused items
desc = DEQ_NEXT(desc);
continue;
}
#endif
// log format: series of space separated item entries. Each entry has the format:
// "<type-name>:<# in use by threads>:<# in global freepool>"
uint64_t total = stats.total_alloc_from_heap - stats.total_free_to_heap;
char *saved_begin = begin;
rc = aprintf(&begin, end, "%s:%" PRIu64 ":%" PRIu64" ",
desc->type_name, stats.held_by_threads, total - stats.held_by_threads);
if (rc < 0) // error?
break;
if (rc) { // overflowed
// Log what we have and reset the buffer. Unfortunately aprintf() will write the partial data to log_msg
// then move begin to the end. Trim the partial data by terminating log_msg at the end of the previous
// write.
*saved_begin = 0;
assert(log_msg[0] != 0);
qd_log(LOG_ROUTER, QD_LOG_INFO, "%s", log_msg);
begin = &log_msg[0];
end = &log_msg[QD_LOG_TEXT_MAX];
*begin = 0;
continue; // retry
}
desc = DEQ_NEXT(desc);
}

if (log_msg[0]) {
qd_log(LOG_ROUTER, QD_LOG_INFO, "%s", log_msg);
}

qd_timer_schedule(monitor_timer, monitor_interval);
}

void qd_alloc_start_monitor(struct qd_dispatch_t *qd)
{
// TODO: detect alloc_pool and yet initialized?? Needs to be called after alloc pool has been setup!

// Check for override
const char *interval_str = getenv("SKUPPER_ROUTER_ALLOC_MONITOR_SECS");
if (interval_str) {
unsigned int interval = 0;
int rc = sscanf(interval_str, "%u", &interval);
if (rc == 1) {
monitor_interval = 1000 * (qd_duration_t) interval;
qd_log(LOG_ROUTER, QD_LOG_DEBUG, "alloc_pool monitor interval overridden to %lu msecs",
(unsigned long) monitor_interval);
}
}

if (monitor_interval) {
monitor_timer = qd_timer(qd, on_monitor_timer, 0);
qd_timer_schedule(monitor_timer, monitor_interval);
}
}

void qd_alloc_stop_monitor(void)
{
if (monitor_timer)
qd_timer_free(monitor_timer);
}

2 changes: 0 additions & 2 deletions src/log_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,4 @@ void qd_log_initialize(void);
void qd_log_global_options(const char* format, bool utc);
void qd_log_finalize(void);
void qd_log_formatted_time(const struct timeval *time, char *buf, size_t buflen);

#define QD_LOG_TEXT_MAX 2048
#endif
5 changes: 0 additions & 5 deletions src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ typedef void (*buffer_process_t) (void *context, const unsigned char *base, int

void qd_message_initialize(void) {}

int qd_message_repr_len(void)
{
return qd_log_max_len();
}

/**
* Quote non-printable characters suitable for log messages. Output in buffer.
*/
Expand Down
6 changes: 2 additions & 4 deletions src/router_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -521,19 +521,17 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa
if (qd_log_enabled(LOG_MESSAGE, QD_LOG_DEBUG)) {
const qd_server_config_t *cf = qd_connection_config(conn);
if (!cf) return;
size_t repr_len = qd_message_repr_len();
char *buf = qd_malloc(repr_len);
char buf[QD_LOG_TEXT_MAX];
const char *msg_str = qd_message_oversize(msg) ? "oversize message" :
qd_message_aborted(msg) ? "aborted message" :
qd_message_repr(msg, buf, repr_len, cf->log_bits);
qd_message_repr(msg, buf, sizeof(buf), cf->log_bits);
if (msg_str) {
const char *src = pn_terminus_get_address(pn_link_source(pn_link));
const char *tgt = pn_terminus_get_address(pn_link_target(pn_link));
qd_log(LOG_MESSAGE, QD_LOG_DEBUG, "[C%" PRIu64 "]: %s %s on link '%s' (%s -> %s)",
qd_connection_connection_id(conn), pn_link_is_sender(pn_link) ? "Sent" : "Received", msg_str,
pn_link_name(pn_link), src ? src : "", tgt ? tgt : "");
}
free(buf);
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,9 @@ void qd_server_run(qd_dispatch_t *qd)
#ifndef NDEBUG
qd_log(LOG_ROUTER, QD_LOG_INFO, "Running in DEBUG Mode");
#endif

qd_alloc_start_monitor(qd); // enable periodic alloc pool usage loggin

const int n = qd_server->thread_count;
sys_thread_t **threads = (sys_thread_t **)qd_calloc(n, sizeof(sys_thread_t*));
for (i = 0; i < n; i++) {
Expand All @@ -1524,6 +1527,8 @@ void qd_server_run(qd_dispatch_t *qd)
}
free(threads);

qd_alloc_stop_monitor();

qd_log(LOG_ROUTER, QD_LOG_INFO, "Shut Down");
}

Expand Down
3 changes: 1 addition & 2 deletions src/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ static void timer_decref_LH(qd_timer_t *timer)
qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context)
{
qd_timer_t *timer = new_qd_timer_t();
if (!timer)
return 0;
assert(timer);

sys_cond_t cond;
sys_cond_init(&cond);
Expand Down
41 changes: 41 additions & 0 deletions tests/system_tests_tcp_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io
import json
import os
import re
import socket
import subprocess
import time
Expand Down Expand Up @@ -505,6 +506,9 @@ def router(name, mode, connection, extra=None, ssl=False, encapsulation="legacy"
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))

# monitor router memory usage:
os.environ["SKUPPER_ROUTER_ALLOC_MONITOR_SECS"] = "1"

cls.routers = []
cls.test_ssl = test_ssl
cls.encapsulation = encap
Expand Down Expand Up @@ -1438,6 +1442,43 @@ def test_90_stats(self):
assert output["bytesIn"] == output["bytesOut"]
self.logger.log(tname + " SUCCESS")

@unittest.skipIf(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
def test_100_memory_metrics(self):
"""
Take advantage of the long running TCP test to verify that alloc_pool
metrics have been correctly written to the logs
"""
mem_re = re.compile(r' ram:[0-9]+\.[0-9]+[BKMGTi]+ vm:[0-9]+\.[0-9]+[BKMGTi]+ rss:[0-9]+\.[0-9]+[BKMGTi]+ pool:[0-9]+\.[0-9]+[BKMGTi]+')
action_re = re.compile(r' qdr_action_t:[0-9]+:[0-9]+')
for router in self.routers:
last_mem_match = None # match the start of the alloc log line
last_action_match = None # match the qdr_action_t entry in the log line
with open(router.logfile_path, 'rt') as log_file:
for line in log_file:
m = mem_re.search(line)
if m:
last_mem_match = m
m = action_re.search(line)
if m:
last_action_match = m
self.assertIsNotNone(last_mem_match, "failed to find alloc_pool output!")
self.assertIsNotNone(last_action_match, "failed to find qdr_action_t entry!")

# Sanity check that metrics are present:

# match = 'ram:62.49GiB vm:20.00TiB rss:58.88MiB pool:3.70KiB'
mems = last_mem_match.group().strip().split()
for mem in mems:
name, value = mem.split(':')
self.assertIn(name, ["ram", "vm", "rss", "pool"])
self.assertTrue(int(value.split('.')[0]) > 0,
f"Expected nonzero {name} counter!")
# match = ' qdr_action_t:192:0'
name, in_use, in_free = last_action_match.group().strip().split(':')
self.assertEqual(name, "qdr_action_t", f"Name mismatch {name}")
self.assertTrue(int(in_use) + int(in_free) > 0,
f"zero alloced? {in_use} {in_free}")


class TcpAdaptor(TcpAdaptorBase, CommonTcpTests):
@classmethod
Expand Down

0 comments on commit fd33391

Please sign in to comment.