From fd333919486e59f25adc3f2deeb03d310aa04430 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Fri, 13 Oct 2023 10:44:49 -0400 Subject: [PATCH] Fixes #1259: New feature: periodic logging of alloc_pool usage --- include/qpid/dispatch/alloc_pool.h | 5 ++ include/qpid/dispatch/log.h | 4 +- include/qpid/dispatch/message.h | 6 +- src/alloc_pool.c | 121 +++++++++++++++++++++++++++++ src/log_private.h | 2 - src/message.c | 5 -- src/router_node.c | 6 +- src/server.c | 5 ++ src/timer.c | 3 +- tests/system_tests_tcp_adaptor.py | 41 ++++++++++ 10 files changed, 180 insertions(+), 18 deletions(-) diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h index f30c3e3de..d17ccb024 100644 --- a/include/qpid/dispatch/alloc_pool.h +++ b/include/qpid/dispatch/alloc_pool.h @@ -153,4 +153,9 @@ void qd_alloc_initialize(void); void qd_alloc_debug_dump(const char *file); void qd_alloc_finalize(void); size_t qd_alloc_type_size(const qd_alloc_type_desc_t *desc); // thread safe + +// control the periodic logging of alloc pool utilization +typedef struct qd_dispatch_t qd_dispatch_t; +void qd_alloc_start_monitor(qd_dispatch_t *qd); +void qd_alloc_stop_monitor(void); #endif diff --git a/include/qpid/dispatch/log.h b/include/qpid/dispatch/log.h index b83ca9be7..8d9d7905f 100644 --- a/include/qpid/dispatch/log.h +++ b/include/qpid/dispatch/log.h @@ -113,8 +113,8 @@ void qd_vlog_impl(qd_log_module_t module, qd_log_level_t level, bool check_level qd_vlog_impl(module, level, true, __FILE__, __LINE__, fmt, ap); \ } while (0) -/** Maximum length for a log message */ -int qd_log_max_len(void); +/** Maximum length for a log message (including null terminator byte!) */ +#define QD_LOG_TEXT_MAX 2048 // note: keep this small to allow stack-based buffers void qd_format_string(char *buf, int buf_size, const char *fmt, ...) __attribute__((format(printf, 3, 4))); diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index c0e7d2b39..7cc13d3bb 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -482,12 +482,12 @@ int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data, bool *q2_blocked); -/** Put string representation of a message suitable for logging in buffer. +/** Put string representation of a message suitable for logging in buffer. Note that log message text is limited to + * QD_LOG_TEXT_MAX bytes which includes the terminating null byte. + * * @return buffer */ char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits log_message); -/** Recommended buffer length for qd_message_repr */ -int qd_message_repr_len(void); qd_log_source_t *qd_message_log_source(void); diff --git a/src/alloc_pool.c b/src/alloc_pool.c index 1ea9ef2cc..7892949d5 100644 --- a/src/alloc_pool.c +++ b/src/alloc_pool.c @@ -26,10 +26,13 @@ #include "entity_cache.h" #include "http.h" #include "qd_asan_interface.h" +#include "aprintf.h" #include "qpid/dispatch/alloc.h" #include "qpid/dispatch/ctools.h" #include "qpid/dispatch/log.h" +#include "qpid/dispatch/timer.h" +#include "qpid/dispatch/platform.h" #include "proton/version.h" @@ -697,6 +700,23 @@ size_t qd_alloc_type_size(const qd_alloc_type_desc_t *desc) return desc->total_size; } +uint64_t qd_alloc_memory_usage(void) +{ +#ifdef QD_MEMORY_DEBUG + assert(alloc_pool_ready); // need to all qd_alloc_initialize first! +#endif + + uint64_t total = 0; + for (qd_alloc_type_desc_t *desc = DEQ_HEAD(desc_list); desc; desc = DEQ_NEXT(desc)) { + sys_mutex_lock(&desc->lock); + if (desc->stats.total_alloc_from_heap) { + total += desc->stats.total_alloc_from_heap - desc->stats.total_free_to_heap; + } + sys_mutex_unlock(&desc->lock); + } + return total; +} + void qd_alloc_debug_dump(const char *file) { debug_dump = file ? strdup(file) : 0; } @@ -717,3 +737,104 @@ void qd_alloc_desc_init(const char *name, qd_alloc_type_desc_t *desc, size_t siz DEQ_ITEM_INIT(desc); DEQ_INSERT_TAIL(desc_list, desc); } + +// "Gut feel": 15 minute interval should prevent too must noise in the logs +static qd_duration_t monitor_interval = 15 * 60 * 1000; +static qd_timer_t *monitor_timer; + +// timer callback to dump memory metrics to the log +// +static void on_monitor_timer(void *context) +{ + ASSERT_PROACTOR_MODE(SYS_THREAD_PROACTOR_MODE_TIMER); + + char log_msg[QD_LOG_TEXT_MAX]; + char *begin = &log_msg[0]; + char *end = &log_msg[QD_LOG_TEXT_MAX]; // past buffer ok see aprintf.h + const char *suffix = 0; + + double msize = normalize_memory_size(qd_platform_memory_size(), &suffix); + int rc = aprintf(&begin, end, "ram:%.2f%s ", msize, suffix); + assert(rc == 0); + + msize = normalize_memory_size(qd_router_virtual_memory_usage(), &suffix); + rc = aprintf(&begin, end, "vm:%.2f%s ", msize, suffix); + assert(rc == 0); + + msize = normalize_memory_size(qd_router_rss_memory_usage(), &suffix); + rc = aprintf(&begin, end, "rss:%.2f%s ", msize, suffix); + assert(rc == 0); + + msize = normalize_memory_size(qd_alloc_memory_usage(), &suffix); + rc = aprintf(&begin, end, "pool:%.2f%s ", msize, suffix); + assert(rc == 0); + + const qd_alloc_type_desc_t *desc = DEQ_HEAD(desc_list); + while (desc) { + qd_alloc_stats_t stats = qd_alloc_desc_stats(desc); + +#ifdef NDEBUG + // For debug builds report all for verification purposes (and test the log buffer overflow path!) + if (stats.total_alloc_from_heap == 0) { // ignore unused items + desc = DEQ_NEXT(desc); + continue; + } +#endif + // log format: series of space separated item entries. Each entry has the format: + // ":<# in use by threads>:<# in global freepool>" + uint64_t total = stats.total_alloc_from_heap - stats.total_free_to_heap; + char *saved_begin = begin; + rc = aprintf(&begin, end, "%s:%" PRIu64 ":%" PRIu64" ", + desc->type_name, stats.held_by_threads, total - stats.held_by_threads); + if (rc < 0) // error? + break; + if (rc) { // overflowed + // Log what we have and reset the buffer. Unfortunately aprintf() will write the partial data to log_msg + // then move begin to the end. Trim the partial data by terminating log_msg at the end of the previous + // write. + *saved_begin = 0; + assert(log_msg[0] != 0); + qd_log(LOG_ROUTER, QD_LOG_INFO, "%s", log_msg); + begin = &log_msg[0]; + end = &log_msg[QD_LOG_TEXT_MAX]; + *begin = 0; + continue; // retry + } + desc = DEQ_NEXT(desc); + } + + if (log_msg[0]) { + qd_log(LOG_ROUTER, QD_LOG_INFO, "%s", log_msg); + } + + qd_timer_schedule(monitor_timer, monitor_interval); +} + +void qd_alloc_start_monitor(struct qd_dispatch_t *qd) +{ + // TODO: detect alloc_pool and yet initialized?? Needs to be called after alloc pool has been setup! + + // Check for override + const char *interval_str = getenv("SKUPPER_ROUTER_ALLOC_MONITOR_SECS"); + if (interval_str) { + unsigned int interval = 0; + int rc = sscanf(interval_str, "%u", &interval); + if (rc == 1) { + monitor_interval = 1000 * (qd_duration_t) interval; + qd_log(LOG_ROUTER, QD_LOG_DEBUG, "alloc_pool monitor interval overridden to %lu msecs", + (unsigned long) monitor_interval); + } + } + + if (monitor_interval) { + monitor_timer = qd_timer(qd, on_monitor_timer, 0); + qd_timer_schedule(monitor_timer, monitor_interval); + } +} + +void qd_alloc_stop_monitor(void) +{ + if (monitor_timer) + qd_timer_free(monitor_timer); +} + diff --git a/src/log_private.h b/src/log_private.h index 44590899b..6297185a8 100644 --- a/src/log_private.h +++ b/src/log_private.h @@ -27,6 +27,4 @@ void qd_log_initialize(void); void qd_log_global_options(const char* format, bool utc); void qd_log_finalize(void); void qd_log_formatted_time(const struct timeval *time, char *buf, size_t buflen); - -#define QD_LOG_TEXT_MAX 2048 #endif diff --git a/src/message.c b/src/message.c index 4634aab52..b2d1221ec 100644 --- a/src/message.c +++ b/src/message.c @@ -100,11 +100,6 @@ typedef void (*buffer_process_t) (void *context, const unsigned char *base, int void qd_message_initialize(void) {} -int qd_message_repr_len(void) -{ - return qd_log_max_len(); -} - /** * Quote non-printable characters suitable for log messages. Output in buffer. */ diff --git a/src/router_node.c b/src/router_node.c index aa9aa4684..3b15eb173 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -521,11 +521,10 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa if (qd_log_enabled(LOG_MESSAGE, QD_LOG_DEBUG)) { const qd_server_config_t *cf = qd_connection_config(conn); if (!cf) return; - size_t repr_len = qd_message_repr_len(); - char *buf = qd_malloc(repr_len); + char buf[QD_LOG_TEXT_MAX]; const char *msg_str = qd_message_oversize(msg) ? "oversize message" : qd_message_aborted(msg) ? "aborted message" : - qd_message_repr(msg, buf, repr_len, cf->log_bits); + qd_message_repr(msg, buf, sizeof(buf), cf->log_bits); if (msg_str) { const char *src = pn_terminus_get_address(pn_link_source(pn_link)); const char *tgt = pn_terminus_get_address(pn_link_target(pn_link)); @@ -533,7 +532,6 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa qd_connection_connection_id(conn), pn_link_is_sender(pn_link) ? "Sent" : "Received", msg_str, pn_link_name(pn_link), src ? src : "", tgt ? tgt : ""); } - free(buf); } } diff --git a/src/server.c b/src/server.c index 3fd6bb47f..050c63486 100644 --- a/src/server.c +++ b/src/server.c @@ -1512,6 +1512,9 @@ void qd_server_run(qd_dispatch_t *qd) #ifndef NDEBUG qd_log(LOG_ROUTER, QD_LOG_INFO, "Running in DEBUG Mode"); #endif + + qd_alloc_start_monitor(qd); // enable periodic alloc pool usage loggin + const int n = qd_server->thread_count; sys_thread_t **threads = (sys_thread_t **)qd_calloc(n, sizeof(sys_thread_t*)); for (i = 0; i < n; i++) { @@ -1524,6 +1527,8 @@ void qd_server_run(qd_dispatch_t *qd) } free(threads); + qd_alloc_stop_monitor(); + qd_log(LOG_ROUTER, QD_LOG_INFO, "Shut Down"); } diff --git a/src/timer.c b/src/timer.c index acb05ec9e..5bbd8ddda 100644 --- a/src/timer.c +++ b/src/timer.c @@ -152,8 +152,7 @@ static void timer_decref_LH(qd_timer_t *timer) qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context) { qd_timer_t *timer = new_qd_timer_t(); - if (!timer) - return 0; + assert(timer); sys_cond_t cond; sys_cond_init(&cond); diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index dbfd4e23d..83daec191 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -20,6 +20,7 @@ import io import json import os +import re import socket import subprocess import time @@ -505,6 +506,9 @@ def router(name, mode, connection, extra=None, ssl=False, encapsulation="legacy" config = Qdrouterd.Config(config) cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + # monitor router memory usage: + os.environ["SKUPPER_ROUTER_ALLOC_MONITOR_SECS"] = "1" + cls.routers = [] cls.test_ssl = test_ssl cls.encapsulation = encap @@ -1438,6 +1442,43 @@ def test_90_stats(self): assert output["bytesIn"] == output["bytesOut"] self.logger.log(tname + " SUCCESS") + @unittest.skipIf(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) + def test_100_memory_metrics(self): + """ + Take advantage of the long running TCP test to verify that alloc_pool + metrics have been correctly written to the logs + """ + mem_re = re.compile(r' ram:[0-9]+\.[0-9]+[BKMGTi]+ vm:[0-9]+\.[0-9]+[BKMGTi]+ rss:[0-9]+\.[0-9]+[BKMGTi]+ pool:[0-9]+\.[0-9]+[BKMGTi]+') + action_re = re.compile(r' qdr_action_t:[0-9]+:[0-9]+') + for router in self.routers: + last_mem_match = None # match the start of the alloc log line + last_action_match = None # match the qdr_action_t entry in the log line + with open(router.logfile_path, 'rt') as log_file: + for line in log_file: + m = mem_re.search(line) + if m: + last_mem_match = m + m = action_re.search(line) + if m: + last_action_match = m + self.assertIsNotNone(last_mem_match, "failed to find alloc_pool output!") + self.assertIsNotNone(last_action_match, "failed to find qdr_action_t entry!") + + # Sanity check that metrics are present: + + # match = 'ram:62.49GiB vm:20.00TiB rss:58.88MiB pool:3.70KiB' + mems = last_mem_match.group().strip().split() + for mem in mems: + name, value = mem.split(':') + self.assertIn(name, ["ram", "vm", "rss", "pool"]) + self.assertTrue(int(value.split('.')[0]) > 0, + f"Expected nonzero {name} counter!") + # match = ' qdr_action_t:192:0' + name, in_use, in_free = last_action_match.group().strip().split(':') + self.assertEqual(name, "qdr_action_t", f"Name mismatch {name}") + self.assertTrue(int(in_use) + int(in_free) > 0, + f"zero alloced? {in_use} {in_free}") + class TcpAdaptor(TcpAdaptorBase, CommonTcpTests): @classmethod