Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract poll sync+async timeout diff #5

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions include/fluent-bit/flb_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,39 @@
/* Other features */
#define FLB_IO_IPV6 32 /* network I/O uses IPv6 */

/* IO Wait */
#define FLB_IO_WAIT_ERROR 0
#define FLB_IO_WAIT_TIMEDOUT 1
#define FLB_IO_WAIT_COMPLETE 2
typedef int flb_io_wait_ret;

int flb_io_net_connect(struct flb_upstream_conn *u_conn,
struct flb_coro *th);

/*
* Wait for connection via async:mk_event_loop or sync:poll(2)
* Uses monkey event loop if async,
* Otherwise sync blocking wait.
*
* currently timeout only supported for sync waits
*
* If timeout_ms is -1, then there is no timeout.
*
* u_conn->coro and u_conn->fd must be set.
* Return FLB_IO_WAIT_ERROR on failure
* Return FLB_IO_WAIT_TIMEOUT on timeout
* Return FLB_IO_WAIT_COMPLETE on complete
*
* It is the responsability of the caller to set u_conn->coro is async
*
* @param co may be set to null if sync
*
* @param mask is an event types mask composed of MK_EVENT_<READ, WRITE>
* or the equivalent POLL<IN, OUT>
*/
flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask,
struct flb_coro *co);

int flb_io_net_write(struct flb_upstream_conn *u, const void *data,
size_t len, size_t *out_len);
ssize_t flb_io_net_read(struct flb_upstream_conn *u, void *buf, size_t len);
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_network.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ struct flb_net_setup {
/* max time in seconds to wait for a established connection */
int connect_timeout;

/* max time in seconds to wait for a network response */
int response_timeout;

/* network interface to bind and use to send data */
flb_sds_t source_address;

Expand Down
8 changes: 7 additions & 1 deletion include/fluent-bit/flb_upstream_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
#include <mbedtls/net.h>
#endif

#define FLB_TIMEOUT_SUBJECT_NULL 0
#define FLB_TIMEOUT_SUBJECT_CONNECT 1
#define FLB_TIMEOUT_SUBJECT_RESPONSE 2
typedef int flb_timeout_subject;

/* Upstream TCP connection */
struct flb_upstream_conn {
struct mk_event event;
Expand Down Expand Up @@ -71,7 +76,8 @@ struct flb_upstream_conn {

/* Connect */
time_t ts_connect_start;
time_t ts_connect_timeout;
time_t ts_timeout;
flb_timeout_subject ts_timeout_subject;

/* Event loop */
struct mk_event_loop *evl;
Expand Down
183 changes: 182 additions & 1 deletion src/flb_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
#include <limits.h>
#include <assert.h>

#ifdef FLB_SYSTEM_WINDOWS
#define poll WSAPoll
#else
#include <sys/poll.h>
#endif

#include <monkey/mk_core.h>
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_config.h>
Expand Down Expand Up @@ -118,6 +124,126 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn,
return 0;
}

/*
* Wait for connection via async:mk_event_loop or sync:poll(2)
* Uses monkey event loop if async,
* Otherwise sync blocking wait.
*
* currently timeout only supported for sync waits
*
* If timeout_ms is -1, then there is no timeout.
*
* u_conn->coro and u_conn->fd must be set.
* Return FLB_IO_WAIT_ERROR on failure
* Return FLB_IO_WAIT_TIMEOUT on timeout
* Return FLB_IO_WAIT_COMPLETE on complete
*
* For synchronous waits, the u_conn socket file descriptor must be set to nonblocking
* flb_net_socket_nonblocking(u_conn->fd);
* flb_net_socket_blocking(u_conn->fd);
*
* It is the responsability of the caller to set u_conn->coro is async
*
* @param mask is an event types mask composed of MK_EVENT_<READ, WRITE, ...>
* or the equivalent POLL<IN, OUT, ...>
*/
flb_io_wait_ret flb_io_wait(struct flb_upstream_conn *u_conn, uint32_t mask,
struct flb_coro *co)
{
int ret;
uint32_t result_mask;
struct pollfd pfd_read;
time_t now;

/* To wait, MK_EVENT_READ or MK_EVENT_WRITE must included in event mask */
flb_bug(!(mask & MK_EVENT_READ) && !(mask & MK_EVENT_WRITE));
int now = TIME(NULL);

/* Asynchronous event loop wait */
if (u_conn->u->flags & FLB_IO_ASYNC) {
/* If async, co must not be null */
flb_bug(co != NULL);

/* Set event loop timeout */


/* Add new monkey event to event loop */
ret = mk_event_add(u_conn->evl,
u_conn->fd,
FLB_ENGINE_EV_THREAD,
mask, &u_conn->event);
if (ret == -1) {
/* Monkey event creation error */
return FLB_IO_WAIT_ERROR;
}

/*
* Return the control to the parent caller, we need to wait for
* the event loop to get back to us.
*
* u_conn->coro should hold NULL at all times unless we are explicitly
* waiting to be resumed.
*/
u_conn->coro = co;
flb_coro_yield(co, FLB_FALSE);
u_conn->coro = NULL;

/*
* Async wait complete, and thread resumed. Remove the registered
* event
*/
result_mask = u_conn->event.mask;
ret = mk_event_del(u_conn->evl, &u_conn->event);
if (ret == -1) {
flb_error("[io fd=%i] error deleting monkey event", u_conn->fd);
return FLB_IO_WAIT_ERROR;
}
MK_EVENT_NEW(&u_conn->event);

/* Check event status */
/* TODO: Not yet apparent what mk_event op could change the event status mask */
if (mask & MK_EVENT_READ && !(result_mask & MK_EVENT_READ)) {
u_conn->net_error = -1; /* reset net_error */
return FLB_IO_WAIT_ERROR;
}
if (mask & MK_EVENT_READ && !(result_mask & MK_EVENT_WRITE)) {
u_conn->net_error = -1;
return FLB_IO_WAIT_ERROR;
}

/* Check if resumed coro due to timeout */
if (u_conn->net_error == ETIMEDOUT) {
u_conn->net_error = -1; /* reset net_error */
return FLB_IO_WAIT_TIMEDOUT;
}

return FLB_IO_WAIT_COMPLETE;
}

/* Synchronous blocking wait */
else {
if (mask & MK_EVENT_READ) {
pfd_read.events |= POLLIN;
}
if (mask & MK_EVENT_WRITE) {
pfd_read.events |= POLLOUT;
}

pfd_read.fd = u_conn->fd;
ret = poll(&pfd_read, 1, u_conn->ts_timeout * 1000);
if (ret == 0) {
/* Timeout */
return FLB_IO_WAIT_TIMEDOUT;
}
else if (ret < 0) {
/* Generic poll error */
return FLB_IO_WAIT_ERROR;
}

return FLB_IO_WAIT_COMPLETE;
}
}

static int net_io_write(struct flb_upstream_conn *u_conn,
const void *data, size_t len, size_t *out_len)
{
Expand Down Expand Up @@ -309,12 +435,67 @@ static ssize_t net_io_read(struct flb_upstream_conn *u_conn,
void *buf, size_t len)
{
int ret;
struct pollfd pfd_read;

/* Set socket to non-blocking mode for timeout */
flb_net_socket_nonblocking(u_conn->fd);

ret = recv(u_conn->fd, buf, len, 0);
if (ret == -1) {
return -1;
/*
* An asynchronous recv can return -1, but what is important is the
* socket status, getting a EWOULDBLOCK is expected, but any other case
* means a failure.
*/
if (!FLB_WOULDBLOCK()) {
/* Generic error */
flb_warn("[net] io_read #%i failed from: %s:%i",
u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port);
flb_net_socket_blocking(u_conn->fd);
return -1;
}

/* The connection is still in progress, implement a socket timeout */
flb_trace("[net] io_read #%i in progress from: %s:%i",
u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port);
/*
* Prepare a timeout using poll(2): we could use our own
* event loop mechanism for this, but it will require an
* extra file descriptor, the poll(2) call is straightforward
* for this use case.
*/

pfd_read.fd = u_conn->fd;
pfd_read.events = POLLIN;
ret = poll(&pfd_read, 1, u_conn->u->net.connect_timeout * 1000);
if (ret == 0) {
/* Timeout */
flb_warn("[net] io_read #%i timeout after %i seconds from: "
"%s:%i",
u_conn->fd, u_conn->u->net.connect_timeout,
u_conn->u->tcp_host, u_conn->u->tcp_port);
flb_socket_close(u_conn->fd);
flb_net_socket_blocking(u_conn->fd);
return -1;
}
else if (ret < 0) {
/* Generic error */
flb_warn("[net] io_read #%i failed from: %s:%i",
u_conn->fd, u_conn->u->tcp_host, u_conn->u->tcp_port);
flb_socket_close(u_conn->fd);
flb_net_socket_blocking(u_conn->fd);
return -1;
}

/* Get data */
ret = recv(u_conn->fd, buf, len, 0);
}

/*
* The read succeeded, return the normal
* non-blocking mode to the socket.
*/
flb_net_socket_blocking(u_conn->fd);
return ret;
}

Expand Down
44 changes: 31 additions & 13 deletions src/flb_upstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -488,10 +488,12 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u)
conn->evl = evl;

if (u->net.connect_timeout > 0) {
conn->ts_connect_timeout = now + u->net.connect_timeout;
conn->ts_timeout_subject = FLB_TIMEOUT_SUBJECT_CONNECT;
conn->ts_timeout = now + u->net.connect_timeout;
}
else {
conn->ts_connect_timeout = -1;
conn->ts_timeout_subject = NULL;
conn->ts_timeout = -1;
}

#ifdef FLB_HAVE_TLS
Expand Down Expand Up @@ -541,7 +543,8 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u)
}

/* Invalidate timeout for connection */
conn->ts_connect_timeout = -1;
conn->ts_timeout_subject = NULL;
conn->ts_timeout = -1;
conn->busy_flag = FLB_FALSE;

return conn;
Expand Down Expand Up @@ -762,6 +765,7 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
{
time_t now;
int drop;
int resume;
struct mk_list *head;
struct mk_list *u_head;
struct mk_list *tmp;
Expand All @@ -785,16 +789,29 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
u_conn = mk_list_entry(u_head, struct flb_upstream_conn, _head);

drop = FLB_FALSE;
resume = FLB_FALSE;

/* Connect timeouts */
if (u->net.connect_timeout > 0 &&
u_conn->ts_connect_timeout > 0 &&
u_conn->ts_connect_timeout <= now) {
drop = FLB_TRUE;
flb_error("[upstream] connection #%i to %s:%i timed out after "
"%i seconds",
u_conn->fd,
u->tcp_host, u->tcp_port, u->net.connect_timeout);
/* All connection timeouts */
if (u_conn->ts_timeout > 0 &&
u_conn->ts_timeout <= now) {

/* timeout triggers a coro resume & raises net_error ETIMEDOUT */
u_conn->net_error = ETIMEDOUT;
resume = FLB_TRUE;

if (u_conn->ts_timeout_subject == FLB_TIMEOUT_SUBJECT_CONNECT) {
drop = FLB_TRUE;
flb_error("[upstream] connection #%i to %s:%i timed out after "
"%i seconds",
u_conn->fd,
u->tcp_host, u->tcp_port, u->net.connect_timeout);
}
else if (u_conn->ts_timeout_subject == FLB_TIMEOUT_SUBJECT_RESPONSE) {
flb_error("[upstream] connection #%i response to %s:%i timed out "
"after %i seconds",
u_conn->fd,
u->tcp_host, u->tcp_port, u->net.response_timeout);
}
}

if (drop == FLB_TRUE) {
Expand All @@ -808,9 +825,10 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
shutdown(u_conn->fd, SHUT_RDWR);
}

u_conn->net_error = ETIMEDOUT;
prepare_destroy_conn(u_conn);
}

if (resume == FLB_TRUE) {
/*
* If the connection has its coro field set it means it's waiting for a
* FLB_ENGINE_EV_THREAD event which in some specific cases might never
Expand Down
4 changes: 2 additions & 2 deletions src/tls/flb_tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ int flb_tls_session_create(struct flb_tls *tls,

/* Connect timeout */
if (u->net.connect_timeout > 0 &&
u_conn->ts_connect_timeout > 0 &&
u_conn->ts_connect_timeout <= time(NULL)) {
u_conn->ts_timeout > 0 &&
u_conn->ts_timeout <= time(NULL)) {
flb_error("[io_tls] handshake connection #%i to %s:%i timed out after "
"%i seconds",
u_conn->fd,
Expand Down