Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.32.0 premature connection destruction #62

Open
wants to merge 2 commits into
base: 2.31.12-all-cherrypicks
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_upstream_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ struct flb_upstream_conn {
*/
int busy_flag;

/* This flag is used to determine if the connection was shut down to ensure we
* don't do it twice when a timeout is detected.
*
* This is required in order to overcome a limitation in the async read / write
* functions that will be addressed as soon as possible.
*/
int shutdown_flag;

/* Timestamps */
time_t ts_assigned;
time_t ts_created;
Expand Down
82 changes: 69 additions & 13 deletions src/flb_upstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,19 @@ struct flb_upstream *flb_upstream_create_url(struct flb_config *config,
return u;
}

/* This function shuts the connection down in order to cause
* any client code trying to read or write from it to fail.
*/
static void shutdown_conn(struct flb_upstream_conn *u_conn)
{
if (u_conn->fd > 0 &&
!u_conn->shutdown_flag) {
shutdown(u_conn->fd, SHUT_RDWR);

u_conn->shutdown_flag = FLB_TRUE;
}
}

/*
* This function moves the 'upstream connection' into the queue to be
* destroyed. Note that the caller is responsible to validate and check
Expand All @@ -434,7 +447,17 @@ static int prepare_destroy_conn(struct flb_upstream_conn *u_conn)
}

if (u_conn->fd > 0) {
#ifdef FLB_HAVE_TLS
if (u_conn->tls_session != NULL) {
flb_tls_session_destroy(u_conn->tls, u_conn);

u_conn->tls_session = NULL;
}
#endif
shutdown_conn(u_conn);

flb_socket_close(u_conn->fd);

u_conn->fd = -1;
u_conn->event.fd = -1;
}
Expand Down Expand Up @@ -537,6 +560,8 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u)
conn->ts_available = 0;
conn->ka_count = 0;
conn->coro = coro;
conn->busy_flag = FLB_FALSE;
conn->shutdown_flag = FLB_FALSE;

if (u->net.keepalive == FLB_TRUE) {
flb_upstream_conn_recycle(conn, FLB_TRUE);
Expand Down Expand Up @@ -798,7 +823,6 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
{
time_t now;
int drop;
int inject;
struct mk_list *head;
struct mk_list *u_head;
struct mk_list *tmp;
Expand Down Expand Up @@ -846,22 +870,54 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
}
}

if (drop == FLB_TRUE) {
inject = FLB_FALSE;
if (u_conn->event.status != MK_EVENT_NONE) {
inject = FLB_TRUE;
}
if (drop == FLB_TRUE) {
u_conn->net_error = ETIMEDOUT;
prepare_destroy_conn(u_conn);
/*
* prepare_destroy_conn calls mk_list_del on the event in the
* priority bucket queue, so for safety, we inject it afterwards

/* We need to shut the connection down
* in order to cause some functions that are not
* aware of the connection error signaling
* mechanism to fail and abort.
*
* These functions do not check the net_error field
* in the connection instance upon being awakened
* so we need to ensure that any read/write
* operations on the socket generate an error.
*
* net_io_write_async
* net_io_read_async
* flb_tls_net_write_async
* flb_tls_net_read_async
*
* This operation could be selectively performed for
* connections that have already been established
* with no side effects because the connection
* establishment code honors `net_error` but
* taking in account that the previous version of
* the code did it unconditionally with no noticeable
* side effects leaving it that way is the safest
* choice at the moment.
*/
if (inject == FLB_TRUE) {
mk_event_inject(u_conn->evl, &u_conn->event,
MK_EVENT_READ | MK_EVENT_WRITE,

if (MK_EVENT_IS_REGISTERED((&u_conn->event))) {
shutdown_conn(u_conn);

mk_event_inject(u_conn->evl,
&u_conn->event,
u_conn->event.mask,
FLB_TRUE);
}
else {
/* I can't think of a valid reason for this code path
* to be taken but considering that it was previously
* possible for it to happen (maybe wesley can shed
* some light on it if he remembers) I'll leave this
* for the moment.
* In any case, it's proven not to interfere with the
* coroutine awakening issue this change addresses.
*/

prepare_destroy_conn(u_conn);
}
}
}

Expand Down
Loading