From 7a6b161569bebc04b2387dd7a300ac73b5dd5021 Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Mon, 14 Jun 2021 09:41:49 -0600 Subject: [PATCH] output_thread: fixed wrong pipe io function usage output_thread patch #1 : pipe creation is platform dependent, in unixes it's achieved through the pipe syscall but in windows it's a socket pair created through libevent which means instead of read/write we need to use recv/send which was already abstracted through flb_pipe_(r/w). output_thread patch #2 : in windows the SOCKET data type is defined as an UINT_PTR which means in 64 bit operating systems it's an 8 byte number instead of 4. libevent abstracts this through the evutil_socket_t data type which in turn is abstracted using the flb_pipefd_t data type in fluent bit. The problem comes to play when calling mk_event_channel_create which receives 2 int pointers to return both pipe endpoints. This means there are 2 possible bugs (one of which was happening and the other is not really a concern I think) : Since mk_event_channel_create was only modifying the low part of the elements in the ch_parent_events array and the structure was not zeroed when allocated, the high parts of those 64 bit members contained garbage which caused any winsock calls to return error 10038 (not a socket). That's the reason why I added a memset call in line 421. There is still a possible issue with mk_event_channel_create which we should fix by moving our platform dependent data type to mk_lib and defining our flb local data types to those if need be (for consistency). Signed-off-by: Leonardo Alminana --- src/flb_output_thread.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index c7bf5e0a405..1ff45ccd236 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -92,7 +92,7 @@ static inline int handle_output_event(struct flb_config *config, * Notify the parent event loop the return status, just forward the same * 64 bits value. */ - ret = write(ch_parent, &val, sizeof(val)); + ret = flb_pipe_w(ch_parent, &val, sizeof(val)); if (ret == -1) { flb_errno(); return -1; @@ -376,7 +376,9 @@ int flb_output_thread_pool_flush(struct flb_task *task, flb_plg_debug(out_ins, "task_id=%i assigned to thread #%i", task->id, th->id); - n = write(th_ins->ch_parent_events[1], &task, sizeof(struct flb_task *)); + + n = flb_pipe_w(th_ins->ch_parent_events[1], &task, sizeof(struct flb_task*)); + if (n == -1) { flb_errno(); return -1; @@ -416,6 +418,8 @@ int flb_output_thread_pool_create(struct flb_config *config, flb_errno(); continue; } + memset(th_ins, 0, sizeof(struct flb_out_thread_instance)); + th_ins->config = config; th_ins->ins = ins; th_ins->coro_id = 0; @@ -518,7 +522,7 @@ void flb_output_thread_pool_destroy(struct flb_output_instance *ins) } th_ins = th->params.data; - n = write(th_ins->ch_parent_events[1], &stop, sizeof(stop)); + n = flb_pipe_w(th_ins->ch_parent_events[1], &stop, sizeof(stop)); if (n < 0) { flb_errno(); flb_plg_error(th_ins->ins, "could not signal worker thread");