Skip to content

Commit

Permalink
processor_log_replacer: new processor for replacing log records
Browse files Browse the repository at this point in the history
The main purpose of this processor is as a test of the YAML parser
capabilities, which accept arbitrarily complex objects as processor
plugin property values.

Signed-off-by: Thiago Padilha <[email protected]>
  • Loading branch information
tchrono committed May 28, 2024
1 parent 8ac0c00 commit 605a4aa
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ option(FLB_FILTER_WASM "Enable WASM filter"
option(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" Yes)
option(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" Yes)
option(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" Yes)
option(FLB_PROCESSOR_LOG_REPLACER "Enable log replacer processor" Yes)
option(FLB_PROCESSOR_SQL "Enable SQL processor" Yes)

if(DEFINED FLB_NIGHTLY_BUILD AND NOT "${FLB_NIGHTLY_BUILD}" STREQUAL "")
Expand Down
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ REGISTER_IN_PLUGIN("in_random")
REGISTER_PROCESSOR_PLUGIN("processor_content_modifier")
REGISTER_PROCESSOR_PLUGIN("processor_labels")
REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector")
REGISTER_PROCESSOR_PLUGIN("processor_log_replacer")
REGISTER_PROCESSOR_PLUGIN("processor_sql")

# OUTPUTS
Expand Down
4 changes: 4 additions & 0 deletions plugins/processor_log_replacer/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
set(src
log_replacer.c)

FLB_PLUGIN(processor_log_replacer "${src}" "")
154 changes: 154 additions & 0 deletions plugins/processor_log_replacer/log_replacer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#include <cfl/cfl.h>
#include <fluent-bit/flb_filter.h>
#include <fluent-bit/flb_lib.h>
#include <fluent-bit/flb_processor_plugin.h>

struct log_replacer_context {
struct flb_processor_instance *ins; /* processor instance */
struct cfl_variant *replacement;
};

static void log_replacer_destroy(struct log_replacer_context *ctx)
{
if (!ctx) {
return;
}

flb_free(ctx);
}

static struct log_replacer_context *
log_replacer_create(struct flb_processor_instance *ins,
struct flb_config *config)
{
int ret;
struct log_replacer_context *ctx;

/* Allocate context */
ctx = flb_calloc(1, sizeof(struct log_replacer_context));
if (!ctx) {
flb_errno();
return NULL;
}

ret = flb_processor_instance_config_map_set(ins, (void *) ctx);
if (ret < 0) {
flb_errno();
flb_plg_error(ins, "configuration error");
flb_free(ctx);
return NULL;
}

ctx->ins = ins;

if (!ctx->replacement) {
flb_plg_error(ins, "\"replacement\" must be set to an array or map");
log_replacer_destroy(ctx);
}

return ctx;
}

static int cb_init(struct flb_processor_instance *ins,
void *source_plugin_instance, int source_plugin_type,
struct flb_config *config)
{
struct log_replacer_context *ctx;

ctx = log_replacer_create(ins, config);
if (!ctx) {
return -1;
}

flb_processor_instance_set_context(ins, ctx);

return FLB_PROCESSOR_SUCCESS;
}

static int cb_exit(struct flb_processor_instance *ins, void *data)
{
struct log_replacer_context *ctx;

if (!ins) {
return FLB_PROCESSOR_SUCCESS;
}

ctx = data;
if (ctx) {
log_replacer_destroy(ctx);
}

return FLB_PROCESSOR_SUCCESS;
}

static int process_logs(struct flb_processor_instance *ins, void *chunk_data,
const char *tag, int tag_len)
{
int ret;
struct cfl_kvpair *kvp;
struct log_replacer_context *ctx;
struct flb_mp_chunk_cobj *chunk_cobj;
struct flb_mp_chunk_record *record;
struct cfl_object *obj = NULL;
struct cfl_list *tmp;
struct cfl_list *head;

ctx = ins->context;
chunk_cobj = (struct flb_mp_chunk_cobj *) chunk_data;


/* Iterate records */
while ((ret = flb_mp_chunk_cobj_record_next(chunk_cobj, &record)) == FLB_MP_CHUNK_RECORD_OK) {
/* retrieve the target cfl object */
obj = record->cobj_record;

/* the operation on top of the data type is unsupported */
if (obj->variant->type != CFL_VARIANT_KVLIST || ctx->replacement->type != CFL_VARIANT_KVLIST) {
cfl_object_destroy(obj);
return -1;
}

/* delete all keys in the record */
cfl_list_foreach_safe(head, tmp, &obj->variant->data.as_kvlist->list) {
kvp = cfl_list_entry(head, struct cfl_kvpair, _head);
cfl_kvpair_destroy(kvp);
}

/* insert all keys from the replacement into the record */
cfl_list_foreach_safe(head, tmp, &ctx->replacement->data.as_kvlist->list) {
kvp = cfl_list_entry(head, struct cfl_kvpair, _head);
cfl_kvlist_insert(obj->variant->data.as_kvlist, kvp->key, kvp->val);
}

/* prevent the engine from deleting the values which are still
* referenced in the context */
obj->variant->referenced = 1;

if (ret != 0) {
return FLB_PROCESSOR_FAILURE;
}
}

return 0;
}

static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_VARIANT, "replacement", NULL, 0, FLB_TRUE,
offsetof(struct log_replacer_context, replacement),
"Object which will be used to replace the record."
},
{0}
};

struct flb_processor_plugin processor_log_replacer_plugin = {
.name = "log_replacer",
.description = "Replace log records with a configured object",
.cb_init = cb_init,
.cb_process_logs = process_logs,
.cb_process_metrics = NULL,
.cb_process_traces = NULL,
.cb_exit = cb_exit,
.config_map = config_map,
.flags = 0
};

0 comments on commit 605a4aa

Please sign in to comment.