Skip to content

Commit

Permalink
http_server/health: Implement throughput health check using ring buffer
Browse files Browse the repository at this point in the history
Use a ring buffer for storing samples as per Leonardo's suggestion.

Signed-off-by: Thiago Padilha <[email protected]>
  • Loading branch information
tchrono committed Jul 22, 2022
1 parent 2240019 commit 5178cca
Showing 1 changed file with 75 additions and 78 deletions.
153 changes: 75 additions & 78 deletions src/http_server/api/v1/health.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,57 @@ struct flb_hs_throughput_sample {
uint64_t in_records;
uint64_t out_records;
uint64_t timestamp_seconds;
struct mk_list _head;
};

/*
 * Ring buffer of throughput samples, used together with the samples_add(),
 * samples_translate_index() and samples_get() helpers below.
 */
struct flb_hs_throughput_samples {
/* backing array; samples_add() hands out slots from it via items + insert */
struct flb_hs_throughput_sample *items;
/* number of slots in items; samples_add() wraps insert modulo this value */
int size;
/* number of slots handed out so far, capped at size by samples_add() */
int count;
/* slot that the next samples_add() call will return */
int insert;
};

/*
 * Reserve the next slot of the ring buffer and return a pointer to it.
 *
 * The write position advances by one and wraps around at samples->size; once
 * the buffer is full, the slot returned is the one holding the oldest sample,
 * which the caller is expected to overwrite. count grows until it reaches
 * size and then stays there.
 *
 * Precondition: samples->size must be greater than zero, because the wrap
 * below takes a remainder by it.
 */
static struct flb_hs_throughput_sample *samples_add(
    struct flb_hs_throughput_samples *samples)
{
    int slot = samples->insert;

    /* move the write position forward, wrapping at the end of the array */
    samples->insert = (slot + 1) % samples->size;

    if (samples->count < samples->size) {
        samples->count += 1;
    }

    return &samples->items[slot];
}

/*
 * Convert a logical sample index into a physical slot of the items array.
 *
 * Logical index 0 refers to the oldest stored sample and count - 1 to the
 * newest one (the slot just before the current insert position).
 *
 * Returns the slot number, or -1 when index is outside [0, count).
 */
static int samples_translate_index(
    struct flb_hs_throughput_samples *samples, int index)
{
    int oldest;
    int slot;

    if (index < 0 || index >= samples->count) {
        return -1;
    }

    /*
     * insert points one past the newest sample, so stepping back count
     * entries lands on the oldest one; this can be negative before wrapping.
     */
    oldest = samples->insert - samples->count;

    /* C's % keeps the sign of the dividend, so fold negatives back in range */
    slot = (oldest + index) % samples->size;
    if (slot < 0) {
        slot += samples->size;
    }

    return slot;
}

/*
 * Return the stored sample at logical position index, where the position is
 * resolved to a slot of the items array by samples_translate_index().
 *
 * Returns NULL when samples_translate_index() rejects the index (negative
 * result), i.e. when index is outside [0, count).
 */
static struct flb_hs_throughput_sample *samples_get(
    struct flb_hs_throughput_samples *samples, int index)
{
    int real_index = samples_translate_index(samples, index);

    if (real_index < 0) {
        return NULL;
    }

    /* reuse the slot computed above instead of translating the index again */
    return samples->items + real_index;
}

/*
 * File-scope state of the throughput health check. It is zeroed and filled in
 * by configure_throughput_check() and updated from cb_mq_health().
 *
 * NOTE(review): this view shows both sample_list and samples; sample_list
 * looks like the earlier list-based storage that the ring buffer replaces.
 * Confirm that only one of them remains in the real file.
 */
struct {
/* cleared when configuration starts; set to true once the settings are stored */
int enabled;
/* result of splitting config->hc_throughput_input_plugins on ',' */
struct mk_list *input_plugins;
/* result of splitting config->hc_throughput_output_plugins on ',' */
struct mk_list *output_plugins;
/* copied from config->hc_throughput_ratio_threshold */
double out_in_ratio_threshold;
/* copied from config->hc_throughput_min_failures; the same value sizes samples */
int min_failures;

/* NOTE(review): presumably superseded by samples below; see the note above */
struct mk_list *sample_list;
/* ring buffer of collected samples passed to check_throughput_health() */
struct flb_hs_throughput_samples samples;
/* starts as true; overwritten with the result of check_throughput_health() */
bool healthy;
} throughput_check_state = {0};

Expand Down Expand Up @@ -305,78 +345,45 @@ static int cleanup_metrics()

static int check_throughput_health(uint64_t in_records,
uint64_t out_records,
struct mk_list *sample_list,
int sample_count,
struct flb_hs_throughput_samples *samples,
double out_in_ratio_threshold) {
int i;
struct flb_time tp;
uint64_t timestamp_seconds;
uint64_t in_rate;
uint64_t out_rate;
struct mk_list *tmp;
struct mk_list *head;
double out_in_ratio;
struct flb_hs_throughput_sample *entry;
struct flb_hs_throughput_sample *prev;
struct flb_hs_throughput_sample *sample;
struct flb_hs_throughput_sample *last_sample = NULL;
int count;
bool healthy;
bool rv;

flb_time_get(&tp);
timestamp_seconds = flb_time_to_seconds(&tp);

if (mk_list_is_empty(sample_list) != 0) {
last_sample = mk_list_entry_last(sample_list,
struct flb_hs_throughput_sample,
_head);
}

if (!last_sample ||
in_records != last_sample->in_records ||
out_records != last_sample->out_records) {

sample = flb_malloc(sizeof(struct flb_hs_throughput_sample));

if (sample) {
sample->timestamp_seconds = timestamp_seconds;
sample->in_records = in_records;
sample->out_records = out_records;
mk_list_add(&sample->_head, sample_list);
} else {
flb_error("[api/v1/health/throughput]: failed to allocate sample");
}

} else {
/* don't collect another sample unless either in_records or out_records have
* changed since last check */
flb_debug("[api/v1/health/throughput]: no changes since last check");
}
sample = samples_add(samples);
sample->timestamp_seconds = flb_time_to_seconds(&tp);
sample->in_records = in_records;
sample->out_records = out_records;

flb_debug("[api/v1/health/throughput]: check samples start %d %f",
sample_count,
samples->size,
out_in_ratio_threshold);

healthy = false;
mk_list_foreach_safe_r(head, tmp, sample_list) {
entry = mk_list_entry(head, struct flb_hs_throughput_sample, _head);
if (entry == mk_list_entry_first(sample_list,
struct flb_hs_throughput_sample,
_head)) {
break;
for (i = samples->count - 1; i > 0; i--) {
entry = samples_get(samples, i);
prev = samples_get(samples, i - 1);
uint64_t timestamp_delta = entry->timestamp_seconds - prev->timestamp_seconds;
if (timestamp_delta == 0) {
/* check against divide by zero */
continue;
}

prev = mk_list_entry(entry->_head.prev,
struct flb_hs_throughput_sample,
_head);
in_rate = (entry->in_records - prev->in_records) /
(entry->timestamp_seconds - prev->timestamp_seconds);
out_rate = (entry->out_records - prev->out_records) /
(entry->timestamp_seconds - prev->timestamp_seconds);
in_rate = (entry->in_records - prev->in_records) / timestamp_delta;
out_rate = (entry->out_records - prev->out_records) / timestamp_delta;
out_in_ratio = (double)out_rate / (double)in_rate;
healthy = healthy || out_in_ratio > out_in_ratio_threshold;

flb_debug("[api/v1/health/throughput]: out: %"PRIu64" in: %"PRIu64" ratio: %f\n",
flb_debug("[api/v1/health/throughput]: out: %"PRIu64" in: %"PRIu64" ratio: %f",
out_in_ratio,
out_rate,
in_rate);
Expand All @@ -386,19 +393,7 @@ static int check_throughput_health(uint64_t in_records,
}
}

count = 0;
mk_list_foreach_safe_r(head, tmp, sample_list) {
entry = mk_list_entry(head, struct flb_hs_throughput_sample, _head);
if (count == sample_count) {
mk_list_del(&entry->_head);
flb_free(entry);
}
else {
count++;
}
}

rv = count < sample_count || healthy;
rv = samples->count < samples->size || healthy;
flb_debug("checking throughput samples stop, result: %s",
rv ? "healthy" :"unhealthy");

Expand Down Expand Up @@ -463,8 +458,7 @@ static void cb_mq_health(mk_mq_t *queue, void *data, size_t size)
throughput_check_state.healthy =
check_throughput_health(input_records,
output_records,
throughput_check_state.sample_list,
throughput_check_state.min_failures,
&throughput_check_state.samples,
throughput_check_state.out_in_ratio_threshold);
}

Expand Down Expand Up @@ -498,6 +492,7 @@ static void configure_throughput_check(struct flb_config *config)
{
bool enabled = config->hc_throughput;

memset(&throughput_check_state, 0, sizeof(throughput_check_state));
throughput_check_state.enabled = false;
throughput_check_state.healthy = true;

Expand All @@ -522,18 +517,10 @@ static void configure_throughput_check(struct flb_config *config)
return;
}

throughput_check_state.sample_list = flb_malloc(sizeof(struct mk_list));
if (!throughput_check_state.sample_list) {
flb_errno();
return;
}
mk_list_init(throughput_check_state.sample_list);

throughput_check_state.input_plugins =
flb_utils_split(config->hc_throughput_input_plugins, ',', 0);

if (!throughput_check_state.input_plugins) {
flb_free(throughput_check_state.sample_list);
flb_errno();
return;
}
Expand All @@ -542,16 +529,26 @@ static void configure_throughput_check(struct flb_config *config)
flb_utils_split(config->hc_throughput_output_plugins, ',', 0);

if (!throughput_check_state.output_plugins) {
flb_free(throughput_check_state.sample_list);
flb_free(throughput_check_state.input_plugins);
flb_errno();
return;
}

throughput_check_state.out_in_ratio_threshold = config->hc_throughput_ratio_threshold;
throughput_check_state.min_failures = config->hc_throughput_min_failures;
throughput_check_state.enabled = true;

throughput_check_state.samples.items = flb_calloc(
config->hc_throughput_min_failures,
sizeof(struct flb_hs_throughput_sample));

if (!throughput_check_state.samples.items) {
flb_free(throughput_check_state.input_plugins);
flb_free(throughput_check_state.output_plugins);
flb_errno();
return;
}
throughput_check_state.samples.size = config->hc_throughput_min_failures;

flb_info("[api/v1/health/throughput]: configuration complete. "
"input plugins: %s | "
"output plugins: %s | "
Expand Down

0 comments on commit 5178cca

Please sign in to comment.