diff --git a/include/fluent-bit/flb_output_thread.h b/include/fluent-bit/flb_output_thread.h index 16295a6fa1e..3c28e1c849c 100644 --- a/include/fluent-bit/flb_output_thread.h +++ b/include/fluent-bit/flb_output_thread.h @@ -23,6 +23,7 @@ struct flb_out_thread_instance { struct mk_event event; /* event context to associate events */ + struct mk_event_loop *evl; /* thread event loop context */ flb_pipefd_t ch_parent_events[2]; /* channel to receive parent notifications */ flb_pipefd_t ch_thread_events[2]; /* channel to send messages local event loop */ struct flb_output_instance *ins; /* output plugin instance */ diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index 35259d80543..ce980f8713c 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -118,7 +118,6 @@ static void output_thread(void *data) char tmp[64]; struct mk_event event_local; struct mk_event *event; - struct mk_event_loop *evl; struct flb_sched *sched; struct flb_task *task; struct flb_upstream_conn *u_conn; @@ -133,18 +132,10 @@ static void output_thread(void *data) ins = th_ins->ins; thread_id = th_ins->th->id; - /* Create the event loop for this thread */ - evl = mk_event_loop_create(64); - if (!evl) { - flb_plg_error(ins, "could not create thread event loop"); - return; - } - /* Create a scheduler context */ - sched = flb_sched_create(ins->config, evl); + sched = flb_sched_create(ins->config, th_ins->evl); if (!sched) { flb_plg_error(ins, "could not create thread scheduler"); - mk_event_loop_destroy(evl); return; } flb_sched_ctx_set(sched); @@ -171,39 +162,16 @@ static void output_thread(void *data) * it sets the event loop reference in a TLS (thread local storage) variable * of the scope of this thread. */ - flb_engine_evl_set(evl); - - /* - * Event loop setup between parent engine and this thread - * - * - FLB engine uses 'ch_parent_events[1]' to dispatch tasks to this thread - * - Thread receive message on ch_parent_events[0] - * - * The mk_event_channel_create() will attach the pipe read end ch_parent_events[0] - * to the local event loop 'evl'. - */ - ret = mk_event_channel_create(evl, - &th_ins->ch_parent_events[0], - &th_ins->ch_parent_events[1], - th_ins); - if (ret == -1) { - flb_plg_error(th_ins->ins, "could not create thread channel"); - flb_engine_evl_set(NULL); - mk_event_loop_destroy(evl); - return; - } - /* Signal type to indicate a "flush" request */ - th_ins->event.type = FLB_ENGINE_EV_THREAD_OUTPUT; + flb_engine_evl_set(th_ins->evl); /* Channel used by flush callbacks to notify it return status */ - ret = mk_event_channel_create(evl, + ret = mk_event_channel_create(th_ins->evl, &th_ins->ch_thread_events[0], &th_ins->ch_thread_events[1], &event_local); if (ret == -1) { flb_plg_error(th_ins->ins, "could not create thread channel"); flb_engine_evl_set(NULL); - mk_event_loop_destroy(evl); return; } event_local.type = FLB_ENGINE_EV_OUTPUT; @@ -212,8 +180,8 @@ static void output_thread(void *data) /* Thread event loop */ while (running) { - mk_event_wait(evl); - mk_event_foreach(event, evl) { + mk_event_wait(th_ins->evl); + mk_event_foreach(event, th_ins->evl) { /* * FIXME * ----- @@ -294,7 +262,6 @@ static void output_thread(void *data) } flb_sched_destroy(sched); - mk_event_loop_destroy(evl); params = FLB_TLS_GET(out_coro_params); if (params) { @@ -335,8 +302,10 @@ int flb_output_thread_pool_create(struct flb_config *config, struct flb_output_instance *ins) { int i; + int ret; struct flb_tp *tp; struct flb_tp_thread *th; + struct mk_event_loop *evl; struct flb_out_thread_instance *th_ins; /* Create the thread pool context */ @@ -363,6 +332,37 @@ int flb_output_thread_pool_create(struct flb_config *config, th_ins->config = config; th_ins->ins = ins; + /* Create the event loop for this thread */ + evl = mk_event_loop_create(64); + if (!evl) { + flb_plg_error(ins, "could not create thread event loop"); + flb_free(th_ins); + continue; + } + th_ins->evl = evl; + + /* + * Event loop setup between parent engine and this thread + * + * - FLB engine uses 'ch_parent_events[1]' to dispatch tasks to this thread + * - Thread receive message on ch_parent_events[0] + * + * The mk_event_channel_create() will attach the pipe read end ch_parent_events[0] + * to the local event loop 'evl'. + */ + ret = mk_event_channel_create(th_ins->evl, + &th_ins->ch_parent_events[0], + &th_ins->ch_parent_events[1], + th_ins); + if (ret == -1) { + flb_plg_error(th_ins->ins, "could not create thread channel"); + mk_event_loop_destroy(th_ins->evl); + flb_free(th_ins); + continue; + } + /* Signal type to indicate a "flush" request */ + th_ins->event.type = FLB_ENGINE_EV_THREAD_OUTPUT; + /* Spawn the thread */ th = flb_tp_thread_create(tp, output_thread, th_ins, config); if (!th) {