From 5178cca99a7b2e7d6f352240d50bcf5ae3b5e9fd Mon Sep 17 00:00:00 2001 From: Thiago Padilha Date: Fri, 22 Jul 2022 11:39:31 -0300 Subject: [PATCH] http_server/health: Implement throughput health check using ring buffer Use a ring buffer for storing samples as per Leonardo's suggestion. Signed-off-by: Thiago Padilha --- src/http_server/api/v1/health.c | 153 ++++++++++++++++---------------- 1 file changed, 75 insertions(+), 78 deletions(-) diff --git a/src/http_server/api/v1/health.c b/src/http_server/api/v1/health.c index 0c2b5016fa5..403ddf31153 100644 --- a/src/http_server/api/v1/health.c +++ b/src/http_server/api/v1/health.c @@ -37,17 +37,57 @@ struct flb_hs_throughput_sample { uint64_t in_records; uint64_t out_records; uint64_t timestamp_seconds; - struct mk_list _head; }; +/* ring buffer + helper functions for storing samples */ +struct flb_hs_throughput_samples { + struct flb_hs_throughput_sample *items; + int size; + int count; + int insert; +}; + +static struct flb_hs_throughput_sample *samples_add( + struct flb_hs_throughput_samples *samples) +{ + struct flb_hs_throughput_sample *sample = samples->items + samples->insert; + samples->insert = (samples->insert + 1) % samples->size; + if (samples->count < samples->size) { + samples->count++; + } + return sample; +} + +static int samples_translate_index( + struct flb_hs_throughput_samples *samples, int index) +{ + if (index >= samples->count || index < 0) { + return -1; + } + int end_index = samples->insert; + int start_index = end_index - samples->count; + int modulo = (start_index + index) % samples->size; + return modulo < 0 ? modulo + samples->size : modulo; +} + +static struct flb_hs_throughput_sample *samples_get( + struct flb_hs_throughput_samples *samples, int index) +{ + int real_index = samples_translate_index(samples, index); + if (real_index < 0) { + return NULL; + } + + return samples->items + samples_translate_index(samples, index); +} + struct { int enabled; struct mk_list *input_plugins; struct mk_list *output_plugins; double out_in_ratio_threshold; - int min_failures; - struct mk_list *sample_list; + struct flb_hs_throughput_samples samples; bool healthy; } throughput_check_state = {0}; @@ -305,78 +345,45 @@ static int cleanup_metrics() static int check_throughput_health(uint64_t in_records, uint64_t out_records, - struct mk_list *sample_list, - int sample_count, + struct flb_hs_throughput_samples *samples, double out_in_ratio_threshold) { + int i; struct flb_time tp; - uint64_t timestamp_seconds; uint64_t in_rate; uint64_t out_rate; - struct mk_list *tmp; - struct mk_list *head; double out_in_ratio; struct flb_hs_throughput_sample *entry; struct flb_hs_throughput_sample *prev; struct flb_hs_throughput_sample *sample; - struct flb_hs_throughput_sample *last_sample = NULL; - int count; bool healthy; bool rv; flb_time_get(&tp); - timestamp_seconds = flb_time_to_seconds(&tp); - - if (mk_list_is_empty(sample_list) != 0) { - last_sample = mk_list_entry_last(sample_list, - struct flb_hs_throughput_sample, - _head); - } - if (!last_sample || - in_records != last_sample->in_records || - out_records != last_sample->out_records) { - - sample = flb_malloc(sizeof(struct flb_hs_throughput_sample)); - - if (sample) { - sample->timestamp_seconds = timestamp_seconds; - sample->in_records = in_records; - sample->out_records = out_records; - mk_list_add(&sample->_head, sample_list); - } else { - flb_error("[api/v1/health/throughput]: failed to allocate sample"); - } - - } else { - /* don't collect another sample unless either in_records or out_records have - * changed since last check */ - flb_debug("[api/v1/health/throughput]: no changes since last check"); - } + sample = samples_add(samples); + sample->timestamp_seconds = flb_time_to_seconds(&tp); + sample->in_records = in_records; + sample->out_records = out_records; flb_debug("[api/v1/health/throughput]: check samples start %d %f", - sample_count, + samples->size, out_in_ratio_threshold); healthy = false; - mk_list_foreach_safe_r(head, tmp, sample_list) { - entry = mk_list_entry(head, struct flb_hs_throughput_sample, _head); - if (entry == mk_list_entry_first(sample_list, - struct flb_hs_throughput_sample, - _head)) { - break; + for (i = samples->count - 1; i > 0; i--) { + entry = samples_get(samples, i); + prev = samples_get(samples, i - 1); + uint64_t timestamp_delta = entry->timestamp_seconds - prev->timestamp_seconds; + if (timestamp_delta == 0) { + /* check against divide by zero */ + continue; } - - prev = mk_list_entry(entry->_head.prev, - struct flb_hs_throughput_sample, - _head); - in_rate = (entry->in_records - prev->in_records) / - (entry->timestamp_seconds - prev->timestamp_seconds); - out_rate = (entry->out_records - prev->out_records) / - (entry->timestamp_seconds - prev->timestamp_seconds); + in_rate = (entry->in_records - prev->in_records) / timestamp_delta; + out_rate = (entry->out_records - prev->out_records) / timestamp_delta; out_in_ratio = (double)out_rate / (double)in_rate; healthy = healthy || out_in_ratio > out_in_ratio_threshold; - flb_debug("[api/v1/health/throughput]: out: %"PRIu64" in: %"PRIu64" ratio: %f\n", + flb_debug("[api/v1/health/throughput]: out: %"PRIu64" in: %"PRIu64" ratio: %f", out_in_ratio, out_rate, in_rate); @@ -386,19 +393,7 @@ static int check_throughput_health(uint64_t in_records, } } - count = 0; - mk_list_foreach_safe_r(head, tmp, sample_list) { - entry = mk_list_entry(head, struct flb_hs_throughput_sample, _head); - if (count == sample_count) { - mk_list_del(&entry->_head); - flb_free(entry); - } - else { - count++; - } - } - - rv = count < sample_count || healthy; + rv = samples->count < samples->size || healthy; flb_debug("checking throughput samples stop, result: %s", rv ? "healthy" :"unhealthy"); @@ -463,8 +458,7 @@ static void cb_mq_health(mk_mq_t *queue, void *data, size_t size) throughput_check_state.healthy = check_throughput_health(input_records, output_records, - throughput_check_state.sample_list, - throughput_check_state.min_failures, + &throughput_check_state.samples, throughput_check_state.out_in_ratio_threshold); } @@ -498,6 +492,7 @@ static void configure_throughput_check(struct flb_config *config) { bool enabled = config->hc_throughput; + memset(&throughput_check_state, 0, sizeof(throughput_check_state)); throughput_check_state.enabled = false; throughput_check_state.healthy = true; @@ -522,18 +517,10 @@ static void configure_throughput_check(struct flb_config *config) return; } - throughput_check_state.sample_list = flb_malloc(sizeof(struct mk_list)); - if (!throughput_check_state.sample_list) { - flb_errno(); - return; - } - mk_list_init(throughput_check_state.sample_list); - throughput_check_state.input_plugins = flb_utils_split(config->hc_throughput_input_plugins, ',', 0); if (!throughput_check_state.input_plugins) { - flb_free(throughput_check_state.sample_list); flb_errno(); return; } @@ -542,16 +529,26 @@ static void configure_throughput_check(struct flb_config *config) flb_utils_split(config->hc_throughput_output_plugins, ',', 0); if (!throughput_check_state.output_plugins) { - flb_free(throughput_check_state.sample_list); flb_free(throughput_check_state.input_plugins); flb_errno(); return; } throughput_check_state.out_in_ratio_threshold = config->hc_throughput_ratio_threshold; - throughput_check_state.min_failures = config->hc_throughput_min_failures; throughput_check_state.enabled = true; + throughput_check_state.samples.items = flb_calloc( + config->hc_throughput_min_failures, + sizeof(struct flb_hs_throughput_sample)); + + if (!throughput_check_state.samples.items) { + flb_free(throughput_check_state.input_plugins); + flb_free(throughput_check_state.output_plugins); + flb_errno(); + return; + } + throughput_check_state.samples.size = config->hc_throughput_min_failures; + flb_info("[api/v1/health/throughput]: configuration complete. " "input plugins: %s | " "output plugins: %s | "