Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream: fix deadlock when destroying connections #4362

Merged
merged 7 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct flb_config {

int support_mode; /* enterprise support mode ? */
int is_ingestion_active; /* date ingestion active/allowed */
int is_shutting_down; /* is the service shutting down ? */
int is_running; /* service running ? */
double flush; /* Flush timeout */
int grace; /* Maximum grace time on shutdown */
Expand Down
7 changes: 7 additions & 0 deletions include/fluent-bit/flb_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,16 @@ struct flb_upstream {
struct flb_tls *tls;
#endif

struct flb_config *config;
struct mk_list _head;
};


static inline int flb_upstream_is_shutting_down(struct flb_upstream *u)
{
return u->config->is_shutting_down;
}

void flb_upstream_queue_init(struct flb_upstream_queue *uq);
struct flb_upstream_queue *flb_upstream_queue_get(struct flb_upstream *u);
void flb_upstream_list_set(struct mk_list *list);
Expand Down
3 changes: 2 additions & 1 deletion src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts,
flb_task_retry_clean(task, ins);
flb_task_users_dec(task, FLB_TRUE);
}
else if (ret == FLB_RETRY) {
else if (ret == FLB_RETRY && config->is_running && !config->is_shutting_down) {
if (ins->retry_limit == FLB_OUT_RETRY_NONE) {
/* cmetrics: output_dropped_records_total */
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
Expand Down Expand Up @@ -859,6 +859,7 @@ int flb_engine_exit(struct flb_config *config)
uint64_t val = FLB_ENGINE_EV_STOP;

config->is_ingestion_active = FLB_FALSE;
config->is_shutting_down = FLB_TRUE;

flb_input_pause_all(config);

Expand Down
12 changes: 11 additions & 1 deletion src/flb_network.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,16 @@ static int net_connect_async(int fd,
/* Save the mask before the event handler do a reset */
mask = u_conn->event.mask;

/*
* If the socket has been invalidated (e.g: timeout or shutdown), just
* print a debug message and return.
*/
if (u_conn->fd == -1) {
flb_debug("[net] TCP connection not longer available: %s:%i",
u->tcp_host, u->tcp_port);
return -1;
}

/* We got a notification, remove the event registered */
ret = mk_event_del(u_conn->evl, &u_conn->event);
if (ret == -1) {
Expand Down Expand Up @@ -1208,7 +1218,7 @@ flb_sockfd_t flb_net_tcp_connect(const char *host, unsigned long port,
}

if (fd == -1) {
flb_error("[net] could not connect to %s:%s",
flb_debug("[net] could not connect to %s:%s",
host, _port);
}

Expand Down
21 changes: 14 additions & 7 deletions src/flb_upstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config,
flb_errno();
return NULL;
}
u->config = config;

/* Set default networking setup values */
flb_net_setup_init(&u->net);
Expand Down Expand Up @@ -254,7 +255,6 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config,
u->flags |= FLB_IO_ASYNC;
u->thread_safe = FLB_FALSE;


/* Initialize queues */
flb_upstream_queue_init(&u->queue);

Expand Down Expand Up @@ -429,15 +429,19 @@ static int prepare_destroy_conn(struct flb_upstream_conn *u_conn)
static inline int prepare_destroy_conn_safe(struct flb_upstream_conn *u_conn)
{
int ret;
int locked = FLB_FALSE;
struct flb_upstream *u = u_conn->u;

if (u->thread_safe == FLB_TRUE) {
pthread_mutex_lock(&u->mutex_lists);
ret = pthread_mutex_trylock(&u->mutex_lists);
if (ret == 0) {
locked = FLB_TRUE;
}
}

ret = prepare_destroy_conn(u_conn);

if (u->thread_safe == FLB_TRUE) {
if (locked) {
pthread_mutex_unlock(&u->mutex_lists);
}

Expand Down Expand Up @@ -791,10 +795,13 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
u_conn->ts_connect_timeout > 0 &&
u_conn->ts_connect_timeout <= now) {
drop = FLB_TRUE;
flb_error("[upstream] connection #%i to %s:%i timed out after "
"%i seconds",
u_conn->fd,
u->tcp_host, u->tcp_port, u->net.connect_timeout);

if (!flb_upstream_is_shutting_down(u)) {
flb_error("[upstream] connection #%i to %s:%i timed out after "
"%i seconds",
u_conn->fd,
u->tcp_host, u->tcp_port, u->net.connect_timeout);
}
}

if (drop == FLB_TRUE) {
Expand Down