diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index e1360e8fdef..f7cc224dac5 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -45,6 +45,7 @@ struct flb_config { int support_mode; /* enterprise support mode ? */ int is_ingestion_active; /* date ingestion active/allowed */ + int is_shutting_down; /* is the service shutting down ? */ int is_running; /* service running ? */ double flush; /* Flush timeout */ int grace; /* Maximum grace time on shutdown */ diff --git a/include/fluent-bit/flb_upstream.h b/include/fluent-bit/flb_upstream.h index b7f67802aad..0393c35f6fd 100644 --- a/include/fluent-bit/flb_upstream.h +++ b/include/fluent-bit/flb_upstream.h @@ -82,9 +82,16 @@ struct flb_upstream { struct flb_tls *tls; #endif + struct flb_config *config; struct mk_list _head; }; + +static inline int flb_upstream_is_shutting_down(struct flb_upstream *u) +{ + return u->config->is_shutting_down; +} + void flb_upstream_queue_init(struct flb_upstream_queue *uq); struct flb_upstream_queue *flb_upstream_queue_get(struct flb_upstream *u); void flb_upstream_list_set(struct mk_list *list); diff --git a/src/flb_engine.c b/src/flb_engine.c index 99f5920df5e..4ae9663a390 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -234,7 +234,7 @@ static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts, flb_task_retry_clean(task, ins); flb_task_users_dec(task, FLB_TRUE); } - else if (ret == FLB_RETRY) { + else if (ret == FLB_RETRY && config->is_running && !config->is_shutting_down) { if (ins->retry_limit == FLB_OUT_RETRY_NONE) { /* cmetrics: output_dropped_records_total */ cmt_counter_add(ins->cmt_dropped_records, ts, task->records, @@ -859,6 +859,7 @@ int flb_engine_exit(struct flb_config *config) uint64_t val = FLB_ENGINE_EV_STOP; config->is_ingestion_active = FLB_FALSE; + config->is_shutting_down = FLB_TRUE; flb_input_pause_all(config); diff --git a/src/flb_network.c b/src/flb_network.c index d25ab457470..8036a113145 100644 --- a/src/flb_network.c +++ b/src/flb_network.c @@ -444,6 +444,16 @@ static int net_connect_async(int fd, /* Save the mask before the event handler do a reset */ mask = u_conn->event.mask; + /* + * If the socket has been invalidated (e.g: timeout or shutdown), just + * print a debug message and return. + */ + if (u_conn->fd == -1) { + flb_debug("[net] TCP connection not longer available: %s:%i", + u->tcp_host, u->tcp_port); + return -1; + } + /* We got a notification, remove the event registered */ ret = mk_event_del(u_conn->evl, &u_conn->event); if (ret == -1) { @@ -1208,7 +1218,7 @@ flb_sockfd_t flb_net_tcp_connect(const char *host, unsigned long port, } if (fd == -1) { - flb_error("[net] could not connect to %s:%s", + flb_debug("[net] could not connect to %s:%s", host, _port); } diff --git a/src/flb_upstream.c b/src/flb_upstream.c index bbc83556fce..99ff826a2bc 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -209,6 +209,7 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config, flb_errno(); return NULL; } + u->config = config; /* Set default networking setup values */ flb_net_setup_init(&u->net); @@ -254,7 +255,6 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config, u->flags |= FLB_IO_ASYNC; u->thread_safe = FLB_FALSE; - /* Initialize queues */ flb_upstream_queue_init(&u->queue); @@ -429,15 +429,19 @@ static int prepare_destroy_conn(struct flb_upstream_conn *u_conn) static inline int prepare_destroy_conn_safe(struct flb_upstream_conn *u_conn) { int ret; + int locked = FLB_FALSE; struct flb_upstream *u = u_conn->u; if (u->thread_safe == FLB_TRUE) { - pthread_mutex_lock(&u->mutex_lists); + ret = pthread_mutex_trylock(&u->mutex_lists); + if (ret == 0) { + locked = FLB_TRUE; + } } ret = prepare_destroy_conn(u_conn); - if (u->thread_safe == FLB_TRUE) { + if (locked) { pthread_mutex_unlock(&u->mutex_lists); } @@ -791,10 +795,13 @@ int flb_upstream_conn_timeouts(struct mk_list *list) u_conn->ts_connect_timeout > 0 && u_conn->ts_connect_timeout <= now) { drop = FLB_TRUE; - flb_error("[upstream] connection #%i to %s:%i timed out after " - "%i seconds", - u_conn->fd, - u->tcp_host, u->tcp_port, u->net.connect_timeout); + + if (!flb_upstream_is_shutting_down(u)) { + flb_error("[upstream] connection #%i to %s:%i timed out after " + "%i seconds", + u_conn->fd, + u->tcp_host, u->tcp_port, u->net.connect_timeout); + } } if (drop == FLB_TRUE) {