diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index c7bf5e0a405..1ff45ccd236 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -92,7 +92,7 @@ static inline int handle_output_event(struct flb_config *config, * Notify the parent event loop the return status, just forward the same * 64 bits value. */ - ret = write(ch_parent, &val, sizeof(val)); + ret = flb_pipe_w(ch_parent, &val, sizeof(val)); if (ret == -1) { flb_errno(); return -1; @@ -376,7 +376,9 @@ int flb_output_thread_pool_flush(struct flb_task *task, flb_plg_debug(out_ins, "task_id=%i assigned to thread #%i", task->id, th->id); - n = write(th_ins->ch_parent_events[1], &task, sizeof(struct flb_task *)); + + n = flb_pipe_w(th_ins->ch_parent_events[1], &task, sizeof(struct flb_task*)); + if (n == -1) { flb_errno(); return -1; @@ -416,6 +418,8 @@ int flb_output_thread_pool_create(struct flb_config *config, flb_errno(); continue; } + memset(th_ins, 0, sizeof(struct flb_out_thread_instance)); + th_ins->config = config; th_ins->ins = ins; th_ins->coro_id = 0; @@ -518,7 +522,7 @@ void flb_output_thread_pool_destroy(struct flb_output_instance *ins) } th_ins = th->params.data; - n = write(th_ins->ch_parent_events[1], &stop, sizeof(stop)); + n = flb_pipe_w(th_ins->ch_parent_events[1], &stop, sizeof(stop)); if (n < 0) { flb_errno(); flb_plg_error(th_ins->ins, "could not signal worker thread");