From b58cfc005e35e60b1445a4b699e38b67abaceb6c Mon Sep 17 00:00:00 2001 From: Alexander Kabakaev Date: Tue, 9 Mar 2021 06:57:43 +0100 Subject: [PATCH] io: add connection backoff Signed-off-by: Alexander Kabakaev --- include/fluent-bit/flb_network.h | 6 +++ include/fluent-bit/flb_upstream.h | 4 ++ src/flb_io.c | 68 ++++++++++++++++++++++++++++++- src/flb_network.c | 2 + src/flb_upstream.c | 26 +++++++++++- 5 files changed, 102 insertions(+), 4 deletions(-) diff --git a/include/fluent-bit/flb_network.h b/include/fluent-bit/flb_network.h index 0fb0dbb8f9d..89f8ac8eeb6 100644 --- a/include/fluent-bit/flb_network.h +++ b/include/fluent-bit/flb_network.h @@ -38,6 +38,12 @@ struct flb_net_setup { /* max time in seconds to wait for a established connection */ int connect_timeout; + /* backoff time in seconds after the first connection failure */ + int backoff_init; + + /* maximum connection backoff time in seconds */ + int backoff_max; + /* network interface to bind and use to send data */ flb_sds_t source_address; diff --git a/include/fluent-bit/flb_upstream.h b/include/fluent-bit/flb_upstream.h index eb260292035..da5d3f05a9e 100644 --- a/include/fluent-bit/flb_upstream.h +++ b/include/fluent-bit/flb_upstream.h @@ -83,6 +83,10 @@ struct flb_upstream { #endif struct mk_list _head; + + /* Backoff state. */ + time_t backoff_next_attempt_time; + int backoff_last_duration; }; void flb_upstream_queue_init(struct flb_upstream_queue *uq); diff --git a/src/flb_io.c b/src/flb_io.c index 43185bedadf..47484bc679c 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -63,9 +63,56 @@ #include #include +/* Increase backoff time of an upstream */ +static void flb_io_backoff_upstream(struct flb_upstream *u) +{ + time_t now = time(NULL); + int max_jitter, jitter = 0; + + if (u->net.backoff_max < 1 || u->backoff_next_attempt_time > now) { + return; + } + + if (u->backoff_last_duration > 0) { + u->backoff_last_duration = 2 * u->backoff_last_duration; + } else { + srand(now); + u->backoff_last_duration = u->net.backoff_init; + } + if (u->backoff_last_duration > u->net.backoff_max) { + u->backoff_last_duration = u->net.backoff_max; + } + + if (u->backoff_last_duration > 2) { + /* Add plus/minus 10% jitter */ + max_jitter = u->backoff_last_duration % 10 + 1; + /* Random int from -max_jitter to +max_jitter */ + jitter = rand() % (2*max_jitter) - max_jitter; + } + + u->backoff_next_attempt_time = now + u->backoff_last_duration + jitter; + flb_debug("[io] backoff connecting to %s:%i for %i seconds with jitter %i seconds", + u->tcp_host, u->tcp_port, u->backoff_last_duration, jitter); +} + +/* Returns true if upstream should be skipped. */ +static int flb_io_in_backoff(struct flb_upstream *u) +{ + if (u->backoff_next_attempt_time > time(NULL)) { + flb_debug("[io] skipping connection to %s:%i because of backoff for another %i seconds", + u->tcp_host, u->tcp_port, u->backoff_next_attempt_time - time(NULL)); + return FLB_TRUE; + } + return FLB_FALSE; +} + int flb_io_net_connect(struct flb_upstream_conn *u_conn, struct flb_coro *coro) { + if (flb_io_in_backoff(u_conn->u) == FLB_TRUE) { + return -1; + } + int ret; int async = FLB_FALSE; flb_sockfd_t fd = -1; @@ -87,16 +134,18 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn, fd = flb_net_tcp_connect(u->tcp_host, u->tcp_port, u->net.source_address, u->net.connect_timeout, async, coro, u_conn); if (fd == -1) { + flb_io_backoff_upstream(u); return -1; } if (u->proxied_host) { ret = flb_http_client_proxy_connect(u_conn); if (ret == -1) { + flb_io_backoff_upstream(u); flb_debug("[http_client] flb_http_client_proxy_connect connection #%i failed to %s:%i.", u_conn->fd, u->tcp_host, u->tcp_port); - flb_socket_close(fd); - return -1; + flb_socket_close(fd); + return -1; } flb_debug("[http_client] flb_http_client_proxy_connect connection #%i connected to %s:%i.", u_conn->fd, u->tcp_host, u->tcp_port); @@ -107,6 +156,8 @@ int flb_io_net_connect(struct flb_upstream_conn *u_conn, if (u->flags & FLB_IO_TLS) { ret = flb_tls_session_create(u->tls, u_conn, coro); if (ret != 0) { + flb_io_backoff_upstream(u); + flb_debug("[io] flb_tls_session_create failed on connection #%i", u_conn->fd); flb_socket_close(fd); return -1; } @@ -342,6 +393,10 @@ static FLB_INLINE ssize_t net_io_read_async(struct flb_coro *co, int flb_io_net_write(struct flb_upstream_conn *u_conn, const void *data, size_t len, size_t *out_len) { + if (flb_io_in_backoff(u_conn->u) == FLB_TRUE) { + return -1; + } + int ret = -1; struct flb_upstream *u = u_conn->u; struct flb_coro *coro = flb_coro_get(); @@ -367,6 +422,15 @@ int flb_io_net_write(struct flb_upstream_conn *u_conn, const void *data, } #endif + if (ret == -1) { + flb_io_backoff_upstream(u); + flb_debug("[io] cannot write data to connection #%i at %s:%i", + u_conn->fd, u->tcp_host, u->tcp_port); + } else if (u->backoff_next_attempt_time < time(NULL) && u->backoff_last_duration > 0) { + flb_debug("[io] connection #%i was successful, reset backoff duration", u_conn->fd); + u->backoff_last_duration = 0; + } + if (ret == -1 && u_conn->fd > 0) { flb_socket_close(u_conn->fd); u_conn->fd = -1; diff --git a/src/flb_network.c b/src/flb_network.c index c2946f22c7c..5d5768d5a40 100644 --- a/src/flb_network.c +++ b/src/flb_network.c @@ -58,6 +58,8 @@ void flb_net_setup_init(struct flb_net_setup *net) net->keepalive_max_recycle = 0; net->connect_timeout = 10; net->source_address = NULL; + net->backoff_init = 0; + net->backoff_max = 0; } int flb_net_host_set(const char *plugin_name, struct flb_net_host *host, const char *address) diff --git a/src/flb_upstream.c b/src/flb_upstream.c index d8dffff20a4..de04cdc710d 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -65,6 +65,18 @@ struct flb_config_map upstream_net[] = { "before it is retired." }, + { + FLB_CONFIG_MAP_TIME, "net.backoff_init", "0s", + 0, FLB_TRUE, offsetof(struct flb_net_setup, backoff_init), + "Set backoff time in seconds after the first connection failure." + }, + + { + FLB_CONFIG_MAP_TIME, "net.backoff_max", "0s", + 0, FLB_TRUE, offsetof(struct flb_net_setup, backoff_max), + "Set maximal connection backoff time in seconds." + }, + /* EOF */ {0} }; @@ -363,6 +375,12 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u) now = time(NULL); + if (u->backoff_next_attempt_time > now) { + flb_debug("[upstream] skipping connection to %s:%i because of connection backoff for another %i seconds", + u->tcp_host, u->tcp_port, u->backoff_next_attempt_time - now); + return NULL; + } + conn = flb_calloc(1, sizeof(struct flb_upstream_conn)); if (!conn) { flb_errno(); @@ -495,12 +513,16 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u) "net.connect_timeout = %i seconds\n" "net.source_address = %s\n" "net.keepalive = %s\n" - "net.keepalive_idle_timeout = %i seconds", + "net.keepalive_idle_timeout = %i seconds\n" + "net.backoff_init = %i seconds\n" + "net.backoff_max = %i seconds", u->tcp_host, u->tcp_port, u->net.connect_timeout, u->net.source_address ? u->net.source_address: "any", u->net.keepalive ? "enabled": "disabled", - u->net.keepalive_idle_timeout); + u->net.keepalive_idle_timeout, + u->net.backoff_init, + u->net.backoff_max); /* On non Keepalive mode, always create a new TCP connection */ if (u->net.keepalive == FLB_FALSE) {