Skip to content

Commit

Permalink
in_tail: multiline: flush pending data on static files at exit (#2668)
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Nov 23, 2020
1 parent 4d5b9b0 commit 836cc1b
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 22 deletions.
4 changes: 4 additions & 0 deletions plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,10 @@ static void in_tail_pause(void *data, struct flb_config *config)

if (ctx->multiline == FLB_TRUE) {
flb_input_collector_pause(ctx->coll_fd_mult_flush, ctx->ins);
if (config->is_ingestion_active == FLB_FALSE) {
flb_plg_info(ctx->ins, "flushing pending multiline data...");
flb_tail_mult_pending_flush_all(ctx);
}
}

/* Pause file system backend handlers */
Expand Down
79 changes: 57 additions & 22 deletions plugins/in_tail/tail_multiline.c
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,56 @@ int flb_tail_mult_flush(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck,
return 0;
}

static void file_pending_flush(struct flb_tail_config *ctx,
struct flb_tail_file *file, time_t now)
{
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;

if (file->mult_flush_timeout > now) {
return;
}

if (file->mult_firstline == FLB_FALSE) {
if (file->mult_sbuf.data == NULL || file->mult_sbuf.size <= 0) {
return;
}
}

msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

flb_tail_mult_flush(&mp_sbuf, &mp_pck, file, ctx);

flb_input_chunk_append_raw(ctx->ins,
file->tag_buf,
file->tag_len,
mp_sbuf.data,
mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);
}

int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx)
{
time_t expired;
struct mk_list *head;
struct flb_tail_file *file;

expired = time(NULL) + 3600;

/* Iterate promoted event files with pending bytes */
mk_list_foreach(head, &ctx->files_static) {
file = mk_list_entry(head, struct flb_tail_file, _head);
file_pending_flush(ctx, file, expired);
}

/* Iterate promoted event files with pending bytes */
mk_list_foreach(head, &ctx->files_event) {
file = mk_list_entry(head, struct flb_tail_file, _head);
file_pending_flush(ctx, file, expired);
}
}

int flb_tail_mult_pending_flush(struct flb_input_instance *ins,
struct flb_config *config, void *context)
{
Expand All @@ -504,30 +554,15 @@ int flb_tail_mult_pending_flush(struct flb_input_instance *ins,
now = time(NULL);

/* Iterate promoted event files with pending bytes */
mk_list_foreach(head, &ctx->files_event) {
mk_list_foreach(head, &ctx->files_static) {
file = mk_list_entry(head, struct flb_tail_file, _head);
file_pending_flush(ctx, file, now);
}

if (file->mult_flush_timeout > now) {
continue;
}

if (file->mult_firstline == FLB_FALSE) {
if (file->mult_sbuf.data == NULL || file->mult_sbuf.size <= 0) {
continue;
}
}

msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

flb_tail_mult_flush(&mp_sbuf, &mp_pck, file, ctx);

flb_input_chunk_append_raw(ins,
file->tag_buf,
file->tag_len,
mp_sbuf.data,
mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);
/* Iterate promoted event files with pending bytes */
mk_list_foreach(head, &ctx->files_event) {
file = mk_list_entry(head, struct flb_tail_file, _head);
file_pending_flush(ctx, file, now);
}

return 0;
Expand Down
1 change: 1 addition & 0 deletions plugins/in_tail/tail_multiline.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ int flb_tail_mult_flush(msgpack_sbuffer *mp_sbuf,

int flb_tail_mult_pending_flush(struct flb_input_instance *ins,
struct flb_config *config, void *context);
int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx);

#endif

0 comments on commit 836cc1b

Please sign in to comment.