From e5d68181f69fdf4cb36d54d9b89912ad814d4606 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 23 Oct 2023 09:53:19 -0700 Subject: [PATCH 01/12] Prototype for creating one timer coro per output thread Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 47 ++++++++++++++++++++++++++++++++++++++++++++- plugins/out_s3/s3.h | 2 ++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index fabb39d854d..ab4ebcf6669 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -593,6 +593,16 @@ static int cb_s3_init(struct flb_output_instance *ins, return -1; } + if (ctx->ins->is_threaded == FLB_TRUE) { + ctx->thread_instances = flb_calloc(1, sizeof(struct flb_out_thread_instance *) * ctx->ins->tp_workers); + if (!ctx->thread_instances) { + flb_errno(); + return -1; + } + } else { + ctx->thread_instances = NULL; + } + /* the check against -1 is works here because size_t is unsigned * and (int) -1 == unsigned max value * Fluent Bit uses -1 (which becomes max value) to indicate undefined @@ -1988,10 +1998,35 @@ static void s3_flush_init(struct flb_config *config, struct flb_s3 *ctx) { struct flb_sched *sched; int ret; + struct flb_out_thread_instance *current_th_ins; + struct flb_out_thread_instance *th_ins; + int start = FLB_FALSE; + int i; flush_startup_chunks(ctx); - if (ctx->timer_created == FLB_FALSE) { + + /* Check if current worker thread has a timer scheduled on its evl */ + if (ctx->ins->is_threaded == FLB_TRUE) { + current_th_ins = flb_output_thread_instance_get(); + start = FLB_TRUE; + if (current_th_ins == NULL) { + //TODO: ? + } + + for (i = 0; i < ctx->ins->tp_workers; i++) { + th_ins = ctx->thread_instances[i]; + + if (th_ins != NULL && th_ins == current_th_ins) { + start = FLB_FALSE; + } + } + } else { + start = !ctx->timer_created; + } + + /* Schedule the new timer */ + if (start == FLB_TRUE) { sched = flb_sched_ctx_get(); ret = flb_sched_out_async_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, @@ -2003,6 +2038,16 @@ static void s3_flush_init(struct flb_config *config, struct flb_s3 *ctx) return; } ctx->timer_created = FLB_TRUE; + + /* Save the worker thread pointer in our list */ + if (ctx->ins->is_threaded == FLB_TRUE) { + for (i = 0; i < ctx->ins->tp_workers; i++) { + th_ins = ctx->thread_instances[i]; + if (th_ins == NULL) { + ctx->thread_instances[i] = current_th_ins; + } + } + } } } diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index c04d3b5bfeb..973901e71e0 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -172,6 +172,8 @@ struct flb_s3 { */ pthread_mutex_t cb_flush_mutex; + struct flb_out_thread_instance **thread_instances; + struct flb_output_instance *ins; }; From 0e0116d9cf13ed5262728822f79cc8b19f647c20 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 26 Oct 2023 16:09:18 -0700 Subject: [PATCH 02/12] out_s3: create an async timer on each worker thread Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 82 +++++++++++++++++++++++++-------------------- plugins/out_s3/s3.h | 8 ++++- 2 files changed, 53 insertions(+), 37 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index ab4ebcf6669..b1adb493810 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -583,6 +583,7 @@ static int cb_s3_init(struct flb_output_instance *ins, pthread_mutex_init(&ctx->upload_queue_mutex, NULL); pthread_mutex_init(&ctx->cb_flush_mutex, NULL); + pthread_mutex_init(&ctx->create_timer_mutex, NULL); /* Export context */ flb_output_set_context(ins, ctx); @@ -968,7 +969,7 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->provider->provider_vtable->sync(ctx->provider); ctx->provider->provider_vtable->init(ctx->provider); - ctx->timer_created = FLB_FALSE; + ctx->timers_created = 0; ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000; if (ctx->timer_ms > UPLOAD_TIMER_MAX_WAIT) { ctx->timer_ms = UPLOAD_TIMER_MAX_WAIT; @@ -1994,6 +1995,43 @@ static void async_timer_cb(struct flb_config *config, void *data) cb_s3_upload(config, ctx); } +static void create_timer_on_thread(struct flb_config *config, struct flb_s3 *ctx) +{ + struct flb_sched *sched; + struct flb_out_thread_instance *th_ins; + struct flb_out_thread_instance *current_th_ins; + int i; + int ret; + + pthread_mutex_lock(&ctx->create_timer_mutex); + sched = flb_sched_ctx_get(); + ret = flb_sched_out_async_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->timer_ms, ctx->ins, + S3_UPLOAD_JOB_NAME, async_timer_cb, + ctx, NULL); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to create upload timer"); + pthread_mutex_unlock(&ctx->create_timer_mutex); + return; + } + + ctx->timers_created++; + /* Save the worker thread pointer in the list */ + if (ctx->ins->is_threaded == FLB_TRUE) { + current_th_ins = flb_output_thread_instance_get(); + for (i = 0; i < ctx->ins->tp_workers; i++) { + th_ins = ctx->thread_instances[i]; + if (th_ins == current_th_ins) { + return; + } + if (th_ins == NULL) { + ctx->thread_instances[i] = current_th_ins; + } + } + } + pthread_mutex_unlock(&ctx->create_timer_mutex); +} + static void s3_flush_init(struct flb_config *config, struct flb_s3 *ctx) { struct flb_sched *sched; @@ -2005,49 +2043,21 @@ static void s3_flush_init(struct flb_config *config, struct flb_s3 *ctx) flush_startup_chunks(ctx); - - /* Check if current worker thread has a timer scheduled on its evl */ - if (ctx->ins->is_threaded == FLB_TRUE) { + if (ctx->timers_created == 0 ) { + create_timer_on_thread(config, ctx); + } + if (ctx->timers_created < ctx->ins->tp_workers && ctx->ins->is_threaded == FLB_TRUE) { + /* Check if current worker thread has a timer scheduled on its evl */ current_th_ins = flb_output_thread_instance_get(); - start = FLB_TRUE; - if (current_th_ins == NULL) { - //TODO: ? - } for (i = 0; i < ctx->ins->tp_workers; i++) { th_ins = ctx->thread_instances[i]; if (th_ins != NULL && th_ins == current_th_ins) { - start = FLB_FALSE; - } - } - } else { - start = !ctx->timer_created; - } - - /* Schedule the new timer */ - if (start == FLB_TRUE) { - sched = flb_sched_ctx_get(); - - ret = flb_sched_out_async_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, ctx->ins, - S3_UPLOAD_JOB_NAME, async_timer_cb, - ctx, NULL); - if (ret < 0) { - flb_plg_error(ctx->ins, "Failed to create upload timer"); - return; - } - ctx->timer_created = FLB_TRUE; - - /* Save the worker thread pointer in our list */ - if (ctx->ins->is_threaded == FLB_TRUE) { - for (i = 0; i < ctx->ins->tp_workers; i++) { - th_ins = ctx->thread_instances[i]; - if (th_ins == NULL) { - ctx->thread_instances[i] = current_th_ins; - } + return; } } + create_timer_on_thread(config, ctx); } } diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 973901e71e0..593d19be69d 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -149,7 +149,6 @@ struct flb_s3 { size_t upload_chunk_size; time_t upload_timeout; - int timer_created; int timer_ms; int key_fmt_has_uuid; @@ -172,7 +171,14 @@ struct flb_s3 { */ pthread_mutex_t cb_flush_mutex; + /* + * Need to create a timer on each worker thread. Store a + * array of pointers to the thread instance with a mutex to + * protect the array and timers_created counter. + */ + pthread_mutex_t create_timer_mutex; struct flb_out_thread_instance **thread_instances; + int timers_created; struct flb_output_instance *ins; }; From 09d72bf6d7fc6657d4c016fe9e37213e6a997ee8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 26 Oct 2023 16:34:34 -0700 Subject: [PATCH 03/12] wip --- src/flb_async_timer.c | 2 +- src/flb_engine.c | 2 +- src/flb_output_thread.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/flb_async_timer.c b/src/flb_async_timer.c index 33ed3606d64..413955b0d87 100644 --- a/src/flb_async_timer.c +++ b/src/flb_async_timer.c @@ -167,7 +167,7 @@ void flb_thread_pool_async_timers_print(struct flb_output_instance *ins) th_ins = th->params.data; pthread_mutex_lock(&th_ins->sched->async_timer_mutex); - flb_async_timers_print(&th_ins->sched->async_timer_list); + flb_async_timers_print(th_ins->sched); pthread_mutex_unlock(&th_ins->sched->async_timer_mutex); } } diff --git a/src/flb_engine.c b/src/flb_engine.c index 7e51df71c55..80082071ea0 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -916,7 +916,7 @@ int flb_engine_start(struct flb_config *config) flb_net_dns_lookup_context_cleanup(&dns_ctx); flb_sched_timer_cleanup(config->sched); flb_upstream_conn_pending_destroy_list(&config->upstreams); - flb_async_timer_cleanup(config->sched->async_timer_list_destroy); + flb_async_timer_cleanup(config->sched); /* * depend on main thread to clean up expired message diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index ade649a912c..3f77df19a58 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -360,7 +360,7 @@ static void output_thread(void *data) upstream_thread_destroy(th_ins); flb_upstream_conn_active_destroy_list(&th_ins->upstreams); flb_upstream_conn_pending_destroy_list(&th_ins->upstreams); - flb_async_timer_cleanup(&th_ins->sched); + flb_async_timer_cleanup(th_ins->sched); flb_sched_destroy(sched); flush_params = FLB_TLS_GET(out_flush_params); From c8dbf2aace1cae151543abffb7d741c86ae6d390 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 26 Oct 2023 16:38:01 -0700 Subject: [PATCH 04/12] wip --- src/flb_output_thread.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index 3f77df19a58..bee986d5aac 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -333,7 +333,6 @@ static void output_thread(void *data) /* Destroy upstream connections from the 'pending destroy list' */ flb_upstream_conn_pending_destroy_list(&th_ins->upstreams); flb_sched_timer_cleanup(sched); - flb_async_timer_cleanup(&th_ins->sched); /* Check if we should stop the event loop */ if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0 && mk_list_size(&th_ins->sched->async_timer_list) == 0) { From 415b5936fbd1cbb593b40278dfaa8653213729b0 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 31 Oct 2023 11:13:20 -0700 Subject: [PATCH 05/12] wip --- include/fluent-bit/flb_output.h | 1 - 1 file changed, 1 deletion(-) diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 21ab05b3637..2656e0e501d 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -43,7 +43,6 @@ #include #include #include -#include #include #include #include From 8de7ac3cf54e4a45adc090b335faa9791a900d5d Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 17 Nov 2023 10:20:00 -0800 Subject: [PATCH 06/12] ??? build failing why ??? Signed-off-by: Wesley Pettit --- include/fluent-bit/flb_output.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 2656e0e501d..ce9f1a0c015 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -76,6 +76,10 @@ struct flb_output_flush; +struct flb_out_thread_instance; +int flb_output_thread_pool_coros_size(struct flb_output_instance *ins); +struct flb_out_thread_instance *flb_output_thread_instance_get(); + /* * Tests callbacks * =============== From 2289649ac37bd9153c21455bc5bda14d7c9685f8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 27 Nov 2023 19:56:31 -0800 Subject: [PATCH 07/12] wip --- include/fluent-bit/flb_output.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index ce9f1a0c015..bfd6de7a86a 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -76,6 +76,8 @@ struct flb_output_flush; +struct flb_out_thread_instance; + struct flb_out_thread_instance; int flb_output_thread_pool_coros_size(struct flb_output_instance *ins); struct flb_out_thread_instance *flb_output_thread_instance_get(); From 85ab1a37d70c54010fd0ae626c7881c849c6b22f Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 28 Nov 2023 11:10:50 -0800 Subject: [PATCH 08/12] wip --- include/fluent-bit/flb_output.h | 38 ++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index bfd6de7a86a..5be4ec6d72c 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -76,7 +76,43 @@ struct flb_output_flush; -struct flb_out_thread_instance; +struct flb_out_thread_instance { + struct mk_event event; /* event context to associate events */ + struct mk_event_loop *evl; /* thread event loop context */ + struct flb_bucket_queue *evl_bktq; /* bucket queue for evl track event priority */ + flb_pipefd_t ch_parent_events[2]; /* channel to receive parent notifications */ + flb_pipefd_t ch_thread_events[2]; /* channel to send messages local event loop */ + struct flb_output_instance *ins; /* output plugin instance */ + struct flb_config *config; + struct flb_tp_thread *th; + struct mk_list _head; + + /* + * In multithread mode, we move some contexts to independent references per thread + * so we can avoid to have shared resources and mutexes. + * + * The following 'coro' fields maintains a state of co-routines inside the thread + * event loop. + * + * note: in single-thread mode, the same fields are in 'struct flb_output_instance'. + */ + int flush_id; /* coroutine id counter */ + struct mk_list flush_list; /* flush context list */ + struct mk_list flush_list_destroy; /* flust context destroy list */ + + /* + * If the main engine (parent thread) needs to query the number of active + * 'flushes' running by a threaded instance, then the access to the 'flush_list' + * must be protected: we use 'flush_mutex for that purpose. + */ + pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */ + + /* List of mapped 'upstream' contexts */ + struct mk_list upstreams; + + /* Each event loop has a scheduler instance */ + struct flb_sched *sched; +}; struct flb_out_thread_instance; int flb_output_thread_pool_coros_size(struct flb_output_instance *ins); From 4e22809a3f06fab9135b10227e4c59ee0caf1de5 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 28 Nov 2023 11:12:42 -0800 Subject: [PATCH 09/12] wip --- include/fluent-bit/flb_output.h | 38 --------------------------------- 1 file changed, 38 deletions(-) diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 5be4ec6d72c..ce9f1a0c015 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -76,44 +76,6 @@ struct flb_output_flush; -struct flb_out_thread_instance { - struct mk_event event; /* event context to associate events */ - struct mk_event_loop *evl; /* thread event loop context */ - struct flb_bucket_queue *evl_bktq; /* bucket queue for evl track event priority */ - flb_pipefd_t ch_parent_events[2]; /* channel to receive parent notifications */ - flb_pipefd_t ch_thread_events[2]; /* channel to send messages local event loop */ - struct flb_output_instance *ins; /* output plugin instance */ - struct flb_config *config; - struct flb_tp_thread *th; - struct mk_list _head; - - /* - * In multithread mode, we move some contexts to independent references per thread - * so we can avoid to have shared resources and mutexes. - * - * The following 'coro' fields maintains a state of co-routines inside the thread - * event loop. - * - * note: in single-thread mode, the same fields are in 'struct flb_output_instance'. - */ - int flush_id; /* coroutine id counter */ - struct mk_list flush_list; /* flush context list */ - struct mk_list flush_list_destroy; /* flust context destroy list */ - - /* - * If the main engine (parent thread) needs to query the number of active - * 'flushes' running by a threaded instance, then the access to the 'flush_list' - * must be protected: we use 'flush_mutex for that purpose. - */ - pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */ - - /* List of mapped 'upstream' contexts */ - struct mk_list upstreams; - - /* Each event loop has a scheduler instance */ - struct flb_sched *sched; -}; - struct flb_out_thread_instance; int flb_output_thread_pool_coros_size(struct flb_output_instance *ins); struct flb_out_thread_instance *flb_output_thread_instance_get(); From 0ba44ad7f64f97b09ea05ac548edd458b9c507ad Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 30 Nov 2023 10:49:04 -0800 Subject: [PATCH 10/12] Use lecaros suggestion to fix build issue Signed-off-by: Wesley Pettit --- include/fluent-bit/flb_async_timer.h | 1 - src/flb_async_timer.c | 1 - 2 files changed, 2 deletions(-) diff --git a/include/fluent-bit/flb_async_timer.h b/include/fluent-bit/flb_async_timer.h index 16ad39723f2..81cb2374773 100644 --- a/include/fluent-bit/flb_async_timer.h +++ b/include/fluent-bit/flb_async_timer.h @@ -29,7 +29,6 @@ #endif #include -#include #include #include diff --git a/src/flb_async_timer.c b/src/flb_async_timer.c index 413955b0d87..f850dd9458f 100644 --- a/src/flb_async_timer.c +++ b/src/flb_async_timer.c @@ -19,7 +19,6 @@ #include #include -#include #include #include From 3c5f0469b9b1fb7207b900272b64a75ca530ee3e Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 30 Nov 2023 13:59:27 -0800 Subject: [PATCH 11/12] wip --- plugins/out_s3/s3.c | 18 +++++++++--------- plugins/out_s3/s3.h | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index b1adb493810..1608f1a4a0e 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -594,15 +594,15 @@ static int cb_s3_init(struct flb_output_instance *ins, return -1; } - if (ctx->ins->is_threaded == FLB_TRUE) { - ctx->thread_instances = flb_calloc(1, sizeof(struct flb_out_thread_instance *) * ctx->ins->tp_workers); - if (!ctx->thread_instances) { - flb_errno(); - return -1; - } - } else { - ctx->thread_instances = NULL; - } + // if (ctx->ins->is_threaded == FLB_TRUE) { + // ctx->thread_instances = flb_calloc(1, sizeof(struct flb_out_thread_instance *) * ctx->ins->tp_workers); + // if (!ctx->thread_instances) { + // flb_errno(); + // return -1; + // } + // } else { + // ctx->thread_instances = NULL; + // } /* the check against -1 is works here because size_t is unsigned * and (int) -1 == unsigned max value diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 593d19be69d..8ea94632dfb 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -177,7 +177,7 @@ struct flb_s3 { * protect the array and timers_created counter. */ pthread_mutex_t create_timer_mutex; - struct flb_out_thread_instance **thread_instances; + struct flb_out_thread_instance *thread_instances[5]; int timers_created; struct flb_output_instance *ins; From e0c34acc843788454a928e9b72fccd22bad8fd89 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sat, 2 Dec 2023 12:45:24 -0800 Subject: [PATCH 12/12] wip --- plugins/out_s3/s3.c | 24 ++++++++++++------------ plugins/out_s3/s3.h | 1 + 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 1608f1a4a0e..7134396055e 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -594,15 +594,18 @@ static int cb_s3_init(struct flb_output_instance *ins, return -1; } - // if (ctx->ins->is_threaded == FLB_TRUE) { - // ctx->thread_instances = flb_calloc(1, sizeof(struct flb_out_thread_instance *) * ctx->ins->tp_workers); - // if (!ctx->thread_instances) { - // flb_errno(); - // return -1; - // } - // } else { - // ctx->thread_instances = NULL; - // } + // alloc here doesn't work somehow?? + if (ctx->ins->is_threaded == FLB_TRUE && ctx->ins->tp_workers > 0) { + ctx->thread_instances = flb_calloc(1, + sizeof(struct flb_out_thread_instance *) + * ctx->ins->tp_workers); // check that its not zero + if (!ctx->thread_instances) { + flb_errno(); + return -1; + } + } else { + ctx->thread_instances = NULL; + } /* the check against -1 is works here because size_t is unsigned * and (int) -1 == unsigned max value @@ -2034,11 +2037,8 @@ static void create_timer_on_thread(struct flb_config *config, struct flb_s3 *ctx static void s3_flush_init(struct flb_config *config, struct flb_s3 *ctx) { - struct flb_sched *sched; - int ret; struct flb_out_thread_instance *current_th_ins; struct flb_out_thread_instance *th_ins; - int start = FLB_FALSE; int i; flush_startup_chunks(ctx); diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 8ea94632dfb..c7fa96d07b6 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -178,6 +178,7 @@ struct flb_s3 { */ pthread_mutex_t create_timer_mutex; struct flb_out_thread_instance *thread_instances[5]; + struct flb_out_thread_instance **thread_instances; int timers_created; struct flb_output_instance *ins;