From 6a69b10217b85f23538410f6e997f9bd401a893e Mon Sep 17 00:00:00 2001
From: Eduardo Silva
Date: Sun, 18 Jul 2021 19:40:06 -0600
Subject: [PATCH] filter_multiline: new 'multiline' filter

Signed-off-by: Eduardo Silva
---
 CMakeLists.txt                          |   1 +
 plugins/CMakeLists.txt                  |   1 +
 plugins/filter_multiline/CMakeLists.txt |   4 +
 plugins/filter_multiline/ml.c           | 313 ++++++++++++++++++++++++
 plugins/filter_multiline/ml.h           |  41 ++++
 5 files changed, 360 insertions(+)
 create mode 100644 plugins/filter_multiline/CMakeLists.txt
 create mode 100644 plugins/filter_multiline/ml.c
 create mode 100644 plugins/filter_multiline/ml.h

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 71e824fa2e2..0c7f94c9c9c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -185,6 +185,7 @@ option(FLB_FILTER_KUBERNETES "Enable kubernetes filter" Yes)
 option(FLB_FILTER_REWRITE_TAG "Enable tag rewrite filter" Yes)
 option(FLB_FILTER_THROTTLE "Enable throttle filter" Yes)
 option(FLB_FILTER_THROTTLE_SIZE "Enable throttle size filter" No)
+option(FLB_FILTER_MULTILINE "Enable multiline filter" Yes)
 option(FLB_FILTER_NEST "Enable nest filter" Yes)
 option(FLB_FILTER_LUA "Enable Lua scripting filter" Yes)
 option(FLB_FILTER_RECORD_MODIFIER "Enable record_modifier filter" Yes)
diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt
index 4fee45f6631..8b3ded8cbda 100644
--- a/plugins/CMakeLists.txt
+++ b/plugins/CMakeLists.txt
@@ -222,6 +222,7 @@ REGISTER_FILTER_PLUGIN("filter_tensorflow")
 if(FLB_REGEX)
   REGISTER_FILTER_PLUGIN("filter_kubernetes")
   REGISTER_FILTER_PLUGIN("filter_modify")
+  REGISTER_FILTER_PLUGIN("filter_multiline")
   REGISTER_FILTER_PLUGIN("filter_nest")
   REGISTER_FILTER_PLUGIN("filter_parser")
 endif()
diff --git a/plugins/filter_multiline/CMakeLists.txt b/plugins/filter_multiline/CMakeLists.txt
new file mode 100644
index 00000000000..7293f4a3f85
--- /dev/null
+++ b/plugins/filter_multiline/CMakeLists.txt
@@ -0,0 +1,4 @@
+set(src
+  ml.c)
+
+FLB_PLUGIN(filter_multiline "${src}" "")
diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c
new file mode 100644
index 00000000000..6abe648c46c
--- /dev/null
+++ b/plugins/filter_multiline/ml.c
@@ -0,0 +1,313 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/*  Fluent Bit
+ *  ==========
+ *  Copyright (C) 2019-2021 The Fluent Bit Authors
+ *  Copyright (C) 2015-2018 Treasure Data Inc.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/multiline/flb_ml.h>
+
+#include "ml.h"
+
+/*
+ * Create one multiline parser instance for every parser name listed in the
+ * 'multiline.parser' properties and, when 'multiline.key_content' is set,
+ * override the 'key_content' of each instance with it.
+ *
+ * Returns 0 on success, -1 if no parser list is available or an instance
+ * could not be created or configured.
+ */
+static int multiline_load_parsers(struct ml_ctx *ctx)
+{
+    int ret;
+    struct mk_list *head;
+    struct mk_list *head_p;
+    struct flb_config_map_val *mv;
+    struct flb_slist_entry *val = NULL;
+    struct flb_ml_parser_ins *parser_i;
+
+    if (!ctx->multiline_parsers) {
+        flb_plg_error(ctx->ins, "no 'multiline.parser' has been defined");
+        return -1;
+    }
+
+    /*
+     * Iterate all 'multiline.parser' entries. Every entry is considered
+     * a group which can have multiple multiline parser instances.
+     */
+    flb_config_map_foreach(head, mv, ctx->multiline_parsers) {
+        mk_list_foreach(head_p, mv->val.list) {
+            val = mk_list_entry(head_p, struct flb_slist_entry, _head);
+
+            /* Create an instance of the defined parser */
+            parser_i = flb_ml_parser_instance_create(ctx->m, val->str);
+            if (!parser_i) {
+                flb_plg_error(ctx->ins,
+                              "could not load multiline parser '%s'",
+                              val->str);
+                return -1;
+            }
+
+            /* Always override parent parser values */
+            if (ctx->key_content) {
+                ret = flb_ml_parser_instance_set(parser_i,
+                                                 "key_content",
+                                                 ctx->key_content);
+                if (ret == -1) {
+                    flb_plg_error(ctx->ins, "could not override 'key_content'");
+                    return -1;
+                }
+            }
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Flush callback registered on the multiline stream: appends the record
+ * received in buf_data to the filter's msgpack buffer.
+ *
+ * Returns 0 on success, -1 if the record could not be appended.
+ */
+static int flush_callback(struct flb_ml_parser *parser,
+                          struct flb_ml_stream *mst,
+                          void *data, char *buf_data, size_t buf_size)
+{
+    int ret;
+    struct ml_ctx *ctx = data;
+    (void) parser;
+    (void) mst;
+
+    /* Append incoming record to our msgpack context buffer */
+    ret = msgpack_sbuffer_write(&ctx->mp_sbuf, buf_data, buf_size);
+    if (ret != 0) {
+        flb_plg_error(ctx->ins, "could not buffer flushed record");
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Filter init callback: allocates the plugin context, loads the config
+ * map, creates the multiline context with its parser instances and
+ * creates the stream used by the filter callback.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+static int cb_ml_init(struct flb_filter_instance *ins,
+                      struct flb_config *config,
+                      void *data)
+{
+    int ret;
+    int len;
+    uint64_t stream_id;
+    struct ml_ctx *ctx;
+    (void) data;
+
+    ctx = flb_calloc(1, sizeof(struct ml_ctx));
+    if (!ctx) {
+        flb_errno();
+        return -1;
+    }
+    ctx->ins = ins;
+
+    /* Init buffers */
+    msgpack_sbuffer_init(&ctx->mp_sbuf);
+    msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write);
+
+    /* Load the config map */
+    ret = flb_filter_config_map_set(ins, (void *) ctx);
+    if (ret == -1) {
+        msgpack_sbuffer_destroy(&ctx->mp_sbuf);
+        flb_free(ctx);
+        return -1;
+    }
+
+    /* Set plugin context */
+    flb_filter_set_context(ins, ctx);
+
+    /* Create multiline context */
+    ctx->m = flb_ml_create(config, ctx->ins->name);
+    if (!ctx->m) {
+        /*
+         * we don't free the context since upon init failure, the exit
+         * callback will be triggered with our context set above.
+         */
+        flb_plg_error(ctx->ins, "could not create multiline context");
+        return -1;
+    }
+
+    /* Load the parsers/config */
+    ret = multiline_load_parsers(ctx);
+    if (ret == -1) {
+        return -1;
+    }
+
+    /* Create a stream named after this filter instance */
+    len = strlen(ins->name);
+    ret = flb_ml_stream_create(ctx->m,
+                               ins->name, len,
+                               flush_callback, ctx,
+                               &stream_id);
+    if (ret != 0) {
+        flb_plg_error(ctx->ins, "could not create multiline stream");
+        return -1;
+    }
+    ctx->stream_id = stream_id;
+
+    return 0;
+}
+
+/*
+ * Filter callback: hands every record of the incoming chunk to the
+ * multiline engine. Whatever flush_callback() collected in the msgpack
+ * buffer during this call is copied to a new buffer and returned through
+ * out_buf/out_bytes.
+ *
+ * Returns FLB_FILTER_MODIFIED when out_buf/out_bytes were set, otherwise
+ * FLB_FILTER_NOTOUCH.
+ */
+static int cb_ml_filter(const void *data, size_t bytes,
+                        const char *tag, int tag_len,
+                        void **out_buf, size_t *out_bytes,
+                        struct flb_filter_instance *f_ins,
+                        void *filter_context,
+                        struct flb_config *config)
+{
+    int ret;
+    int ok = MSGPACK_UNPACK_SUCCESS;
+    size_t off = 0;
+    size_t tmp_size;
+    char *tmp_buf;
+    msgpack_unpacked result;
+    msgpack_object *obj = NULL;
+    struct flb_time tm;
+    struct ml_ctx *ctx = filter_context;
+    (void) tag;
+    (void) tag_len;
+    (void) f_ins;
+    (void) config;
+
+    /* reset msgpack size content */
+    ctx->mp_sbuf.size = 0;
+
+    /* process records */
+    msgpack_unpacked_init(&result);
+    while (msgpack_unpack_next(&result, data, bytes, &off) == ok) {
+        /*
+         * Skip anything that is not an array, presumably the only shape
+         * flb_time_pop_from_msgpack() can split (TODO confirm); 'obj'
+         * would otherwise be used unset.
+         */
+        if (result.data.type != MSGPACK_OBJECT_ARRAY) {
+            continue;
+        }
+
+        flb_time_pop_from_msgpack(&tm, &result, &obj);
+        ret = flb_ml_append_object(ctx->m, ctx->stream_id, &tm, obj);
+        if (ret != 0) {
+            flb_plg_debug(ctx->ins, "could not append object");
+        }
+    }
+    msgpack_unpacked_destroy(&result);
+
+    /*
+     * NOTE(review): when nothing was flushed during this call the original
+     * chunk passes through untouched; confirm that records still held by
+     * the multiline engine are not delivered a second time later on.
+     */
+    if (ctx->mp_sbuf.size == 0) {
+        return FLB_FILTER_NOTOUCH;
+    }
+
+    /*
+     * If the filter will report a new set of records because the
+     * original data was modified, we make a copy to a new memory
+     * area, since the buffer might be invalidated in the filter
+     * chain.
+     */
+    tmp_size = ctx->mp_sbuf.size;
+    tmp_buf = flb_malloc(tmp_size);
+    if (!tmp_buf) {
+        flb_errno();
+        ctx->mp_sbuf.size = 0;
+        return FLB_FILTER_NOTOUCH;
+    }
+    memcpy(tmp_buf, ctx->mp_sbuf.data, tmp_size);
+    *out_buf = tmp_buf;
+    *out_bytes = tmp_size;
+    ctx->mp_sbuf.size = 0;
+
+    return FLB_FILTER_MODIFIED;
+}
+
+/*
+ * Filter exit callback: destroys the multiline context (if any), the
+ * msgpack buffer and the plugin context. A NULL context is a no-op.
+ */
+static int cb_ml_exit(void *data, struct flb_config *config)
+{
+    struct ml_ctx *ctx = data;
+    (void) config;
+
+    if (!ctx) {
+        return 0;
+    }
+
+    if (ctx->m) {
+        flb_ml_destroy(ctx->m);
+    }
+
+    msgpack_sbuffer_destroy(&ctx->mp_sbuf);
+    flb_free(ctx);
+
+    return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+    /* Multiline Core Engine based API */
+    {
+     FLB_CONFIG_MAP_CLIST, "multiline.parser", NULL,
+     FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct ml_ctx, multiline_parsers),
+     "specify one or multiple multiline parsers: docker, cri, go, java, etc."
+    },
+
+    {
+     FLB_CONFIG_MAP_STR, "multiline.key_content", NULL,
+     0, FLB_TRUE, offsetof(struct ml_ctx, key_content),
+     "specify the key name that holds the content to process."
+    },
+
+    /* EOF */
+    {0}
+};
+
+struct flb_filter_plugin filter_multiline_plugin = {
+    .name         = "multiline",
+    .description  = "Concatenate multiline messages",
+    .cb_init      = cb_ml_init,
+    .cb_filter    = cb_ml_filter,
+    .cb_exit      = cb_ml_exit,
+    .config_map   = config_map,
+    .flags        = 0
+};
diff --git a/plugins/filter_multiline/ml.h b/plugins/filter_multiline/ml.h
new file mode 100644
index 00000000000..7dab61ec0af
--- /dev/null
+++ b/plugins/filter_multiline/ml.h
@@ -0,0 +1,41 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/*  Fluent Bit
+ *  ==========
+ *  Copyright (C) 2019-2021 The Fluent Bit Authors
+ *  Copyright (C) 2015-2018 Treasure Data Inc.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#ifndef FLB_FILTER_MULTILINE_H
+#define FLB_FILTER_MULTILINE_H
+
+#include <fluent-bit/flb_filter_plugin.h>
+
+struct ml_ctx {
+    flb_sds_t key_content;
+
+    /* packaging buffers */
+    msgpack_sbuffer mp_sbuf;  /* temporary msgpack buffer */
+    msgpack_packer mp_pck;    /* temporary msgpack packer */
+
+    /* Multiline core engine */
+    uint64_t stream_id;
+    struct flb_ml *m;
+    struct mk_list *multiline_parsers;
+
+    struct flb_filter_instance *ins;
+};
+
+#endif