diff --git a/include/fluent-bit/flb_async_timer.h b/include/fluent-bit/flb_async_timer.h index 16ad39723f2..81cb2374773 100644 --- a/include/fluent-bit/flb_async_timer.h +++ b/include/fluent-bit/flb_async_timer.h @@ -29,7 +29,6 @@ #endif #include -#include #include #include diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 21ab05b3637..ce9f1a0c015 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -43,7 +43,6 @@ #include #include #include -#include #include #include #include @@ -77,6 +76,10 @@ struct flb_output_flush; +struct flb_out_thread_instance; +int flb_output_thread_pool_coros_size(struct flb_output_instance *ins); +struct flb_out_thread_instance *flb_output_thread_instance_get(); + /* * Tests callbacks * =============== diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index fabb39d854d..7134396055e 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -583,6 +583,7 @@ static int cb_s3_init(struct flb_output_instance *ins, pthread_mutex_init(&ctx->upload_queue_mutex, NULL); pthread_mutex_init(&ctx->cb_flush_mutex, NULL); + pthread_mutex_init(&ctx->create_timer_mutex, NULL); /* Export context */ flb_output_set_context(ins, ctx); @@ -593,6 +594,19 @@ static int cb_s3_init(struct flb_output_instance *ins, return -1; } + // alloc here doesn't work somehow?? + if (ctx->ins->is_threaded == FLB_TRUE && ctx->ins->tp_workers > 0) { + ctx->thread_instances = flb_calloc(1, + sizeof(struct flb_out_thread_instance *) + * ctx->ins->tp_workers); // check that its not zero + if (!ctx->thread_instances) { + flb_errno(); + return -1; + } + } else { + ctx->thread_instances = NULL; + } + /* the check against -1 is works here because size_t is unsigned * and (int) -1 == unsigned max value * Fluent Bit uses -1 (which becomes max value) to indicate undefined @@ -958,7 +972,7 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->provider->provider_vtable->sync(ctx->provider); ctx->provider->provider_vtable->init(ctx->provider); - ctx->timer_created = FLB_FALSE; + ctx->timers_created = 0; ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000; if (ctx->timer_ms > UPLOAD_TIMER_MAX_WAIT) { ctx->timer_ms = UPLOAD_TIMER_MAX_WAIT; @@ -1984,25 +1998,66 @@ static void async_timer_cb(struct flb_config *config, void *data) cb_s3_upload(config, ctx); } -static void s3_flush_init(struct flb_config *config, struct flb_s3 *ctx) +static void create_timer_on_thread(struct flb_config *config, struct flb_s3 *ctx) { struct flb_sched *sched; + struct flb_out_thread_instance *th_ins; + struct flb_out_thread_instance *current_th_ins; + int i; int ret; + pthread_mutex_lock(&ctx->create_timer_mutex); + sched = flb_sched_ctx_get(); + ret = flb_sched_out_async_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->timer_ms, ctx->ins, + S3_UPLOAD_JOB_NAME, async_timer_cb, + ctx, NULL); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to create upload timer"); + pthread_mutex_unlock(&ctx->create_timer_mutex); + return; + } + + ctx->timers_created++; + /* Save the worker thread pointer in the list */ + if (ctx->ins->is_threaded == FLB_TRUE) { + current_th_ins = flb_output_thread_instance_get(); + for (i = 0; i < ctx->ins->tp_workers; i++) { + th_ins = ctx->thread_instances[i]; + if (th_ins == current_th_ins) { + return; + } + if (th_ins == NULL) { + ctx->thread_instances[i] = current_th_ins; + } + } + } + pthread_mutex_unlock(&ctx->create_timer_mutex); +} + +static void s3_flush_init(struct flb_config *config, struct flb_s3 *ctx) +{ + struct flb_out_thread_instance *current_th_ins; + struct flb_out_thread_instance *th_ins; + int i; + flush_startup_chunks(ctx); - if (ctx->timer_created == FLB_FALSE) { - sched = flb_sched_ctx_get(); + if (ctx->timers_created == 0 ) { + create_timer_on_thread(config, ctx); + } + if (ctx->timers_created < ctx->ins->tp_workers && ctx->ins->is_threaded == FLB_TRUE) { + /* Check if current worker thread has a timer scheduled on its evl */ + current_th_ins = flb_output_thread_instance_get(); + + for (i = 0; i < ctx->ins->tp_workers; i++) { + th_ins = ctx->thread_instances[i]; - ret = flb_sched_out_async_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, ctx->ins, - S3_UPLOAD_JOB_NAME, async_timer_cb, - ctx, NULL); - if (ret < 0) { - flb_plg_error(ctx->ins, "Failed to create upload timer"); - return; + if (th_ins != NULL && th_ins == current_th_ins) { + return; + } } - ctx->timer_created = FLB_TRUE; + create_timer_on_thread(config, ctx); } } diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index c04d3b5bfeb..c7fa96d07b6 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -149,7 +149,6 @@ struct flb_s3 { size_t upload_chunk_size; time_t upload_timeout; - int timer_created; int timer_ms; int key_fmt_has_uuid; @@ -172,6 +171,16 @@ struct flb_s3 { */ pthread_mutex_t cb_flush_mutex; + /* + * Need to create a timer on each worker thread. Store a + * array of pointers to the thread instance with a mutex to + * protect the array and timers_created counter. + */ + pthread_mutex_t create_timer_mutex; + struct flb_out_thread_instance *thread_instances[5]; + struct flb_out_thread_instance **thread_instances; + int timers_created; + struct flb_output_instance *ins; }; diff --git a/src/flb_async_timer.c b/src/flb_async_timer.c index 33ed3606d64..f850dd9458f 100644 --- a/src/flb_async_timer.c +++ b/src/flb_async_timer.c @@ -19,7 +19,6 @@ #include #include -#include #include #include @@ -167,7 +166,7 @@ void flb_thread_pool_async_timers_print(struct flb_output_instance *ins) th_ins = th->params.data; pthread_mutex_lock(&th_ins->sched->async_timer_mutex); - flb_async_timers_print(&th_ins->sched->async_timer_list); + flb_async_timers_print(th_ins->sched); pthread_mutex_unlock(&th_ins->sched->async_timer_mutex); } } diff --git a/src/flb_engine.c b/src/flb_engine.c index 7e51df71c55..80082071ea0 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -916,7 +916,7 @@ int flb_engine_start(struct flb_config *config) flb_net_dns_lookup_context_cleanup(&dns_ctx); flb_sched_timer_cleanup(config->sched); flb_upstream_conn_pending_destroy_list(&config->upstreams); - flb_async_timer_cleanup(config->sched->async_timer_list_destroy); + flb_async_timer_cleanup(config->sched); /* * depend on main thread to clean up expired message diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index ade649a912c..bee986d5aac 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -333,7 +333,6 @@ static void output_thread(void *data) /* Destroy upstream connections from the 'pending destroy list' */ flb_upstream_conn_pending_destroy_list(&th_ins->upstreams); flb_sched_timer_cleanup(sched); - flb_async_timer_cleanup(&th_ins->sched); /* Check if we should stop the event loop */ if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0 && mk_list_size(&th_ins->sched->async_timer_list) == 0) { @@ -360,7 +359,7 @@ static void output_thread(void *data) upstream_thread_destroy(th_ins); flb_upstream_conn_active_destroy_list(&th_ins->upstreams); flb_upstream_conn_pending_destroy_list(&th_ins->upstreams); - flb_async_timer_cleanup(&th_ins->sched); + flb_async_timer_cleanup(th_ins->sched); flb_sched_destroy(sched); flush_params = FLB_TLS_GET(out_flush_params);