Skip to content

Commit

Permalink
[io] add connection backoff
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Kabakaev <kabakaev@gmail.com>
  • Loading branch information
kabakaev committed Mar 9, 2021
1 parent 2dfd703 commit 995a16d
Showing 6 changed files with 109 additions and 4 deletions.
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_io.h
Original file line number Diff line number Diff line change
@@ -49,4 +49,8 @@ int flb_io_net_write(struct flb_upstream_conn *u, const void *data,
size_t len, size_t *out_len);
ssize_t flb_io_net_read(struct flb_upstream_conn *u, void *buf, size_t len);

void flb_io_backoff_upstream(struct flb_upstream *u);
int flb_io_in_backoff(struct flb_upstream *u);


#endif
6 changes: 6 additions & 0 deletions include/fluent-bit/flb_network.h
Original file line number Diff line number Diff line change
@@ -38,6 +38,12 @@ struct flb_net_setup {
/* max time in seconds to wait for an established connection */
int connect_timeout;

/* backoff period in seconds after the first connection failure */
int initial_backoff;

/* maximum connection backoff period in seconds */
int max_backoff;

/* network interface to bind and use to send data */
flb_sds_t source_address;

4 changes: 4 additions & 0 deletions include/fluent-bit/flb_upstream.h
Original file line number Diff line number Diff line change
@@ -83,6 +83,10 @@ struct flb_upstream {
#endif

struct mk_list _head;

/* Backoff state. */
time_t next_attempt_time;
int last_backoff_seconds;
};

void flb_upstream_queue_init(struct flb_upstream_queue *uq);
68 changes: 66 additions & 2 deletions src/flb_io.c
Original file line number Diff line number Diff line change
@@ -63,9 +63,56 @@
#include <fluent-bit/flb_coro.h>
#include <fluent-bit/flb_http_client.h>

/*
 * Record a connection failure on an upstream and schedule the earliest time
 * at which the next connection attempt is allowed.
 *
 * The backoff period starts at net.initial_backoff, doubles on every
 * subsequent failure and is capped at net.max_backoff. For periods longer
 * than 2 seconds a random jitter of roughly +/-10% is added so that several
 * upstreams that failed together do not all retry at the same instant.
 *
 * The call is a no-op when backoff is disabled (net.max_backoff < 1) or when
 * the upstream is already inside a backoff window, so that several failures
 * reported for the same outage only increase the period once.
 *
 * NOTE(review): when net.initial_backoff is 0 the period never grows, because
 * doubling only happens while last_backoff_seconds > 0. Confirm whether
 * setting only net.max_backoff is meant to leave backoff effectively off.
 */
void flb_io_backoff_upstream(struct flb_upstream *u)
{
    static int rng_seeded = 0;
    time_t now = time(NULL);
    int max_jitter;
    int jitter = 0;

    if (u->net.max_backoff < 1 || u->next_attempt_time > now) {
        return;
    }

    /*
     * Seed the generator only once. Re-seeding with a second-resolution
     * timestamp on every first failure would hand identical jitter values to
     * all upstreams that start failing within the same second.
     */
    if (!rng_seeded) {
        srand((unsigned int) now);
        rng_seeded = 1;
    }

    if (u->last_backoff_seconds > 0) {
        /* Compare before doubling so the multiplication cannot overflow */
        if (u->last_backoff_seconds > u->net.max_backoff / 2) {
            u->last_backoff_seconds = u->net.max_backoff;
        }
        else {
            u->last_backoff_seconds *= 2;
        }
    }
    else {
        u->last_backoff_seconds = u->net.initial_backoff;
    }

    /* initial_backoff itself may be configured above max_backoff */
    if (u->last_backoff_seconds > u->net.max_backoff) {
        u->last_backoff_seconds = u->net.max_backoff;
    }

    if (u->last_backoff_seconds > 2) {
        /* Jitter bound: 10% of the period, rounded down, plus one second */
        max_jitter = u->last_backoff_seconds / 10 + 1;
        /* Uniform pick from the inclusive range [-max_jitter, +max_jitter] */
        jitter = rand() % (2 * max_jitter + 1) - max_jitter;
    }

    u->next_attempt_time = now + u->last_backoff_seconds + jitter;
    flb_debug("[io] backoff connecting to %s:%i for %i seconds with jitter %i seconds",
              u->tcp_host, u->tcp_port, u->last_backoff_seconds, jitter);
}

/*
 * Tell whether an upstream is still inside its backoff window.
 *
 * Returns FLB_TRUE when next_attempt_time lies in the future, meaning the
 * caller should skip connecting to or writing to this upstream for now, and
 * FLB_FALSE otherwise. A debug message with the remaining number of seconds
 * is logged when the upstream is skipped.
 */
int flb_io_in_backoff(struct flb_upstream *u)
{
    /* Sample the clock once so the check and the logged value agree */
    time_t now = time(NULL);

    if (u->next_attempt_time > now) {
        /* The difference is a time_t; cast it to match the %i specifier */
        flb_debug("[io] skipping connection to %s:%i because of backoff for another %i seconds",
                  u->tcp_host, u->tcp_port, (int) (u->next_attempt_time - now));
        return FLB_TRUE;
    }

    return FLB_FALSE;
}

int flb_io_net_connect(struct flb_upstream_conn *u_conn,
struct flb_coro *coro)
{
if (flb_io_in_backoff(u_conn->u) == FLB_TRUE) {
return -1;
}

int ret;
int async = FLB_FALSE;
flb_sockfd_t fd = -1;
@@ -87,16 +134,18 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn,
fd = flb_net_tcp_connect(u->tcp_host, u->tcp_port, u->net.source_address,
u->net.connect_timeout, async, coro, u_conn);
if (fd == -1) {
flb_io_backoff_upstream(u);
return -1;
}

if (u->proxied_host) {
ret = flb_http_client_proxy_connect(u_conn);
if (ret == -1) {
flb_io_backoff_upstream(u);
flb_debug("[http_client] flb_http_client_proxy_connect connection #%i failed to %s:%i.",
u_conn->fd, u->tcp_host, u->tcp_port);
flb_socket_close(fd);
return -1;
flb_socket_close(fd);
return -1;
}
flb_debug("[http_client] flb_http_client_proxy_connect connection #%i connected to %s:%i.",
u_conn->fd, u->tcp_host, u->tcp_port);
@@ -107,6 +156,8 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn,
if (u->flags & FLB_IO_TLS) {
ret = flb_tls_session_create(u->tls, u_conn, coro);
if (ret != 0) {
flb_io_backoff_upstream(u);
flb_debug("[io] flb_tls_session_create failed on connection #%i", u_conn->fd);
flb_socket_close(fd);
return -1;
}
@@ -342,6 +393,10 @@ static FLB_INLINE ssize_t net_io_read_async(struct flb_coro *co,
int flb_io_net_write(struct flb_upstream_conn *u_conn, const void *data,
size_t len, size_t *out_len)
{
if (flb_io_in_backoff(u_conn->u) == FLB_TRUE) {
return -1;
}

int ret = -1;
struct flb_upstream *u = u_conn->u;
struct flb_coro *coro = flb_coro_get();
@@ -367,6 +422,15 @@ int flb_io_net_write(struct flb_upstream_conn *u_conn, const void *data,
}
#endif

if (ret == -1) {
flb_io_backoff_upstream(u);
flb_debug("[io] cannot write data to connection #%i at %s:%i",
u_conn->fd, u->tcp_host, u->tcp_port, u->next_attempt_time - time(NULL));
} else if (u->next_attempt_time < time(NULL) && u->last_backoff_seconds > 0) {
flb_debug("[upstream] OK, reset last_backoff_seconds");
u->last_backoff_seconds = 0;
}

if (ret == -1 && u_conn->fd > 0) {
flb_socket_close(u_conn->fd);
u_conn->fd = -1;
2 changes: 2 additions & 0 deletions src/flb_network.c
Original file line number Diff line number Diff line change
@@ -58,6 +58,8 @@ void flb_net_setup_init(struct flb_net_setup *net)
net->keepalive_max_recycle = 0;
net->connect_timeout = 10;
net->source_address = NULL;
net->initial_backoff = 0;
net->max_backoff = 0;
}

int flb_net_host_set(const char *plugin_name, struct flb_net_host *host, const char *address)
29 changes: 27 additions & 2 deletions src/flb_upstream.c
Original file line number Diff line number Diff line change
@@ -65,6 +65,18 @@ struct flb_config_map upstream_net[] = {
"before it is retired."
},

{
FLB_CONFIG_MAP_TIME, "net.initial_backoff", "0s",
0, FLB_TRUE, offsetof(struct flb_net_setup, initial_backoff),
"Set backoff time in seconds after the first connection failure."
},

{
FLB_CONFIG_MAP_TIME, "net.max_backoff", "0s",
0, FLB_TRUE, offsetof(struct flb_net_setup, max_backoff),
"Set maximal connection backoff time in seconds."
},

/* EOF */
{0}
};
@@ -363,6 +375,12 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u)

now = time(NULL);

if (u->next_attempt_time > now) {
flb_debug("[upstream] skipping connection to %s:%i because of connection backoff for another %i seconds",
u->tcp_host, u->tcp_port, u->next_attempt_time - now);
return NULL;
}

conn = flb_calloc(1, sizeof(struct flb_upstream_conn));
if (!conn) {
flb_errno();
@@ -419,6 +437,9 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u)
if (ret == -1) {
flb_debug("[upstream] connection #%i failed to %s:%i",
conn->fd, u->tcp_host, u->tcp_port);
//if (u->next_attempt_time < time(NULL)) {
// flb_io_backoff_upstream(u);
//}
prepare_destroy_conn(conn);
return NULL;
}
@@ -495,12 +516,16 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u)
"net.connect_timeout = %i seconds\n"
"net.source_address = %s\n"
"net.keepalive = %s\n"
"net.keepalive_idle_timeout = %i seconds",
"net.keepalive_idle_timeout = %i seconds\n"
"net.initial_backoff = %i seconds\n"
"net.max_backoff = %i seconds",
u->tcp_host, u->tcp_port,
u->net.connect_timeout,
u->net.source_address ? u->net.source_address: "any",
u->net.keepalive ? "enabled": "disabled",
u->net.keepalive_idle_timeout);
u->net.keepalive_idle_timeout,
u->net.initial_backoff,
u->net.max_backoff);

/* On non Keepalive mode, always create a new TCP connection */
if (u->net.keepalive == FLB_FALSE) {

0 comments on commit 995a16d

Please sign in to comment.