Skip to content

Commit

Permalink
io: add connection backoff
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Kabakaev <kabakaev@gmail.com>
  • Loading branch information
kabakaev committed Mar 22, 2021

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 961ed61 commit 4a5e951
Showing 5 changed files with 102 additions and 4 deletions.
6 changes: 6 additions & 0 deletions include/fluent-bit/flb_network.h
Original file line number Diff line number Diff line change
@@ -38,6 +38,12 @@ struct flb_net_setup {
/* max time in seconds to wait for a established connection */
int connect_timeout;

/* backoff period in seconds after the first connection failure */
int backoff_init;

/* maximum connection backoff period in seconds */
int backoff_max;

/* network interface to bind and use to send data */
flb_sds_t source_address;

4 changes: 4 additions & 0 deletions include/fluent-bit/flb_upstream.h
Original file line number Diff line number Diff line change
@@ -83,6 +83,10 @@ struct flb_upstream {
#endif

struct mk_list _head;

/* Backoff state. */
time_t backoff_next_attempt_time;
int backoff_last_duration;
};

void flb_upstream_queue_init(struct flb_upstream_queue *uq);
68 changes: 66 additions & 2 deletions src/flb_io.c
Original file line number Diff line number Diff line change
@@ -63,9 +63,56 @@
#include <fluent-bit/flb_coro.h>
#include <fluent-bit/flb_http_client.h>

/* Increase backoff time of an upstream */
static void flb_io_backoff_upstream(struct flb_upstream *u)
{
time_t now = time(NULL);
int max_jitter, jitter = 0;

if (u->net.backoff_max < 1 || u->backoff_next_attempt_time > now) {
return;
}

if (u->backoff_last_duration > 0) {
u->backoff_last_duration = 2 * u->backoff_last_duration;
} else {
srand(now);
u->backoff_last_duration = u->net.backoff_init;
}
if (u->backoff_last_duration > u->net.backoff_max) {
u->backoff_last_duration = u->net.backoff_max;
}

if (u->backoff_last_duration > 2) {
/* Add plus/minus 10% jitter */
max_jitter = u->backoff_last_duration % 10 + 1;
/* Random int from -max_jitter to +max_jitter */
jitter = rand() % (2*max_jitter) - max_jitter;
}

u->backoff_next_attempt_time = now + u->backoff_last_duration + jitter;
flb_debug("[io] backoff connecting to %s:%i for %i seconds with jitter %i seconds",
u->tcp_host, u->tcp_port, u->backoff_last_duration, jitter);
}

/* Returns true if upstream should be skipped. */
static int flb_io_in_backoff(struct flb_upstream *u)
{
if (u->backoff_next_attempt_time > time(NULL)) {
flb_debug("[io] skipping connection to %s:%i because of backoff for another %i seconds",
u->tcp_host, u->tcp_port, u->backoff_next_attempt_time - time(NULL));
return FLB_TRUE;
}
return FLB_FALSE;
}

int flb_io_net_connect(struct flb_upstream_conn *u_conn,
struct flb_coro *coro)
{
if (flb_io_in_backoff(u_conn->u) == FLB_TRUE) {
return -1;
}

int ret;
int async = FLB_FALSE;
flb_sockfd_t fd = -1;
@@ -87,16 +134,18 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn,
fd = flb_net_tcp_connect(u->tcp_host, u->tcp_port, u->net.source_address,
u->net.connect_timeout, async, coro, u_conn);
if (fd == -1) {
flb_io_backoff_upstream(u);
return -1;
}

if (u->proxied_host) {
ret = flb_http_client_proxy_connect(u_conn);
if (ret == -1) {
flb_io_backoff_upstream(u);
flb_debug("[http_client] flb_http_client_proxy_connect connection #%i failed to %s:%i.",
u_conn->fd, u->tcp_host, u->tcp_port);
flb_socket_close(fd);
return -1;
flb_socket_close(fd);
return -1;
}
flb_debug("[http_client] flb_http_client_proxy_connect connection #%i connected to %s:%i.",
u_conn->fd, u->tcp_host, u->tcp_port);
@@ -107,6 +156,8 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn,
if (u->flags & FLB_IO_TLS) {
ret = flb_tls_session_create(u->tls, u_conn, coro);
if (ret != 0) {
flb_io_backoff_upstream(u);
flb_debug("[io] flb_tls_session_create failed on connection #%i", u_conn->fd);
flb_socket_close(fd);
return -1;
}
@@ -342,6 +393,10 @@ static FLB_INLINE ssize_t net_io_read_async(struct flb_coro *co,
int flb_io_net_write(struct flb_upstream_conn *u_conn, const void *data,
size_t len, size_t *out_len)
{
if (flb_io_in_backoff(u_conn->u) == FLB_TRUE) {
return -1;
}

int ret = -1;
struct flb_upstream *u = u_conn->u;
struct flb_coro *coro = flb_coro_get();
@@ -367,6 +422,15 @@ int flb_io_net_write(struct flb_upstream_conn *u_conn, const void *data,
}
#endif

if (ret == -1) {
flb_io_backoff_upstream(u);
flb_debug("[io] cannot write data to connection #%i at %s:%i",
u_conn->fd, u->tcp_host, u->tcp_port);
} else if (u->backoff_next_attempt_time < time(NULL) && u->backoff_last_duration > 0) {
flb_debug("[io] connection #%i was successful, reset backoff period", u_conn->fd);
u->backoff_last_duration = 0;
}

if (ret == -1 && u_conn->fd > 0) {
flb_socket_close(u_conn->fd);
u_conn->fd = -1;
2 changes: 2 additions & 0 deletions src/flb_network.c
Original file line number Diff line number Diff line change
@@ -58,6 +58,8 @@ void flb_net_setup_init(struct flb_net_setup *net)
net->keepalive_max_recycle = 0;
net->connect_timeout = 10;
net->source_address = NULL;
net->backoff_init = 0;
net->backoff_max = 0;
}

int flb_net_host_set(const char *plugin_name, struct flb_net_host *host, const char *address)
26 changes: 24 additions & 2 deletions src/flb_upstream.c
Original file line number Diff line number Diff line change
@@ -65,6 +65,18 @@ struct flb_config_map upstream_net[] = {
"before it is retired."
},

{
FLB_CONFIG_MAP_TIME, "net.backoff_init", "0s",
0, FLB_TRUE, offsetof(struct flb_net_setup, backoff_init),
"Set backoff time in seconds after the first connection failure."
},

{
FLB_CONFIG_MAP_TIME, "net.backoff_max", "0s",
0, FLB_TRUE, offsetof(struct flb_net_setup, backoff_max),
"Set maximal connection backoff time in seconds."
},

/* EOF */
{0}
};
@@ -363,6 +375,12 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u)

now = time(NULL);

if (u->backoff_next_attempt_time > now) {
flb_debug("[upstream] skipping connection to %s:%i because of connection backoff for another %i seconds",
u->tcp_host, u->tcp_port, u->backoff_next_attempt_time - now);
return NULL;
}

conn = flb_calloc(1, sizeof(struct flb_upstream_conn));
if (!conn) {
flb_errno();
@@ -495,12 +513,16 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u)
"net.connect_timeout = %i seconds\n"
"net.source_address = %s\n"
"net.keepalive = %s\n"
"net.keepalive_idle_timeout = %i seconds",
"net.keepalive_idle_timeout = %i seconds\n"
"net.backoff_init = %i seconds\n"
"net.backoff_max = %i seconds",
u->tcp_host, u->tcp_port,
u->net.connect_timeout,
u->net.source_address ? u->net.source_address: "any",
u->net.keepalive ? "enabled": "disabled",
u->net.keepalive_idle_timeout);
u->net.keepalive_idle_timeout,
u->net.backoff_init,
u->net.backoff_max);

/* On non Keepalive mode, always create a new TCP connection */
if (u->net.keepalive == FLB_FALSE) {

0 comments on commit 4a5e951

Please sign in to comment.