From 4b2014d05a818568d922a40cf2bb63afe78b7520 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Tue, 12 Oct 2021 15:39:39 -0700 Subject: [PATCH] io: support timeout on synchronous calls Signed-off-by: Matthew Fala --- src/flb_io.c | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/flb_io.c b/src/flb_io.c index 30bbabd9396..5faa840c001 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -48,6 +48,12 @@ #include #include +#ifdef FLB_SYSTEM_WINDOWS +#define poll WSAPoll +#else +#include +#endif + #include #include #include @@ -307,12 +313,65 @@ static ssize_t net_io_read(struct flb_upstream_conn *u_conn, void *buf, size_t len) { int ret; + struct pollfd pfd_read; + + /* Set socket to non-blocking mode for timeout */ + flb_net_socket_nonblocking(u_conn->fd); ret = recv(u_conn->fd, buf, len, 0); if (ret == -1) { - return -1; + /* + * An asynchronous recv can return -1, but what is important is the + * socket status, getting a EWOULDBLOCK is expected, but any other case + * means a failure. + */ + if (!FLB_WOULDBLOCK()) { + /* Generic error */ + flb_warn("[net] io_read #%i failed from: %s:%i", + u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port); + flb_net_socket_blocking(u_conn->fd); + return -1; + } + + /* The connection is still in progress, implement a socket timeout */ + flb_trace("[net] io_read #%i in progress from: %s:%i", + u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port); + /* + * Prepare a timeout using poll(2): we could use our own + * event loop mechanism for this, but it will require an + * extra file descriptor, the poll(2) call is straightforward + * for this use case. + */ + + pfd_read.fd = u_conn->fd; + pfd_read.events = POLLIN; + ret = poll(&pfd_read, 1, u_conn->u->net.connect_timeout * 1000); + if (ret == 0) { + /* Timeout */ + flb_warn("[net] io_read #%i timeout after %i seconds from: " + "%s:%i", + u_conn->fd, u_conn->u->net.connect_timeout, + u_conn->u->tcp_host, u_conn->u->tcp_port); + flb_net_socket_blocking(u_conn->fd); + return -1; + } + else if (ret < 0) { + /* Generic error */ + flb_warn("[net] io_read #%i failed from: %s:%i", + u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port); + flb_net_socket_blocking(u_conn->fd); + return -1; + } + + /* Get data */ + ret = recv(u_conn->fd, buf, len, 0); } + /* + * The read succeeded, return the normal + * non-blocking mode to the socket. + */ + flb_net_socket_blocking(u_conn->fd); return ret; }