From bb5b031c9fc64b2f4f5597bda0b86dd7cce8173c Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 23 Nov 2020 17:35:27 -0600 Subject: [PATCH] in_tail: dockermode: flush pending data on static files at exit (#2668) Signed-off-by: Eduardo Silva --- plugins/in_tail/tail.c | 4 ++ plugins/in_tail/tail_dockermode.c | 75 ++++++++++++++++++++++--------- plugins/in_tail/tail_dockermode.h | 1 + 3 files changed, 60 insertions(+), 20 deletions(-) diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c index 4911f1d86bb..b245bc8f717 100644 --- a/plugins/in_tail/tail.c +++ b/plugins/in_tail/tail.c @@ -429,6 +429,10 @@ static void in_tail_pause(void *data, struct flb_config *config) if (ctx->docker_mode == FLB_TRUE) { flb_input_collector_pause(ctx->coll_fd_dmode_flush, ctx->ins); + if (config->is_ingestion_active == FLB_FALSE) { + flb_plg_info(ctx->ins, "flushing pending docker mode data..."); + flb_tail_dmode_pending_flush_all(ctx); + } } if (ctx->multiline == FLB_TRUE) { diff --git a/plugins/in_tail/tail_dockermode.c b/plugins/in_tail/tail_dockermode.c index 86912a54501..0b09613f672 100644 --- a/plugins/in_tail/tail_dockermode.c +++ b/plugins/in_tail/tail_dockermode.c @@ -388,41 +388,76 @@ void flb_tail_dmode_flush(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck, flb_free(out_buf); } -int flb_tail_dmode_pending_flush(struct flb_input_instance *ins, - struct flb_config *config, void *context) +static void file_pending_flush(struct flb_tail_config *ctx, + struct flb_tail_file *file, time_t now) { - time_t now; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; + + if (file->dmode_flush_timeout > now) { + return; + } + + if (flb_sds_len(file->dmode_lastline) == 0) { + return; + } + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + flb_tail_dmode_flush(&mp_sbuf, &mp_pck, file, ctx); + + flb_input_chunk_append_raw(ctx->ins, + file->tag_buf, + file->tag_len, + mp_sbuf.data, + mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); +} + +int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx) +{ + time_t expired; struct mk_list *head; struct flb_tail_file *file; - struct flb_tail_config *ctx = context; - now = time(NULL); + expired = time(NULL) + 3600; + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } /* Iterate promoted event files with pending bytes */ mk_list_foreach(head, &ctx->files_event) { file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } - if (file->dmode_flush_timeout > now) { - continue; - } + return 0; +} - if (flb_sds_len(file->dmode_lastline) == 0) { - continue; - } +int flb_tail_dmode_pending_flush(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + time_t now; + struct mk_list *head; + struct flb_tail_file *file; + struct flb_tail_config *ctx = context; - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + now = time(NULL); - flb_tail_dmode_flush(&mp_sbuf, &mp_pck, file, ctx); + /* Iterate static event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, now); + } - flb_input_chunk_append_raw(ins, - file->tag_buf, - file->tag_len, - mp_sbuf.data, - mp_sbuf.size); - msgpack_sbuffer_destroy(&mp_sbuf); + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, now); } return 0; diff --git a/plugins/in_tail/tail_dockermode.h b/plugins/in_tail/tail_dockermode.h index 8fdedccfdcc..4549c2d23af 100644 --- a/plugins/in_tail/tail_dockermode.h +++ b/plugins/in_tail/tail_dockermode.h @@ -36,5 +36,6 @@ void flb_tail_dmode_flush(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck, struct flb_tail_file *file, struct flb_tail_config *ctx); int flb_tail_dmode_pending_flush(struct flb_input_instance *ins, struct flb_config *config, void *context); +int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx); #endif