From 605a4aa7d9c1acb6d9fde7a89ae53f92ecfdff66 Mon Sep 17 00:00:00 2001 From: Thiago Padilha Date: Mon, 27 May 2024 13:02:24 -0300 Subject: [PATCH] processor_log_replacer: new processor for replacing log records The main purpose of this processor is as a test of the YAML parser capabilities, which accept arbitrarily complex objects as processor plugin property values. Signed-off-by: Thiago Padilha --- CMakeLists.txt | 1 + plugins/CMakeLists.txt | 1 + plugins/processor_log_replacer/CMakeLists.txt | 4 + plugins/processor_log_replacer/log_replacer.c | 154 ++++++++++++++++++ 4 files changed, 160 insertions(+) create mode 100644 plugins/processor_log_replacer/CMakeLists.txt create mode 100644 plugins/processor_log_replacer/log_replacer.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d827275454..69c440a2894 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -288,6 +288,7 @@ option(FLB_FILTER_WASM "Enable WASM filter" option(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" Yes) option(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" Yes) option(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" Yes) +option(FLB_PROCESSOR_LOG_REPLACER "Enable log replacer processor" Yes) option(FLB_PROCESSOR_SQL "Enable SQL processor" Yes) if(DEFINED FLB_NIGHTLY_BUILD AND NOT "${FLB_NIGHTLY_BUILD}" STREQUAL "") diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 9006ef6d823..66986fe3a7d 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -285,6 +285,7 @@ REGISTER_IN_PLUGIN("in_random") REGISTER_PROCESSOR_PLUGIN("processor_content_modifier") REGISTER_PROCESSOR_PLUGIN("processor_labels") REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector") +REGISTER_PROCESSOR_PLUGIN("processor_log_replacer") REGISTER_PROCESSOR_PLUGIN("processor_sql") # OUTPUTS diff --git a/plugins/processor_log_replacer/CMakeLists.txt b/plugins/processor_log_replacer/CMakeLists.txt new file mode 100644 index 00000000000..3f39eee4e6b --- /dev/null +++ b/plugins/processor_log_replacer/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + log_replacer.c) + +FLB_PLUGIN(processor_log_replacer "${src}" "") diff --git a/plugins/processor_log_replacer/log_replacer.c b/plugins/processor_log_replacer/log_replacer.c new file mode 100644 index 00000000000..50da7c8e291 --- /dev/null +++ b/plugins/processor_log_replacer/log_replacer.c @@ -0,0 +1,154 @@ +#include +#include +#include +#include + +struct log_replacer_context { + struct flb_processor_instance *ins; /* processor instance */ + struct cfl_variant *replacement; +}; + +static void log_replacer_destroy(struct log_replacer_context *ctx) +{ + if (!ctx) { + return; + } + + flb_free(ctx); +} + +static struct log_replacer_context * +log_replacer_create(struct flb_processor_instance *ins, + struct flb_config *config) +{ + int ret; + struct log_replacer_context *ctx; + + /* Allocate context */ + ctx = flb_calloc(1, sizeof(struct log_replacer_context)); + if (!ctx) { + flb_errno(); + return NULL; + } + + ret = flb_processor_instance_config_map_set(ins, (void *) ctx); + if (ret < 0) { + flb_errno(); + flb_plg_error(ins, "configuration error"); + flb_free(ctx); + return NULL; + } + + ctx->ins = ins; + + if (!ctx->replacement) { + flb_plg_error(ins, "\"replacement\" must be set to an array or map"); + log_replacer_destroy(ctx); + } + + return ctx; +} + +static int cb_init(struct flb_processor_instance *ins, + void *source_plugin_instance, int source_plugin_type, + struct flb_config *config) +{ + struct log_replacer_context *ctx; + + ctx = log_replacer_create(ins, config); + if (!ctx) { + return -1; + } + + flb_processor_instance_set_context(ins, ctx); + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_exit(struct flb_processor_instance *ins, void *data) +{ + struct log_replacer_context *ctx; + + if (!ins) { + return FLB_PROCESSOR_SUCCESS; + } + + ctx = data; + if (ctx) { + log_replacer_destroy(ctx); + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int process_logs(struct flb_processor_instance *ins, void *chunk_data, + const char *tag, int tag_len) +{ + int ret; + struct cfl_kvpair *kvp; + struct log_replacer_context *ctx; + struct flb_mp_chunk_cobj *chunk_cobj; + struct flb_mp_chunk_record *record; + struct cfl_object *obj = NULL; + struct cfl_list *tmp; + struct cfl_list *head; + + ctx = ins->context; + chunk_cobj = (struct flb_mp_chunk_cobj *) chunk_data; + + + /* Iterate records */ + while ((ret = flb_mp_chunk_cobj_record_next(chunk_cobj, &record)) == FLB_MP_CHUNK_RECORD_OK) { + /* retrieve the target cfl object */ + obj = record->cobj_record; + + /* the operation on top of the data type is unsupported */ + if (obj->variant->type != CFL_VARIANT_KVLIST || ctx->replacement->type != CFL_VARIANT_KVLIST) { + cfl_object_destroy(obj); + return -1; + } + + /* delete all keys in the record */ + cfl_list_foreach_safe(head, tmp, &obj->variant->data.as_kvlist->list) { + kvp = cfl_list_entry(head, struct cfl_kvpair, _head); + cfl_kvpair_destroy(kvp); + } + + /* insert all keys from the replacement into the record */ + cfl_list_foreach_safe(head, tmp, &ctx->replacement->data.as_kvlist->list) { + kvp = cfl_list_entry(head, struct cfl_kvpair, _head); + cfl_kvlist_insert(obj->variant->data.as_kvlist, kvp->key, kvp->val); + } + + /* prevent the engine from deleting the values which are still + * referenced in the context */ + obj->variant->referenced = 1; + + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + } + + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_VARIANT, "replacement", NULL, 0, FLB_TRUE, + offsetof(struct log_replacer_context, replacement), + "Object which will be used to replace the record." + }, + {0} +}; + +struct flb_processor_plugin processor_log_replacer_plugin = { + .name = "log_replacer", + .description = "Replace log records with a configured object", + .cb_init = cb_init, + .cb_process_logs = process_logs, + .cb_process_metrics = NULL, + .cb_process_traces = NULL, + .cb_exit = cb_exit, + .config_map = config_map, + .flags = 0 +};