From b2e8ff1ae738c1db7bf50942ef619609436ffe02 Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Thu, 20 Jul 2023 13:59:13 +0200 Subject: [PATCH 1/2] connection: a shutdown guard flag was added Signed-off-by: Leonardo Alminana --- include/fluent-bit/flb_upstream_conn.h | 8 ++++++++ src/flb_upstream.c | 2 ++ 2 files changed, 10 insertions(+) diff --git a/include/fluent-bit/flb_upstream_conn.h b/include/fluent-bit/flb_upstream_conn.h index 08e2741cef7..73663713743 100644 --- a/include/fluent-bit/flb_upstream_conn.h +++ b/include/fluent-bit/flb_upstream_conn.h @@ -66,6 +66,14 @@ struct flb_upstream_conn { */ int busy_flag; + /* This flag is used to determine if the connection was shut down to ensure we + * don't do it twice when a timeout is detected. + * + * This is required in order to overcome a limitation in the async read / write + * functions that will be addressed as soon as possible. + */ + int shutdown_flag; + /* Timestamps */ time_t ts_assigned; time_t ts_created; diff --git a/src/flb_upstream.c b/src/flb_upstream.c index 72f93c4a671..8bff47a3836 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -537,6 +537,8 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u) conn->ts_available = 0; conn->ka_count = 0; conn->coro = coro; + conn->busy_flag = FLB_FALSE; + conn->shutdown_flag = FLB_FALSE; if (u->net.keepalive == FLB_TRUE) { flb_upstream_conn_recycle(conn, FLB_TRUE); From 9e2e5d1bffca92bbcc5001fcfc34c1d9ae2716db Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Thu, 20 Jul 2023 14:04:51 +0200 Subject: [PATCH 2/2] upstream: decoupled socket shutdown from upstream connection release Additionally, a flag to ensure that a socket is only shut down once has been added and the timeout based event injection and connection destruction mechanism was refactored to ensure that connections aren't prematurely destroyed and coroutines are properly resumed even when the priority event loop hits its iteration limit before the injected event is processed. Signed-off-by: Leonardo Alminana --- src/flb_upstream.c | 80 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 67 insertions(+), 13 deletions(-) diff --git a/src/flb_upstream.c b/src/flb_upstream.c index 8bff47a3836..dd16c3edb8a 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -414,6 +414,19 @@ struct flb_upstream *flb_upstream_create_url(struct flb_config *config, return u; } +/* This function shuts the connection down in order to cause + * any client code trying to read or write from it to fail. + */ +static void shutdown_conn(struct flb_upstream_conn *u_conn) +{ + if (u_conn->fd > 0 && + !u_conn->shutdown_flag) { + shutdown(u_conn->fd, SHUT_RDWR); + + u_conn->shutdown_flag = FLB_TRUE; + } +} + /* * This function moves the 'upstream connection' into the queue to be * destroyed. Note that the caller is responsible to validate and check @@ -434,7 +447,17 @@ static int prepare_destroy_conn(struct flb_upstream_conn *u_conn) } if (u_conn->fd > 0) { +#ifdef FLB_HAVE_TLS + if (u_conn->tls_session != NULL) { + flb_tls_session_destroy(u_conn->tls, u_conn); + + u_conn->tls_session = NULL; + } +#endif + shutdown_conn(u_conn); + flb_socket_close(u_conn->fd); + u_conn->fd = -1; u_conn->event.fd = -1; } @@ -800,7 +823,6 @@ int flb_upstream_conn_timeouts(struct mk_list *list) { time_t now; int drop; - int inject; struct mk_list *head; struct mk_list *u_head; struct mk_list *tmp; @@ -848,22 +870,54 @@ int flb_upstream_conn_timeouts(struct mk_list *list) } } - if (drop == FLB_TRUE) { - inject = FLB_FALSE; - if (u_conn->event.status != MK_EVENT_NONE) { - inject = FLB_TRUE; - } + if (drop == FLB_TRUE) { u_conn->net_error = ETIMEDOUT; - prepare_destroy_conn(u_conn); - /* - * prepare_destroy_conn calls mk_list_del on the event in the - * priority bucket queue, so for safety, we inject it afterwards + + /* We need to shut the connection down + * in order to cause some functions that are not + * aware of the connection error signaling + * mechanism to fail and abort. + * + * These functions do not check the net_error field + * in the connection instance upon being awakened + * so we need to ensure that any read/write + * operations on the socket generate an error. + * + * net_io_write_async + * net_io_read_async + * flb_tls_net_write_async + * flb_tls_net_read_async + * + * This operation could be selectively performed for + * connections that have already been established + * with no side effects because the connection + * establishment code honors `net_error` but + * taking in account that the previous version of + * the code did it unconditionally with no noticeable + * side effects leaving it that way is the safest + * choice at the moment. */ - if (inject == FLB_TRUE) { - mk_event_inject(u_conn->evl, &u_conn->event, - MK_EVENT_READ | MK_EVENT_WRITE, + + if (MK_EVENT_IS_REGISTERED((&u_conn->event))) { + shutdown_conn(u_conn); + + mk_event_inject(u_conn->evl, + &u_conn->event, + u_conn->event.mask, FLB_TRUE); } + else { + /* I can't think of a valid reason for this code path + * to be taken but considering that it was previously + * possible for it to happen (maybe wesley can shed + * some light on it if he remembers) I'll leave this + * for the moment. + * In any case, it's proven not to interfere with the + * coroutine awakening issue this change addresses. + */ + + prepare_destroy_conn(u_conn); + } } }