Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor: added limited concurrency support #7448

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@
#define FLB_PROCESSOR_UNIT_NATIVE 0
#define FLB_PROCESSOR_UNIT_FILTER 1


/* The current values mean the processor stack will
* wait for 2 seconds at most in 50 millisecond increments
* for each processor unit.
*
* This is the worst case scenario and in reality there will
* be no wait in 99.9% of the cases.
*/
#define FLB_PROCESSOR_LOCK_RETRY_LIMIT 40
#define FLB_PROCESSOR_LOCK_RETRY_DELAY 50000

/* These forward definitions are necessary in order to avoid
* inclussion conflicts.
*/
Expand All @@ -63,6 +74,16 @@ struct flb_processor_unit {
*/
void *ctx;

/* This lock is meant to cover the case where two output plugin
* worker threads flb_output_flush_create calls overlap which
* could cause flb_processor_run to be invoked by both of them
* at the same time with the same context.
*
* This could cause certain non thread aware filters such as
* filter_lua to modify internal structures leading to corruption
* and crashes.
*/
pthread_mutex_t lock;
/*
* pipeline filters needs to be linked somewhere since the destroy
* function will do the mk_list_del(). To avoid corruptions we link
Expand Down
129 changes: 128 additions & 1 deletion src/flb_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,72 @@
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

static int acquire_lock(pthread_mutex_t *lock,
size_t retry_limit,
size_t retry_delay)
{
size_t retry_count;
int result;

retry_count = 0;

do {
result = pthread_mutex_lock(lock);

if (result != 0) {
if (result == EAGAIN) {
retry_count++;

usleep(retry_delay);
}
else {
break;
}
}
}
while (result != 0 &&
retry_count < retry_limit);

if (result != 0) {
return FLB_FALSE;
}

return FLB_TRUE;
}

static int release_lock(pthread_mutex_t *lock,
size_t retry_limit,
size_t retry_delay)
{
size_t retry_count;
int result;

retry_count = 0;

do {
result = pthread_mutex_unlock(lock);

if (result != 0) {
if (result == EAGAIN) {
retry_count++;

usleep(retry_delay);
}
else {
break;
}
}
}
while (result != 0 &&
retry_count < retry_limit);

if (result != 0) {
return FLB_FALSE;
}

return FLB_TRUE;
}

/*
* A processor creates a chain of processing units for different telemetry data
* types such as logs, metrics and traces.
Expand Down Expand Up @@ -65,6 +131,7 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc,
int event_type,
char *unit_name)
{
int result;
struct mk_list *head;
int filter_event_type;
struct flb_filter_plugin *f = NULL;
Expand Down Expand Up @@ -111,11 +178,21 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc,
}
mk_list_init(&pu->unused_list);

result = pthread_mutex_init(&pu->lock, NULL);

if (result != 0) {
flb_sds_destroy(pu->name);
flb_free(pu);

return NULL;
}

/* If we matched a pipeline filter, create the speacial processing unit for it */
if (f) {
/* create an instance of the filter */
f_ins = flb_filter_new(config, unit_name, NULL);
if (!f_ins) {
pthread_mutex_destroy(&pu->lock);
flb_sds_destroy(pu->name);
flb_free(pu);

Expand All @@ -124,11 +201,20 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc,
/* matching rule: just set to workaround the pipeline initializer */
f_ins->match = flb_sds_create("*");

if (f_ins->match == NULL) {
flb_filter_instance_destroy(f_ins);

pthread_mutex_destroy(&pu->lock);
flb_sds_destroy(pu->name);
flb_free(pu);

return NULL;
}

/* unit type and context */
pu->unit_type = FLB_PROCESSOR_UNIT_FILTER;
pu->ctx = f_ins;


/*
* The filter was added to the linked list config->filters, since this filter
* won't run as part of the normal pipeline, we just unlink the node.
Expand All @@ -147,6 +233,8 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc,
if (processor_instance == NULL) {
flb_error("[processor] error creating native processor instance %s", pu->name);

pthread_mutex_destroy(&pu->lock);
flb_sds_destroy(pu->name);
flb_free(pu);

return NULL;
Expand Down Expand Up @@ -199,6 +287,8 @@ void flb_processor_unit_destroy(struct flb_processor_unit *pu)
(struct flb_processor_instance *) pu->ctx);
}

pthread_mutex_destroy(&pu->lock);

flb_sds_destroy(pu->name);
flb_free(pu);
}
Expand Down Expand Up @@ -304,6 +394,7 @@ int flb_processor_run(struct flb_processor *proc,
size_t tmp_size;
int decoder_result;
struct mk_list *head;
size_t lock_retry_count;
struct mk_list *list = NULL;
struct flb_log_event log_event;
struct flb_processor_unit *pu;
Expand Down Expand Up @@ -331,6 +422,14 @@ int flb_processor_run(struct flb_processor *proc,
tmp_buf = NULL;
tmp_size = 0;

ret = acquire_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);

if (ret != FLB_TRUE) {
return -1;
}

/* run the unit */
if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) {
/* get the filter context */
Expand Down Expand Up @@ -370,6 +469,10 @@ int flb_processor_run(struct flb_processor *proc,
*out_buf = NULL;
*out_size = 0;

release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);

return 0;
}

Expand Down Expand Up @@ -402,6 +505,10 @@ int flb_processor_run(struct flb_processor *proc,
flb_free(cur_buf);
}

release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);

return -1;
}

Expand Down Expand Up @@ -431,6 +538,10 @@ int flb_processor_run(struct flb_processor *proc,
if (ret != FLB_PROCESSOR_SUCCESS) {
flb_log_event_encoder_reset(p_ins->log_encoder);

release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);

return -1;
}

Expand All @@ -440,6 +551,10 @@ int flb_processor_run(struct flb_processor *proc,
*out_buf = NULL;
*out_size = 0;

release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);

return 0;
}

Expand All @@ -460,6 +575,10 @@ int flb_processor_run(struct flb_processor *proc,
tag_len);

if (ret != FLB_PROCESSOR_SUCCESS) {
release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);

return -1;
}
}
Expand All @@ -472,11 +591,19 @@ int flb_processor_run(struct flb_processor *proc,
tag_len);

if (ret != FLB_PROCESSOR_SUCCESS) {
release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);

return -1;
}
}
}
}

release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);
}

/* set output buffer */
Expand Down