Skip to content

Commit

Permalink
io: support timeout on synchronous calls
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Fala <[email protected]>
  • Loading branch information
matthewfala committed Oct 13, 2021
1 parent 4ec24d5 commit 4b2014d
Showing 1 changed file with 60 additions and 1 deletion.
61 changes: 60 additions & 1 deletion src/flb_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
#include <limits.h>
#include <assert.h>

#ifdef FLB_SYSTEM_WINDOWS
#define poll WSAPoll
#else
#include <sys/poll.h>
#endif

#include <monkey/mk_core.h>
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_config.h>
Expand Down Expand Up @@ -307,12 +313,65 @@ static ssize_t net_io_read(struct flb_upstream_conn *u_conn,
void *buf, size_t len)
{
int ret;
struct pollfd pfd_read;

/* Set socket to non-blocking mode for timeout */
flb_net_socket_nonblocking(u_conn->fd);

ret = recv(u_conn->fd, buf, len, 0);
if (ret == -1) {
return -1;
/*
* An asynchronous recv can return -1, but what is important is the
* socket status, getting a EWOULDBLOCK is expected, but any other case
* means a failure.
*/
if (!FLB_WOULDBLOCK()) {
/* Generic error */
flb_warn("[net] io_read #%i failed from: %s:%i",
u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port);
flb_net_socket_blocking(u_conn->fd);
return -1;
}

/* The connection is still in progress, implement a socket timeout */
flb_trace("[net] io_read #%i in progress from: %s:%i",
u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port);
/*
* Prepare a timeout using poll(2): we could use our own
* event loop mechanism for this, but it will require an
* extra file descriptor, the poll(2) call is straightforward
* for this use case.
*/

pfd_read.fd = u_conn->fd;
pfd_read.events = POLLIN;
ret = poll(&pfd_read, 1, u_conn->u->net.connect_timeout * 1000);
if (ret == 0) {
/* Timeout */
flb_warn("[net] io_read #%i timeout after %i seconds from: "
"%s:%i",
u_conn->fd, u_conn->u->net.connect_timeout,
u_conn->u->tcp_host, u_conn->u->tcp_port);
flb_net_socket_blocking(u_conn->fd);
return -1;
}
else if (ret < 0) {
/* Generic error */
flb_warn("[net] io_read #%i failed from: %s:%i",
u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port);
flb_net_socket_blocking(u_conn->fd);
return -1;
}

/* Get data */
ret = recv(u_conn->fd, buf, len, 0);
}

/*
* The read succeeded, return the normal
* non-blocking mode to the socket.
*/
flb_net_socket_blocking(u_conn->fd);
return ret;
}

Expand Down

0 comments on commit 4b2014d

Please sign in to comment.