Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Oct 13, 2023
1 parent 3d81fb8 commit 546d097
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 18 deletions.
5 changes: 5 additions & 0 deletions include/qpid/dispatch/alloc_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,9 @@ void qd_alloc_initialize(void);
void qd_alloc_debug_dump(const char *file);
void qd_alloc_finalize(void);
size_t qd_alloc_type_size(const qd_alloc_type_desc_t *desc); // thread safe

// control the periodic logging of alloc pool utilization
typedef struct qd_dispatch_t qd_dispatch_t;
void qd_alloc_start_monitor(qd_dispatch_t *qd);
void qd_alloc_stop_monitor(void);
#endif
4 changes: 2 additions & 2 deletions include/qpid/dispatch/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ void qd_vlog_impl(qd_log_module_t module, qd_log_level_t level, bool check_level
qd_vlog_impl(module, level, true, __FILE__, __LINE__, fmt, ap); \
} while (0)

/** Maximum length for a log message */
int qd_log_max_len(void);
/** Maximum length for a log message (including null terminator byte!) */
#define QD_LOG_TEXT_MAX 2048 // note: keep this small to allow stack-based buffers

void qd_format_string(char *buf, int buf_size, const char *fmt, ...) __attribute__((format(printf, 3, 4)));

Expand Down
6 changes: 3 additions & 3 deletions include/qpid/dispatch/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,12 @@ int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t
int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data, bool *q2_blocked);


/** Put string representation of a message suitable for logging in buffer.
/** Put string representation of a message suitable for logging in buffer. Note that log message text is limited to
* QD_LOG_TEXT_MAX bytes which includes the terminating null byte.
*
* @return buffer
*/
char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits log_message);
/** Recommended buffer length for qd_message_repr */
int qd_message_repr_len(void);

qd_log_source_t *qd_message_log_source(void);

Expand Down
81 changes: 81 additions & 0 deletions src/alloc_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
#include "entity_cache.h"
#include "http.h"
#include "qd_asan_interface.h"
#include "aprintf.h"

#include "qpid/dispatch/alloc.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/log.h"
#include "qpid/dispatch/timer.h"

#include "proton/version.h"

Expand Down Expand Up @@ -717,3 +719,82 @@ void qd_alloc_desc_init(const char *name, qd_alloc_type_desc_t *desc, size_t siz
DEQ_ITEM_INIT(desc);
DEQ_INSERT_TAIL(desc_list, desc);
}

// "Gut feel": 15 minute interval should prevent too must noise in the logs
static qd_duration_t monitor_interval = 15 * 60 * 1000;
static qd_timer_t *monitor_timer;
static void on_monitor_timer(void *context)
{
ASSERT_PROACTOR_MODE(SYS_THREAD_PROACTOR_MODE_TIMER);

char log_msg[QD_LOG_TEXT_MAX];
const qd_alloc_type_desc_t *desc = DEQ_HEAD(desc_list);

log_msg[0] = 0;
char *begin = &log_msg[0];
char *end = &log_msg[QD_LOG_TEXT_MAX]; // past buffer ok see aprintf.h
while (desc) {
qd_alloc_stats_t stats = qd_alloc_desc_stats(desc);
if (stats.total_alloc_from_heap == 0) { // ignore unused items
desc = DEQ_NEXT(desc);
continue;
}
// log format: series of space separated item entries. Each entry has the format:
// "<type-name>:<# in use by threads>:<# not in use>"
uint64_t total = stats.total_alloc_from_heap - stats.total_free_to_heap;
char *saved_begin = begin;
int rc = aprintf(&begin, end, "%s:%" PRIu64 ":%" PRIu64" ",
desc->type_name,
stats.held_by_threads, total - stats.held_by_threads);
if (rc < 0) // error?
break;
if (rc) { // overflowed
// Log what we have and reset the buffer. Unfortunately aprintf() will write the partial data to log_msg
// then move begin to the end. Trim the partial data by terminating log_msg at the end of the previous
// write.
*saved_begin = 0;
assert(log_msg[0] != 0);
qd_log(LOG_ROUTER, QD_LOG_INFO, "%s", log_msg);
begin = &log_msg[0];
end = &log_msg[QD_LOG_TEXT_MAX];
*begin = 0;
continue; // retry
}
desc = DEQ_NEXT(desc);
}

if (log_msg[0]) {
qd_log(LOG_ROUTER, QD_LOG_INFO, "%s", log_msg);
}

qd_timer_schedule(monitor_timer, monitor_interval);
}

void qd_alloc_start_monitor(struct qd_dispatch_t *qd)
{
// TODO: detect alloc_pool and yet initialized?? Needs to be called after alloc pool has been setup!

// Check for override
const char *interval_str = getenv("SKUPPER_ROUTER_ALLOC_MONITOR_SECS");
if (interval_str) {
unsigned int interval = 0;
int rc = sscanf(interval_str, "%u", &interval);
if (rc == 1) {
monitor_interval = 1000 * (qd_duration_t) interval;
qd_log(LOG_ROUTER, QD_LOG_DEBUG, "alloc_pool monitor interval overridden to %lu msecs",
(unsigned long) monitor_interval);
}
}

if (monitor_interval) {
monitor_timer = qd_timer(qd, on_monitor_timer, 0);
qd_timer_schedule(monitor_timer, monitor_interval);
}
}

void qd_alloc_stop_monitor(void)
{
if (monitor_timer)
qd_timer_free(monitor_timer);
}

2 changes: 0 additions & 2 deletions src/log_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,4 @@ void qd_log_initialize(void);
void qd_log_global_options(const char* format, bool utc);
void qd_log_finalize(void);
void qd_log_formatted_time(const struct timeval *time, char *buf, size_t buflen);

#define QD_LOG_TEXT_MAX 2048
#endif
5 changes: 0 additions & 5 deletions src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ typedef void (*buffer_process_t) (void *context, const unsigned char *base, int

void qd_message_initialize(void) {}

int qd_message_repr_len(void)
{
return qd_log_max_len();
}

/**
* Quote non-printable characters suitable for log messages. Output in buffer.
*/
Expand Down
6 changes: 2 additions & 4 deletions src/router_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -521,19 +521,17 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa
if (qd_log_enabled(LOG_MESSAGE, QD_LOG_DEBUG)) {
const qd_server_config_t *cf = qd_connection_config(conn);
if (!cf) return;
size_t repr_len = qd_message_repr_len();
char *buf = qd_malloc(repr_len);
char buf[QD_LOG_TEXT_MAX];
const char *msg_str = qd_message_oversize(msg) ? "oversize message" :
qd_message_aborted(msg) ? "aborted message" :
qd_message_repr(msg, buf, repr_len, cf->log_bits);
qd_message_repr(msg, buf, sizeof(buf), cf->log_bits);
if (msg_str) {
const char *src = pn_terminus_get_address(pn_link_source(pn_link));
const char *tgt = pn_terminus_get_address(pn_link_target(pn_link));
qd_log(LOG_MESSAGE, QD_LOG_DEBUG, "[C%" PRIu64 "]: %s %s on link '%s' (%s -> %s)",
qd_connection_connection_id(conn), pn_link_is_sender(pn_link) ? "Sent" : "Received", msg_str,
pn_link_name(pn_link), src ? src : "", tgt ? tgt : "");
}
free(buf);
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,9 @@ void qd_server_run(qd_dispatch_t *qd)
#ifndef NDEBUG
qd_log(LOG_ROUTER, QD_LOG_INFO, "Running in DEBUG Mode");
#endif

qd_alloc_start_monitor(qd); // enable periodic alloc pool usage loggin

const int n = qd_server->thread_count;
sys_thread_t **threads = (sys_thread_t **)qd_calloc(n, sizeof(sys_thread_t*));
for (i = 0; i < n; i++) {
Expand All @@ -1524,6 +1527,8 @@ void qd_server_run(qd_dispatch_t *qd)
}
free(threads);

qd_alloc_stop_monitor();

qd_log(LOG_ROUTER, QD_LOG_INFO, "Shut Down");
}

Expand Down
3 changes: 1 addition & 2 deletions src/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ static void timer_decref_LH(qd_timer_t *timer)
qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context)
{
qd_timer_t *timer = new_qd_timer_t();
if (!timer)
return 0;
assert(timer);

sys_cond_t cond;
sys_cond_init(&cond);
Expand Down

0 comments on commit 546d097

Please sign in to comment.