From 9938a7a4395d5068efbbb698d81d993f87d61776 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 23 Nov 2021 09:07:46 -0600 Subject: [PATCH 1/7] upstream: fix deadlock when destroying connections When workers are enabled and a timeout occurs in a connection most of cases a deadlock is held in the active worker: ==1654992== Thread #4: Attempt to re-lock a non-recursive lock I already hold ==1654992== at 0x484BB44: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==1654992== by 0x197579: prepare_destroy_conn_safe (flb_upstream.c:435) ==1654992== by 0x197887: create_conn (flb_upstream.c:533) ==1654992== by 0x197DBB: flb_upstream_conn_get (flb_upstream.c:674) ==1654992== by 0x2396D3: http_post (http.c:86) ==1654992== by 0x23A5E5: cb_http_flush (http.c:338) ==1654992== by 0x17FE6B: output_pre_cb_flush (flb_output.h:511) ==1654992== by 0x503DAA: co_init (amd64.c:117) ==1654992== Lock was previously acquired ==1654992== at 0x484BC0F: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==1654992== by 0x19815F: flb_upstream_conn_timeouts (flb_upstream.c:780) ==1654992== by 0x17FEFC: cb_thread_sched_timer (flb_output_thread.c:58) ==1654992== by 0x193ED7: flb_sched_event_handler (flb_scheduler.c:422) ==1654992== by 0x180672: output_thread (flb_output_thread.c:265) ==1654992== by 0x199602: step_callback (flb_worker.c:44) ==1654992== by 0x484E8AA: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==1654992== by 0x4E3F926: start_thread (pthread_create.c:435) ==1654992== by 0x4ECF9E3: clone (clone.S:100) The following patch fix the behavior on prepare_destroy_conn_safe by 'trying to acquire' the mutex lock, if it fails to acquire it, it will asssume it's already locked and no new lock is required. Signed-off-by: Eduardo Silva --- src/flb_upstream.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/flb_upstream.c b/src/flb_upstream.c index bbc83556fce..8837c27ec88 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -429,15 +429,19 @@ static int prepare_destroy_conn(struct flb_upstream_conn *u_conn) static inline int prepare_destroy_conn_safe(struct flb_upstream_conn *u_conn) { int ret; + int locked = FLB_FALSE; struct flb_upstream *u = u_conn->u; if (u->thread_safe == FLB_TRUE) { - pthread_mutex_lock(&u->mutex_lists); + ret = pthread_mutex_trylock(&u->mutex_lists); + if (ret == 0) { + locked = FLB_TRUE; + } } ret = prepare_destroy_conn(u_conn); - if (u->thread_safe == FLB_TRUE) { + if (u->thread_safe == FLB_TRUE && locked) { pthread_mutex_unlock(&u->mutex_lists); } From 0347e7a8ef042e74972402f87602bc49f69fe14e Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 23 Nov 2021 11:04:01 -0600 Subject: [PATCH 2/7] config: add new 'is_shutting_down' field Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_config.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index e1360e8fdef..f7cc224dac5 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -45,6 +45,7 @@ struct flb_config { int support_mode; /* enterprise support mode ? */ int is_ingestion_active; /* date ingestion active/allowed */ + int is_shutting_down; /* is the service shutting down ? */ int is_running; /* service running ? */ double flush; /* Flush timeout */ int grace; /* Maximum grace time on shutdown */ From 6f3afc25bd4375ff292ccf8097299662ee000b4d Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 23 Nov 2021 11:04:17 -0600 Subject: [PATCH 3/7] engine: do not retry if the engine is shutting down Signed-off-by: Eduardo Silva --- src/flb_engine.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/flb_engine.c b/src/flb_engine.c index 99f5920df5e..4ae9663a390 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -234,7 +234,7 @@ static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts, flb_task_retry_clean(task, ins); flb_task_users_dec(task, FLB_TRUE); } - else if (ret == FLB_RETRY) { + else if (ret == FLB_RETRY && config->is_running && !config->is_shutting_down) { if (ins->retry_limit == FLB_OUT_RETRY_NONE) { /* cmetrics: output_dropped_records_total */ cmt_counter_add(ins->cmt_dropped_records, ts, task->records, @@ -859,6 +859,7 @@ int flb_engine_exit(struct flb_config *config) uint64_t val = FLB_ENGINE_EV_STOP; config->is_ingestion_active = FLB_FALSE; + config->is_shutting_down = FLB_TRUE; flb_input_pause_all(config); From 8e1e5652ee2d9c283ecbe5a375fb67068280cb41 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 23 Nov 2021 11:21:38 -0600 Subject: [PATCH 4/7] network: detect if socket has been invalidated Signed-off-by: Eduardo Silva --- src/flb_network.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/flb_network.c b/src/flb_network.c index d25ab457470..736de2601ed 100644 --- a/src/flb_network.c +++ b/src/flb_network.c @@ -444,6 +444,16 @@ static int net_connect_async(int fd, /* Save the mask before the event handler do a reset */ mask = u_conn->event.mask; + /* + * If the socket has been invalidated (e.g: timeout or shutdown), just + * print a debug message and return. + */ + if (u_conn->fd == -1) { + flb_debug("[net] TCP connection not longer available: %s:%i", + u->tcp_host, u->tcp_port); + return -1; + } + /* We got a notification, remove the event registered */ ret = mk_event_del(u_conn->evl, &u_conn->event); if (ret == -1) { From fd1f909b66066671c0ce893d0ab3675cb98f56ff Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 23 Nov 2021 11:22:17 -0600 Subject: [PATCH 5/7] upstream: detect shutdown and reduce log noise Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_upstream.h | 7 +++++++ src/flb_upstream.c | 13 ++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/include/fluent-bit/flb_upstream.h b/include/fluent-bit/flb_upstream.h index b7f67802aad..0393c35f6fd 100644 --- a/include/fluent-bit/flb_upstream.h +++ b/include/fluent-bit/flb_upstream.h @@ -82,9 +82,16 @@ struct flb_upstream { struct flb_tls *tls; #endif + struct flb_config *config; struct mk_list _head; }; + +static inline int flb_upstream_is_shutting_down(struct flb_upstream *u) +{ + return u->config->is_shutting_down; +} + void flb_upstream_queue_init(struct flb_upstream_queue *uq); struct flb_upstream_queue *flb_upstream_queue_get(struct flb_upstream *u); void flb_upstream_list_set(struct mk_list *list); diff --git a/src/flb_upstream.c b/src/flb_upstream.c index 8837c27ec88..d4a69ad4821 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -209,6 +209,7 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config, flb_errno(); return NULL; } + u->config = config; /* Set default networking setup values */ flb_net_setup_init(&u->net); @@ -254,7 +255,6 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config, u->flags |= FLB_IO_ASYNC; u->thread_safe = FLB_FALSE; - /* Initialize queues */ flb_upstream_queue_init(&u->queue); @@ -795,10 +795,13 @@ int flb_upstream_conn_timeouts(struct mk_list *list) u_conn->ts_connect_timeout > 0 && u_conn->ts_connect_timeout <= now) { drop = FLB_TRUE; - flb_error("[upstream] connection #%i to %s:%i timed out after " - "%i seconds", - u_conn->fd, - u->tcp_host, u->tcp_port, u->net.connect_timeout); + + if (!flb_upstream_is_shutting_down(u)) { + flb_error("[upstream] connection #%i to %s:%i timed out after " + "%i seconds", + u_conn->fd, + u->tcp_host, u->tcp_port, u->net.connect_timeout); + } } if (drop == FLB_TRUE) { From 790422625cf286178395ec434a57022344dd30cb Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 23 Nov 2021 16:04:15 -0600 Subject: [PATCH 6/7] network: on tcp connect change exception from error to debug Signed-off-by: Eduardo Silva --- src/flb_network.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flb_network.c b/src/flb_network.c index 736de2601ed..8036a113145 100644 --- a/src/flb_network.c +++ b/src/flb_network.c @@ -1218,7 +1218,7 @@ flb_sockfd_t flb_net_tcp_connect(const char *host, unsigned long port, } if (fd == -1) { - flb_error("[net] could not connect to %s:%s", + flb_debug("[net] could not connect to %s:%s", host, _port); } From e25137850d38f6504b16ac51cc144847bdd5179f Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 26 Nov 2021 19:03:16 -0600 Subject: [PATCH 7/7] upstream: just compare against 'locked' flag Signed-off-by: Eduardo Silva --- src/flb_upstream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flb_upstream.c b/src/flb_upstream.c index d4a69ad4821..99ff826a2bc 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -441,7 +441,7 @@ static inline int prepare_destroy_conn_safe(struct flb_upstream_conn *u_conn) ret = prepare_destroy_conn(u_conn); - if (u->thread_safe == FLB_TRUE && locked) { + if (locked) { pthread_mutex_unlock(&u->mutex_lists); }