diff --git a/include/fluent-bit/flb_upstream_conn.h b/include/fluent-bit/flb_upstream_conn.h index 08e2741cef7..73663713743 100644 --- a/include/fluent-bit/flb_upstream_conn.h +++ b/include/fluent-bit/flb_upstream_conn.h @@ -66,6 +66,14 @@ struct flb_upstream_conn { */ int busy_flag; + /* This flag is used to determine if the connection was shut down to ensure we + * don't do it twice when a timeout is detected. + * + * This is required in order to overcome a limitation in the async read / write + * functions that will be addressed as soon as possible. + */ + int shutdown_flag; + /* Timestamps */ time_t ts_assigned; time_t ts_created; diff --git a/src/flb_upstream.c b/src/flb_upstream.c index 72f93c4a671..dd16c3edb8a 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -414,6 +414,19 @@ struct flb_upstream *flb_upstream_create_url(struct flb_config *config, return u; } +/* This function shuts the connection down in order to cause + * any client code trying to read or write from it to fail. + */ +static void shutdown_conn(struct flb_upstream_conn *u_conn) +{ + if (u_conn->fd > 0 && + !u_conn->shutdown_flag) { + shutdown(u_conn->fd, SHUT_RDWR); + + u_conn->shutdown_flag = FLB_TRUE; + } +} + /* * This function moves the 'upstream connection' into the queue to be * destroyed. Note that the caller is responsible to validate and check @@ -434,7 +447,17 @@ static int prepare_destroy_conn(struct flb_upstream_conn *u_conn) } if (u_conn->fd > 0) { +#ifdef FLB_HAVE_TLS + if (u_conn->tls_session != NULL) { + flb_tls_session_destroy(u_conn->tls, u_conn); + + u_conn->tls_session = NULL; + } +#endif + shutdown_conn(u_conn); + flb_socket_close(u_conn->fd); + u_conn->fd = -1; u_conn->event.fd = -1; } @@ -537,6 +560,8 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u) conn->ts_available = 0; conn->ka_count = 0; conn->coro = coro; + conn->busy_flag = FLB_FALSE; + conn->shutdown_flag = FLB_FALSE; if (u->net.keepalive == FLB_TRUE) { flb_upstream_conn_recycle(conn, FLB_TRUE); @@ -798,7 +823,6 @@ int flb_upstream_conn_timeouts(struct mk_list *list) { time_t now; int drop; - int inject; struct mk_list *head; struct mk_list *u_head; struct mk_list *tmp; @@ -846,22 +870,54 @@ int flb_upstream_conn_timeouts(struct mk_list *list) } } - if (drop == FLB_TRUE) { - inject = FLB_FALSE; - if (u_conn->event.status != MK_EVENT_NONE) { - inject = FLB_TRUE; - } + if (drop == FLB_TRUE) { u_conn->net_error = ETIMEDOUT; - prepare_destroy_conn(u_conn); - /* - * prepare_destroy_conn calls mk_list_del on the event in the - * priority bucket queue, so for safety, we inject it afterwards + + /* We need to shut the connection down + * in order to cause some functions that are not + * aware of the connection error signaling + * mechanism to fail and abort. + * + * These functions do not check the net_error field + * in the connection instance upon being awakened + * so we need to ensure that any read/write + * operations on the socket generate an error. + * + * net_io_write_async + * net_io_read_async + * flb_tls_net_write_async + * flb_tls_net_read_async + * + * This operation could be selectively performed for + * connections that have already been established + * with no side effects because the connection + * establishment code honors `net_error` but + * taking in account that the previous version of + * the code did it unconditionally with no noticeable + * side effects leaving it that way is the safest + * choice at the moment. */ - if (inject == FLB_TRUE) { - mk_event_inject(u_conn->evl, &u_conn->event, - MK_EVENT_READ | MK_EVENT_WRITE, + + if (MK_EVENT_IS_REGISTERED((&u_conn->event))) { + shutdown_conn(u_conn); + + mk_event_inject(u_conn->evl, + &u_conn->event, + u_conn->event.mask, FLB_TRUE); } + else { + /* I can't think of a valid reason for this code path + * to be taken but considering that it was previously + * possible for it to happen (maybe wesley can shed + * some light on it if he remembers) I'll leave this + * for the moment. + * In any case, it's proven not to interfere with the + * coroutine awakening issue this change addresses. + */ + + prepare_destroy_conn(u_conn); + } } }