Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_multiline: implement Docker partial_message support #5037

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/filter_multiline/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
set(src
ml.c)
ml.c
ml_concat.c)

FLB_PLUGIN(filter_multiline "${src}" "")
300 changes: 263 additions & 37 deletions plugins/filter_multiline/ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
#include <fluent-bit/flb_storage.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/multiline/flb_ml_parser.h>
#include <fluent-bit/flb_scheduler.h>

#include "ml.h"
#include "ml_concat.h"

static struct ml_stream *get_by_id(struct ml_ctx *ctx, uint64_t stream_id);

Expand Down Expand Up @@ -197,6 +199,7 @@ static int cb_ml_init(struct flb_filter_instance *ins,
ctx->ins = ins;
ctx->debug_flush = FLB_FALSE;
ctx->config = config;
ctx->timer_created = FLB_FALSE;

/*
* Config map is not yet set at this point in the code
Expand All @@ -207,6 +210,26 @@ static int cb_ml_init(struct flb_filter_instance *ins,
if (tmp) {
ctx->use_buffer = flb_utils_bool(tmp);
}
ctx->partial_mode = FLB_FALSE;
tmp = (char *) flb_filter_get_property("mode", ins);
if (tmp != NULL) {
if (strcasecmp(tmp, FLB_MULTILINE_MODE_PARTIAL_MESSAGE) == 0) {
ctx->partial_mode = FLB_TRUE;
} else if (strcasecmp(tmp, FLB_MULTILINE_MODE_PARSER) == 0) {
ctx->partial_mode = FLB_FALSE;
} else {
flb_plg_error(ins, "'Mode' must be '%s' or '%s'",
FLB_MULTILINE_MODE_PARTIAL_MESSAGE,
FLB_MULTILINE_MODE_PARSER);
return -1;
Comment on lines +220 to +224
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is flb_free(ctx) needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its stylistically better yes. Strictly speaking, if you fail plugin init, then Fluent Bit shuts down, and so freeing memory isn't needed. Lots of the plugins don't free on init failure.

}
}

if (ctx->partial_mode == FLB_TRUE && ctx->use_buffer == FLB_FALSE) {
flb_plg_error(ins, "'%s' 'Mode' requires 'Buffer' to be 'On'",
FLB_MULTILINE_MODE_PARTIAL_MESSAGE);
}

if (ctx->use_buffer == FLB_FALSE) {
/* Init buffers */
msgpack_sbuffer_init(&ctx->mp_sbuf);
Expand Down Expand Up @@ -252,10 +275,25 @@ static int cb_ml_init(struct flb_filter_instance *ins,
flb_free(ctx);
return -1;
}

/* Set plugin context */
flb_filter_set_context(ins, ctx);

if (ctx->key_content == NULL && ctx->partial_mode == FLB_TRUE) {
flb_plg_error(ins, "'Mode' '%s' requires 'multiline.key_content'",
FLB_MULTILINE_MODE_PARTIAL_MESSAGE);
flb_free(ctx);
return -1;
}

if (ctx->partial_mode == FLB_FALSE && mk_list_size(ctx->multiline_parsers) == 0) {
flb_plg_error(ins, "The default 'Mode' '%s' requires at least one 'multiline.parser'",
FLB_MULTILINE_MODE_PARSER);
flb_free(ctx);
return -1;
}


if (ctx->use_buffer == FLB_TRUE) {
/*
* Emitter Storage Type: the emitter input plugin to be created by default
Expand Down Expand Up @@ -293,43 +331,46 @@ static int cb_ml_init(struct flb_filter_instance *ins,
#endif
}

/* Create multiline context */
ctx->m = flb_ml_create(config, ctx->ins->name);
if (!ctx->m) {
/*
* we don't free the context since upon init failure, the exit
* callback will be triggered with our context set above.
*/
return -1;
}

/* Load the parsers/config */
ret = multiline_load_parsers(ctx);
if (ret == -1) {
return -1;
}

mk_list_init(&ctx->ml_streams);
mk_list_init(&ctx->split_message_packers);

if (ctx->use_buffer == FLB_TRUE) {
if (ctx->partial_mode == FLB_FALSE) {
/* Create multiline context */
ctx->m = flb_ml_create(config, ctx->ins->name);
if (!ctx->m) {
/*
* we don't free the context since upon init failure, the exit
* callback will be triggered with our context set above.
*/
return -1;
}

ctx->m->flush_ms = ctx->flush_ms;
ret = flb_ml_auto_flush_init(ctx->m);
/* Load the parsers/config */
ret = multiline_load_parsers(ctx);
if (ret == -1) {
return -1;
}
} else {
/* Create a stream for this file */
len = strlen(ins->name);
ret = flb_ml_stream_create(ctx->m,
ins->name, len,
flush_callback, ctx,
&stream_id);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not create multiline stream");
return -1;

if (ctx->use_buffer == FLB_TRUE) {

ctx->m->flush_ms = ctx->flush_ms;
ret = flb_ml_auto_flush_init(ctx->m);
if (ret == -1) {
return -1;
}
} else {
/* Create a stream for this file */
len = strlen(ins->name);
ret = flb_ml_stream_create(ctx->m,
ins->name, len,
flush_callback, ctx,
&stream_id);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not create multiline stream");
return -1;
}
ctx->stream_id = stream_id;
}
ctx->stream_id = stream_id;
}

return 0;
Expand Down Expand Up @@ -451,6 +492,176 @@ static struct ml_stream *get_or_create_stream(struct ml_ctx *ctx,
return stream;
}

static void partial_timer_cb(struct flb_config *config, void *data)
{
struct ml_ctx *ctx = data;
(void) config;
struct mk_list *tmp;
struct mk_list *head;
struct split_message_packer *packer;
unsigned long long now;
unsigned long long diff;
int ret;

now = ml_current_timestamp();

mk_list_foreach_safe(head, tmp, &ctx->split_message_packers) {
packer = mk_list_entry(head, struct split_message_packer, _head);

diff = now - packer->last_write_time;
if (diff <= ctx->flush_ms) {
continue;
}

mk_list_del(&packer->_head);
ml_split_message_packer_complete(packer);
/* re-emit record with original tag */
flb_plg_trace(ctx->ins, "emitting from %s to %s", packer->input_name, packer->tag);
ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not clear how cycles are avoided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the beginning of cb_ml_filter:

    if (i_ins == ctx->ins_emitter) {
        flb_plg_trace(ctx->ins, "not processing records from the emitter");
        return FLB_FILTER_NOTOUCH;
    }

packer->mp_sbuf.data, packer->mp_sbuf.size,
ctx->ins_emitter);
if (ret < 0) {
/* this shouldn't happen in normal execution */
flb_plg_warn(ctx->ins, "Couldn't send concatenated record of size %zu bytes to in_emitter %s",
packer->mp_sbuf.size, ctx->ins_emitter->name);
}
ml_split_message_packer_destroy(packer);
}

}

static int ml_filter_partial(const void *data, size_t bytes,
const char *tag, int tag_len,
void **out_buf, size_t *out_bytes,
struct flb_filter_instance *f_ins,
struct flb_input_instance *i_ins,
void *filter_context,
struct flb_config *config)
{
int ret;
int ok = MSGPACK_UNPACK_SUCCESS;
size_t off = 0;
(void) f_ins;
(void) config;
msgpack_unpacked result;
msgpack_object *obj;
struct ml_ctx *ctx = filter_context;
struct flb_time tm;
msgpack_sbuffer tmp_sbuf;
msgpack_packer tmp_pck;
int partial_records = 0;
int total_records = 0;
int return_records = 0;
int partial = FLB_FALSE;
int is_last_partial = FLB_FALSE;
struct split_message_packer *packer;
char *partial_id_str = NULL;
size_t partial_id_size = 0;
struct flb_sched *sched;

/*
* create a timer that will run periodically and check if pending buffers
* have expired
* this is created once on the first flush
*/
if (ctx->timer_created == FLB_FALSE) {
flb_plg_debug(ctx->ins,
"Creating flush timer with frequency %dms",
ctx->flush_ms);

sched = flb_sched_ctx_get();

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have this be the (flush_ms / x) where (flush_ms / x) is less than grace? And the cb timer altered to only run if in shutdown phase or every x runs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flush timer can't help on shutdown. I need another solution for that (which will come in a separate PR and I will do a separate write up in an issue on that for Eduardo, ping me if you want a verbal explanation before I get to it).

Dividing by X is potentially a good idea since then we are more likely to flush closer to when a log has exceeded flush_ms. I'll change this, I'm gonna make X be 2.

ctx->flush_ms / 2, partial_timer_cb,
ctx, NULL);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to create flush timer");
} else {
ctx->timer_created = FLB_TRUE;
}
}

/*
* Create temporary msgpack buffer
* for non-partial messages which are passed on as-is
*/
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);

msgpack_unpacked_init(&result);
while (msgpack_unpack_next(&result, data, bytes, &off) == ok) {
total_records++;
flb_time_pop_from_msgpack(&tm, &result, &obj);

partial = ml_is_partial(obj);
if (partial == FLB_TRUE) {
partial_records++;
ret = ml_get_partial_id(obj, &partial_id_str, &partial_id_size);
if (ret == -1) {
flb_plg_warn(ctx->ins, "Could not find partial_id but partial_message key is FLB_TRUE for record with tag %s", tag);
/* handle this record as non-partial */
partial_records--;
goto pack_non_partial;
}
packer = ml_get_packer(&ctx->split_message_packers, tag,
i_ins->name, partial_id_str, partial_id_size);
if (packer == NULL) {
flb_plg_trace(ctx->ins, "Found new partial record with tag %s", tag);
packer = ml_create_packer(tag, i_ins->name, partial_id_str, partial_id_size,
obj, ctx->key_content, &tm);
if (packer == NULL) {
flb_plg_warn(ctx->ins, "Could not create packer for partial record with tag %s", tag);
/* handle this record as non-partial */
partial_records--;
goto pack_non_partial;
}
mk_list_add(&packer->_head, &ctx->split_message_packers);
}
ret = ml_split_message_packer_write(packer, obj, ctx->key_content);
if (ret < 0) {
flb_plg_warn(ctx->ins, "Could not append content for partial record with tag %s", tag);
/* handle this record as non-partial */
partial_records--;
goto pack_non_partial;
}
is_last_partial = ml_is_partial_last(obj);
if (is_last_partial == FLB_TRUE) {
/* emit the record in this filter invocation */
return_records++;
ml_split_message_packer_complete(packer);
ml_append_complete_record(packer->mp_sbuf.data, packer->mp_sbuf.size, &tmp_pck);
mk_list_del(&packer->_head);
ml_split_message_packer_destroy(packer);
}
} else {

pack_non_partial:
return_records++;
/* record passed from filter as-is */
msgpack_pack_array(&tmp_pck, 2);
flb_time_append_to_msgpack(&tm, &tmp_pck, 0);
msgpack_pack_object(&tmp_pck, *obj);
Comment on lines +640 to +643
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if for consistency, these non_partial logs should be re-emitted. That way if the multiline filter is not the first filter, all the logs will have prior filters applied twice, rather than only the multiline logs.

Also to consider is efficiency, which advocates for the current solution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind. See that emit only occurs after timeout, so it shouldn't regularly happen. Current solution seems fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this too myself... but returning directly from the filter is more efficient... also I think most of hte filters can be safely passed through twice... this is also something I can fix if it turns out it does have weird side effects and someone complains.

There is an issue right now open that under high throughput the emitter fills up quickly... and remember the use case here is explicitly for huge logs, so I htink I want to stick with only using the emitter when needed.

}

}

msgpack_unpacked_destroy(&result);

if (partial_records == 0) {
/* if no records were partial, we didn't modify the chunk */
msgpack_sbuffer_destroy(&tmp_sbuf);
return FLB_FILTER_NOTOUCH;
} else if (return_records > 0) {
/* some new records can be returned now, return a new buffer */
*out_buf = tmp_sbuf.data;
*out_bytes = tmp_sbuf.size;
Comment on lines +656 to +657
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens to the old out_buf, and out_bytes? This gets cleaned up properly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may also need to call free(tmp_sbuf) The data is still used, so msgpack_sbuffer_free might not be able to be used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the way this works is that you return the memory to the caller, and then the caller frees. If you look in the code in I think flb_filter.c it frees the returned buf. All of the filters work this way.

} else {
/* no records to return right now, free buffer */
msgpack_sbuffer_destroy(&tmp_sbuf);
}
return FLB_FILTER_MODIFIED;
}

static int cb_ml_filter(const void *data, size_t bytes,
const char *tag, int tag_len,
void **out_buf, size_t *out_bytes,
Expand All @@ -462,8 +673,6 @@ static int cb_ml_filter(const void *data, size_t bytes,
int ret;
int ok = MSGPACK_UNPACK_SUCCESS;
size_t off = 0;
(void) out_buf;
(void) out_bytes;
(void) f_ins;
(void) config;
msgpack_unpacked result;
Expand All @@ -474,6 +683,20 @@ static int cb_ml_filter(const void *data, size_t bytes,
struct flb_time tm;
struct ml_stream *stream;

if (i_ins == ctx->ins_emitter) {
flb_plg_trace(ctx->ins, "not processing records from the emitter");
return FLB_FILTER_NOTOUCH;
}

/* 'partial_message' mode */
if (ctx->partial_mode == FLB_TRUE) {
return ml_filter_partial(data, bytes, tag, tag_len,
out_buf, out_bytes,
f_ins, i_ins,
filter_context, config);
}

/* 'parser' mode */
if (ctx->use_buffer == FLB_FALSE) {
/* reset mspgack size content */
ctx->mp_sbuf.size = 0;
Expand Down Expand Up @@ -519,10 +742,6 @@ static int cb_ml_filter(const void *data, size_t bytes,
return FLB_FILTER_NOTOUCH;

} else { /* buffered mode */
if (i_ins == ctx->ins_emitter) {
flb_plg_trace(ctx->ins, "not processing record from the emitter");
return FLB_FILTER_NOTOUCH;
}

stream = get_or_create_stream(ctx, i_ins, tag, tag_len);

Expand Down Expand Up @@ -596,6 +815,13 @@ static struct flb_config_map config_map[] = {
"With buffer off, this filter will not work with most inputs, except tail."
},

{
FLB_CONFIG_MAP_STR, "mode", "parser",
0, FLB_TRUE, offsetof(struct ml_ctx, mode),
"Mode can be 'parser' for regex concat, or 'partial_message' to "
"concat split docker logs."
},

{
FLB_CONFIG_MAP_INT, "flush_ms", "2000",
0, FLB_TRUE, offsetof(struct ml_ctx, flush_ms),
Expand Down
Loading