From 1081684ceaa443dd69ff781cef65e3e3dcf65388 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Tue, 12 Oct 2021 15:39:39 -0700 Subject: [PATCH 1/7] io: support timeout on synchronous calls Signed-off-by: Matthew Fala --- src/flb_io.c | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/src/flb_io.c b/src/flb_io.c index 5f2dc630742..c00b2a1ebeb 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -48,6 +48,12 @@ #include #include +#ifdef FLB_SYSTEM_WINDOWS +#define poll WSAPoll +#else +#include +#endif + #include #include #include @@ -309,12 +315,67 @@ static ssize_t net_io_read(struct flb_upstream_conn *u_conn, void *buf, size_t len) { int ret; + struct pollfd pfd_read; + + /* Set socket to non-blocking mode for timeout */ + flb_net_socket_nonblocking(u_conn->fd); ret = recv(u_conn->fd, buf, len, 0); if (ret == -1) { - return -1; + /* + * An asynchronous recv can return -1, but what is important is the + * socket status, getting a EWOULDBLOCK is expected, but any other case + * means a failure. + */ + if (!FLB_WOULDBLOCK()) { + /* Generic error */ + flb_warn("[net] io_read #%i failed from: %s:%i", + u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port); + flb_net_socket_blocking(u_conn->fd); + return -1; + } + + /* The connection is still in progress, implement a socket timeout */ + flb_trace("[net] io_read #%i in progress from: %s:%i", + u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port); + /* + * Prepare a timeout using poll(2): we could use our own + * event loop mechanism for this, but it will require an + * extra file descriptor, the poll(2) call is straightforward + * for this use case. + */ + + pfd_read.fd = u_conn->fd; + pfd_read.events = POLLIN; + ret = poll(&pfd_read, 1, u_conn->u->net.connect_timeout * 1000); + if (ret == 0) { + /* Timeout */ + flb_warn("[net] io_read #%i timeout after %i seconds from: " + "%s:%i", + u_conn->fd, u_conn->u->net.connect_timeout, + u_conn->u->tcp_host, u_conn->u->tcp_port); + flb_socket_close(u_conn->fd); + flb_net_socket_blocking(u_conn->fd); + return -1; + } + else if (ret < 0) { + /* Generic error */ + flb_warn("[net] io_read #%i failed from: %s:%i", + u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port); + flb_socket_close(u_conn->fd); + flb_net_socket_blocking(u_conn->fd); + return -1; + } + + /* Get data */ + ret = recv(u_conn->fd, buf, len, 0); } + /* + * The read succeeded, return the normal + * non-blocking mode to the socket. + */ + flb_net_socket_blocking(u_conn->fd); return ret; } From 360c73729f58097060b77980bb1787b40b353715 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Thu, 4 Nov 2021 00:51:42 +0000 Subject: [PATCH 2/7] Begin implementation --- include/fluent-bit/flb_io.h | 30 ++++++++++++++++++++++++++++++ src/flb_io.c | 25 +++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/include/fluent-bit/flb_io.h b/include/fluent-bit/flb_io.h index 559bc95d7b2..667949c3fb1 100644 --- a/include/fluent-bit/flb_io.h +++ b/include/fluent-bit/flb_io.h @@ -42,9 +42,39 @@ /* Other features */ #define FLB_IO_IPV6 32 /* network I/O uses IPv6 */ +/* IO Wait */ +#define FLB_IO_WAIT_ERROR 0 +#define FLB_IO_WAIT_TIMEOUT 1 +#define FLB_IO_WAIT_COMPLETE 2 +typedef int flb_io_wait_ret; + int flb_io_net_connect(struct flb_upstream_conn *u_conn, struct flb_coro *th); +/* + * Wait for connection via async:mk_event_loop or sync:poll(2) + * Uses monkey event loop if async, + * Otherwise sync blocking wait. + * + * currently timeout only supported for sync waits + * + * If timeout_ms is -1, then there is no timeout. + * + * u_conn->coro and u_conn->fd must be set. + * Return FLB_IO_WAIT_ERROR on failure + * Return FLB_IO_WAIT_TIMEOUT on timeout + * Return FLB_IO_WAIT_COMPLETE on complete + * + * It is the responsability of the caller to set u_conn->coro is async + * + * @param co may be set to null if sync + * + * @param mask is an event types mask composed of MK_EVENT_ + * or the equivalent POLL + */ +flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask, + struct flb_coro *co); + int flb_io_net_write(struct flb_upstream_conn *u, const void *data, size_t len, size_t *out_len); ssize_t flb_io_net_read(struct flb_upstream_conn *u, void *buf, size_t len); diff --git a/src/flb_io.c b/src/flb_io.c index c00b2a1ebeb..c9c42156f3a 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -124,6 +124,31 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn, return 0; } +/* + * Wait for connection via async:mk_event_loop or sync:poll(2) + * Uses monkey event loop if async, + * Otherwise sync blocking wait. + * + * currently timeout only supported for sync waits + * + * If timeout_ms is -1, then there is no timeout. + * + * u_conn->coro and u_conn->fd must be set. + * Return FLB_IO_WAIT_ERROR on failure + * Return FLB_IO_WAIT_TIMEOUT on timeout + * Return FLB_IO_WAIT_COMPLETE on complete + * + * It is the responsability of the caller to set u_conn->coro is async + * + * @param mask is an event types mask composed of MK_EVENT_ + * or the equivalent POLL + */ +flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask, + int timeout_ms) +{ + +} + static int net_io_write(struct flb_upstream_conn *u_conn, const void *data, size_t len, size_t *out_len) { From 6c4203bc529face89d5351c08e9887d87ee4eb75 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Thu, 4 Nov 2021 01:27:37 +0000 Subject: [PATCH 3/7] net: async timeout handling --- include/fluent-bit/flb_network.h | 3 +++ include/fluent-bit/flb_upstream_conn.h | 8 +++++- src/flb_upstream.c | 35 +++++++++++++++++--------- src/tls/flb_tls.c | 4 +-- 4 files changed, 35 insertions(+), 15 deletions(-) diff --git a/include/fluent-bit/flb_network.h b/include/fluent-bit/flb_network.h index 11246c1d5bb..a64d4b63686 100644 --- a/include/fluent-bit/flb_network.h +++ b/include/fluent-bit/flb_network.h @@ -40,6 +40,9 @@ struct flb_net_setup { /* max time in seconds to wait for a established connection */ int connect_timeout; + /* max time in seconds to wait for a network response */ + int response_timeout; + /* network interface to bind and use to send data */ flb_sds_t source_address; diff --git a/include/fluent-bit/flb_upstream_conn.h b/include/fluent-bit/flb_upstream_conn.h index 138202a438d..2760e42d2b2 100644 --- a/include/fluent-bit/flb_upstream_conn.h +++ b/include/fluent-bit/flb_upstream_conn.h @@ -29,6 +29,11 @@ #include #endif +#define FLB_TIMEOUT_SUBJECT_NULL 0 +#define FLB_TIMEOUT_SUBJECT_CONNECT 1 +#define FLB_TIMEOUT_SUBJECT_RESPONSE 2 +typedef int flb_timeout_subject; + /* Upstream TCP connection */ struct flb_upstream_conn { struct mk_event event; @@ -71,7 +76,8 @@ struct flb_upstream_conn { /* Connect */ time_t ts_connect_start; - time_t ts_connect_timeout; + time_t ts_timeout; + flb_timeout_subject ts_timeout_subject; /* Event loop */ struct mk_event_loop *evl; diff --git a/src/flb_upstream.c b/src/flb_upstream.c index bbc83556fce..e5553f2411f 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -488,10 +488,12 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u) conn->evl = evl; if (u->net.connect_timeout > 0) { - conn->ts_connect_timeout = now + u->net.connect_timeout; + conn->ts_timeout_subject = FLB_TIMEOUT_SUBJECT_CONNECT; + conn->ts_timeout = now + u->net.connect_timeout; } else { - conn->ts_connect_timeout = -1; + conn->ts_timeout_subject = NULL; + conn->ts_timeout = -1; } #ifdef FLB_HAVE_TLS @@ -541,7 +543,8 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u) } /* Invalidate timeout for connection */ - conn->ts_connect_timeout = -1; + conn->ts_timeout_subject = NULL; + conn->ts_timeout = -1; conn->busy_flag = FLB_FALSE; return conn; @@ -786,15 +789,23 @@ int flb_upstream_conn_timeouts(struct mk_list *list) drop = FLB_FALSE; - /* Connect timeouts */ - if (u->net.connect_timeout > 0 && - u_conn->ts_connect_timeout > 0 && - u_conn->ts_connect_timeout <= now) { - drop = FLB_TRUE; - flb_error("[upstream] connection #%i to %s:%i timed out after " - "%i seconds", - u_conn->fd, - u->tcp_host, u->tcp_port, u->net.connect_timeout); + /* All connection timeouts */ + if (u_conn->ts_timeout > 0 && + u_conn->ts_timeout <= now) { + if (u_conn->ts_timeout_subject == FLB_TIMEOUT_SUBJECT_CONNECT) { + drop = FLB_TRUE; + flb_error("[upstream] connection #%i to %s:%i timed out after " + "%i seconds", + u_conn->fd, + u->tcp_host, u->tcp_port, u->net.connect_timeout); + } + else if (u_conn->ts_timeout_subject == FLB_TIMEOUT_SUBJECT_RESPONSE) { + drop = FLB_TRUE; + flb_error("[upstream] connection #%i response to %s:%i timed out " + "after %i seconds", + u_conn->fd, + u->tcp_host, u->tcp_port, u->net.response_timeout); + } } if (drop == FLB_TRUE) { diff --git a/src/tls/flb_tls.c b/src/tls/flb_tls.c index 26c42ebd6f5..807957a9870 100644 --- a/src/tls/flb_tls.c +++ b/src/tls/flb_tls.c @@ -368,8 +368,8 @@ int flb_tls_session_create(struct flb_tls *tls, /* Connect timeout */ if (u->net.connect_timeout > 0 && - u_conn->ts_connect_timeout > 0 && - u_conn->ts_connect_timeout <= time(NULL)) { + u_conn->ts_timeout > 0 && + u_conn->ts_timeout <= time(NULL)) { flb_error("[io_tls] handshake connection #%i to %s:%i timed out after " "%i seconds", u_conn->fd, From d4982dd094d2bb44c765159676b2210495b65243 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Thu, 4 Nov 2021 03:06:20 +0000 Subject: [PATCH 4/7] Timeout handled --- include/fluent-bit/flb_io.h | 6 ++-- src/flb_io.c | 65 +++++++++++++++++++++++++++++++++++-- src/flb_upstream.c | 11 +++++-- 3 files changed, 75 insertions(+), 7 deletions(-) diff --git a/include/fluent-bit/flb_io.h b/include/fluent-bit/flb_io.h index 667949c3fb1..e99395f80e9 100644 --- a/include/fluent-bit/flb_io.h +++ b/include/fluent-bit/flb_io.h @@ -44,7 +44,7 @@ /* IO Wait */ #define FLB_IO_WAIT_ERROR 0 -#define FLB_IO_WAIT_TIMEOUT 1 +#define FLB_IO_WAIT_TIMEDOUT 1 #define FLB_IO_WAIT_COMPLETE 2 typedef int flb_io_wait_ret; @@ -69,8 +69,8 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn, * * @param co may be set to null if sync * - * @param mask is an event types mask composed of MK_EVENT_ - * or the equivalent POLL + * @param mask is an event types mask composed of MK_EVENT_ + * or the equivalent POLL */ flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask, struct flb_coro *co); diff --git a/src/flb_io.c b/src/flb_io.c index c9c42156f3a..b4ecfaed4cd 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -144,9 +144,70 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn, * or the equivalent POLL */ flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask, - int timeout_ms) + struct flb_coro *co) { - + int ret; + uint32_t result_mask; + + if (u_conn->u->flags & FLB_IO_ASYNC) { + + /* If async, co must not be null */ + flb_bug(co != NULL); + + ret = mk_event_add(u_conn->evl, + u_conn->fd, + FLB_ENGINE_EV_THREAD, + mask, &u_conn->event); + if (ret == -1) { + /* + * If we failed here there no much that we can do, just + * let the caller we failed + */ + return -1; + } + + u_conn->coro = co; + + /* + * Return the control to the parent caller, we need to wait for + * the event loop to get back to us. + */ + flb_coro_yield(co, FLB_FALSE); + + /* We want this field to hold NULL at all times unless we are explicitly + * waiting to be resumed. + */ + u_conn->coro = NULL; + + /* Save events mask since mk_event_del() will reset it */ + result_mask = u_conn->event.mask; + + /* Remove the registered event */ + ret = mk_event_del(u_conn->evl, &u_conn->event); + if (ret == -1) { + return -1; + } + MK_EVENT_NEW(&u_conn->event); + + /* Check event status */ + /* Not yet apparent what could change the event status mask */ + if (mask & MK_EVENT_READ && !(result_mask & MK_EVENT_READ)) { + /* cleanup? */ + return FLB_IO_WAIT_ERROR; + } + + if (mask & MK_EVENT_READ && !(result_mask & MK_EVENT_WRITE)) { + /* cleanup? */ + return FLB_IO_WAIT_ERROR; + } + + /* Check if this is a timeout */ + if (u_conn->net_error == ETIMEDOUT) { + return FLB_IO_WAIT_TIMEDOUT; + } + + return FLB_IO_WAIT_COMPLETE; + } } static int net_io_write(struct flb_upstream_conn *u_conn, diff --git a/src/flb_upstream.c b/src/flb_upstream.c index e5553f2411f..ac22c11fe2e 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -765,6 +765,7 @@ int flb_upstream_conn_timeouts(struct mk_list *list) { time_t now; int drop; + int resume; struct mk_list *head; struct mk_list *u_head; struct mk_list *tmp; @@ -788,10 +789,16 @@ int flb_upstream_conn_timeouts(struct mk_list *list) u_conn = mk_list_entry(u_head, struct flb_upstream_conn, _head); drop = FLB_FALSE; + resume = FLB_FALSE; /* All connection timeouts */ if (u_conn->ts_timeout > 0 && u_conn->ts_timeout <= now) { + + /* timeout triggers a coro resume & raises net_error ETIMEDOUT */ + u_conn->net_error = ETIMEDOUT; + resume = FLB_TRUE; + if (u_conn->ts_timeout_subject == FLB_TIMEOUT_SUBJECT_CONNECT) { drop = FLB_TRUE; flb_error("[upstream] connection #%i to %s:%i timed out after " @@ -800,7 +807,6 @@ int flb_upstream_conn_timeouts(struct mk_list *list) u->tcp_host, u->tcp_port, u->net.connect_timeout); } else if (u_conn->ts_timeout_subject == FLB_TIMEOUT_SUBJECT_RESPONSE) { - drop = FLB_TRUE; flb_error("[upstream] connection #%i response to %s:%i timed out " "after %i seconds", u_conn->fd, @@ -819,9 +825,10 @@ int flb_upstream_conn_timeouts(struct mk_list *list) shutdown(u_conn->fd, SHUT_RDWR); } - u_conn->net_error = ETIMEDOUT; prepare_destroy_conn(u_conn); + } + if (resume == FLB_TRUE) { /* * If the connection has its coro field set it means it's waiting for a * FLB_ENGINE_EV_THREAD event which in some specific cases might never From 3b6f4166fc7930804c2864628ee51f84ece07340 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Thu, 4 Nov 2021 17:40:57 +0000 Subject: [PATCH 5/7] reset timeout error no --- src/flb_io.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/flb_io.c b/src/flb_io.c index b4ecfaed4cd..799bdb0838b 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -201,8 +201,10 @@ flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask, return FLB_IO_WAIT_ERROR; } - /* Check if this is a timeout */ + /* Check if resumed coro due to timeout */ if (u_conn->net_error == ETIMEDOUT) { + /* reset net_error */ + conn->net_error = -1; return FLB_IO_WAIT_TIMEDOUT; } From 9ae37d357b4f2f640024fb5dfbf58d3c4fc8c53c Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Thu, 4 Nov 2021 21:46:36 +0000 Subject: [PATCH 6/7] monkey errors --- src/flb_io.c | 52 ++++++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/src/flb_io.c b/src/flb_io.c index 799bdb0838b..c0dc57ac5af 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -149,67 +149,71 @@ flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask, int ret; uint32_t result_mask; + /* Asynchronous wait */ if (u_conn->u->flags & FLB_IO_ASYNC) { - /* If async, co must not be null */ flb_bug(co != NULL); + /* Add new monkey event to event loop */ ret = mk_event_add(u_conn->evl, u_conn->fd, FLB_ENGINE_EV_THREAD, mask, &u_conn->event); if (ret == -1) { /* - * If we failed here there no much that we can do, just - * let the caller we failed + * Failed to add the monkey event. + * let the caller know we failed */ - return -1; + return FLB_IO_WAIT_ERROR; } - u_conn->coro = co; - /* * Return the control to the parent caller, we need to wait for * the event loop to get back to us. - */ - flb_coro_yield(co, FLB_FALSE); - - /* We want this field to hold NULL at all times unless we are explicitly + * + * u_conn->coro should hold NULL at all times unless we are explicitly * waiting to be resumed. */ + u_conn->coro = co; + flb_coro_yield(co, FLB_FALSE); u_conn->coro = NULL; - /* Save events mask since mk_event_del() will reset it */ + /* + * Async wait complete, and thread resumed. Remove the registered + * event + */ result_mask = u_conn->event.mask; - - /* Remove the registered event */ - ret = mk_event_del(u_conn->evl, &u_conn->event); - if (ret == -1) { - return -1; - } - MK_EVENT_NEW(&u_conn->event); + ret = mk_event_del(u_conn->evl, &u_conn->event); + if (ret == -1) { + return -1; + } + MK_EVENT_NEW(&u_conn->event); /* Check event status */ - /* Not yet apparent what could change the event status mask */ + /* TODO: Not yet apparent what mk_event op could change the event status mask */ if (mask & MK_EVENT_READ && !(result_mask & MK_EVENT_READ)) { - /* cleanup? */ + u_conn->net_error = -1; return FLB_IO_WAIT_ERROR; } - if (mask & MK_EVENT_READ && !(result_mask & MK_EVENT_WRITE)) { - /* cleanup? */ + u_conn->net_error = -1; return FLB_IO_WAIT_ERROR; } /* Check if resumed coro due to timeout */ if (u_conn->net_error == ETIMEDOUT) { - /* reset net_error */ - conn->net_error = -1; + u_conn->net_error = -1; /* reset net_error */ return FLB_IO_WAIT_TIMEDOUT; } return FLB_IO_WAIT_COMPLETE; } + + /* Synchronous wait */ + else { + + + } } static int net_io_write(struct flb_upstream_conn *u_conn, From eeb7e5379283648d956345840e351d16baa31015 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Fri, 5 Nov 2021 02:49:18 +0000 Subject: [PATCH 7/7] Sync case --- src/flb_io.c | 46 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/src/flb_io.c b/src/flb_io.c index c0dc57ac5af..e3f2ceab4b7 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -138,6 +138,10 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn, * Return FLB_IO_WAIT_TIMEOUT on timeout * Return FLB_IO_WAIT_COMPLETE on complete * + * For synchronous waits, the u_conn socket file descriptor must be set to nonblocking + * flb_net_socket_nonblocking(u_conn->fd); + * flb_net_socket_blocking(u_conn->fd); + * * It is the responsability of the caller to set u_conn->coro is async * * @param mask is an event types mask composed of MK_EVENT_ @@ -148,22 +152,28 @@ flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask, { int ret; uint32_t result_mask; + struct pollfd pfd_read; + time_t now; + + /* To wait, MK_EVENT_READ or MK_EVENT_WRITE must included in event mask */ + flb_bug(!(mask & MK_EVENT_READ) && !(mask & MK_EVENT_WRITE)); + int now = TIME(NULL); - /* Asynchronous wait */ + /* Asynchronous event loop wait */ if (u_conn->u->flags & FLB_IO_ASYNC) { /* If async, co must not be null */ flb_bug(co != NULL); + /* Set event loop timeout */ + + /* Add new monkey event to event loop */ ret = mk_event_add(u_conn->evl, u_conn->fd, FLB_ENGINE_EV_THREAD, mask, &u_conn->event); if (ret == -1) { - /* - * Failed to add the monkey event. - * let the caller know we failed - */ + /* Monkey event creation error */ return FLB_IO_WAIT_ERROR; } @@ -185,14 +195,15 @@ flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask, result_mask = u_conn->event.mask; ret = mk_event_del(u_conn->evl, &u_conn->event); if (ret == -1) { - return -1; + flb_error("[io fd=%i] error deleting monkey event", u_conn->fd); + return FLB_IO_WAIT_ERROR; } MK_EVENT_NEW(&u_conn->event); /* Check event status */ /* TODO: Not yet apparent what mk_event op could change the event status mask */ if (mask & MK_EVENT_READ && !(result_mask & MK_EVENT_READ)) { - u_conn->net_error = -1; + u_conn->net_error = -1; /* reset net_error */ return FLB_IO_WAIT_ERROR; } if (mask & MK_EVENT_READ && !(result_mask & MK_EVENT_WRITE)) { @@ -202,17 +213,34 @@ flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask, /* Check if resumed coro due to timeout */ if (u_conn->net_error == ETIMEDOUT) { - u_conn->net_error = -1; /* reset net_error */ + u_conn->net_error = -1; /* reset net_error */ return FLB_IO_WAIT_TIMEDOUT; } return FLB_IO_WAIT_COMPLETE; } - /* Synchronous wait */ + /* Synchronous blocking wait */ else { + if (mask & MK_EVENT_READ) { + pfd_read.events |= POLLIN; + } + if (mask & MK_EVENT_WRITE) { + pfd_read.events |= POLLOUT; + } + pfd_read.fd = u_conn->fd; + ret = poll(&pfd_read, 1, u_conn->ts_timeout * 1000); + if (ret == 0) { + /* Timeout */ + return FLB_IO_WAIT_TIMEDOUT; + } + else if (ret < 0) { + /* Generic poll error */ + return FLB_IO_WAIT_ERROR; + } + return FLB_IO_WAIT_COMPLETE; } }