From 706097915639880526dafc6a8c3b03f695bb840b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 20 Jun 2024 17:31:05 -0600 Subject: [PATCH] processor_content_modifier: add support for OTel Logs Resource and Scopes The following patch extends the processor to allow to modify the resources and scopes of Logs generated by an OpenTelemetry source. The following new contexts are supported: - otel_resource_attributes: alter resource attributes - otel_scope_name: manipulate the scope name - otel_scope_version: manipulate the scope version - otel_scope_attributes: alter the scope attributes example: ----- fluent-bit.yaml ----- pipeline: inputs: - name: opentelemetry port: ${FLUENT_BIT_TEST_LISTENER_PORT} processors: logs: - name: content_modifier context: otel_resource_attributes action: upsert key: "new_attr" value: "my_val" - name: content_modifier context: otel_resource_attributes action: delete key: "service.name" - name: content_modifier context: otel_scope_attributes action: upsert key: "my_new_scope_attr" value: "123" - name: content_modifier context: otel_scope_name action: upsert value: "new scope name" - name: content_modifier context: otel_scope_version action: upsert value: "3.1.0" outputs: - name: stdout match: '*' - name: opentelemetry match: '*' host: 127.0.0.1 port: ${TEST_SUITE_HTTP_PORT} ----- end of file ----- Signed-off-by: Eduardo Silva --- plugins/processor_content_modifier/cm.c | 12 +- plugins/processor_content_modifier/cm.h | 5 + .../processor_content_modifier/cm_config.c | 88 ++++++++- plugins/processor_content_modifier/cm_logs.c | 175 ++++++++++++++++++ 4 files changed, 273 insertions(+), 7 deletions(-) diff --git a/plugins/processor_content_modifier/cm.c b/plugins/processor_content_modifier/cm.c index 7af7f6e9002..4a2cf444793 100644 --- a/plugins/processor_content_modifier/cm.c +++ b/plugins/processor_content_modifier/cm.c @@ -100,15 +100,15 @@ static int cb_process_traces(struct flb_processor_instance *ins, static struct flb_config_map config_map[] = { { - FLB_CONFIG_MAP_STR, "action", NULL, - 0, FLB_TRUE, offsetof(struct content_modifier_ctx, action_str), - "Action to perform over the content: insert, upsert, delete, rename or hash." + FLB_CONFIG_MAP_STR, "context", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str), + "Context where the action will be applied." }, { - FLB_CONFIG_MAP_STR, "context", NULL, - 0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str), - "Context to apply the action." + FLB_CONFIG_MAP_STR, "action", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, action_str), + "Action to perform over the content: insert, upsert, delete, rename or hash." }, { diff --git a/plugins/processor_content_modifier/cm.h b/plugins/processor_content_modifier/cm.h index 0fba1029c82..c6cae0b6b34 100644 --- a/plugins/processor_content_modifier/cm.h +++ b/plugins/processor_content_modifier/cm.h @@ -48,6 +48,11 @@ enum { CM_CONTEXT_LOG_METADATA, CM_CONTEXT_LOG_BODY, + CM_CONTEXT_OTEL_RESOURCE_ATTR, + CM_CONTEXT_OTEL_SCOPE_NAME, + CM_CONTEXT_OTEL_SCOPE_VERSION, + CM_CONTEXT_OTEL_SCOPE_ATTR, + /* Metrics */ CM_CONTEXT_METRIC_NAME, CM_CONTEXT_METRIC_DESCRIPTION, diff --git a/plugins/processor_content_modifier/cm_config.c b/plugins/processor_content_modifier/cm_config.c index cf46156eb3a..487977cb113 100644 --- a/plugins/processor_content_modifier/cm_config.c +++ b/plugins/processor_content_modifier/cm_config.c @@ -112,6 +112,92 @@ static int set_context(struct content_modifier_ctx *ctx) strcasecmp(ctx->context_str, "record") == 0) { context = CM_CONTEXT_LOG_BODY; } + /* + * OpenTelemetry contexts + * ---------------------- + */ + else if (strcasecmp(ctx->context_str, "otel_resource_attributes") == 0) { + context = CM_CONTEXT_OTEL_RESOURCE_ATTR; + } + else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) { + /* + * scope name is restricted to specific actions, make sure the user + * cannot messed it up + * + * action allowed ? + * ----------------------------- + * CM_ACTION_INSERT Yes + * CM_ACTION_UPSERT Yes + * CM_ACTION_DELETE Yes + * CM_ACTION_RENAME No + * CM_ACTION_HASH Yes + * CM_ACTION_EXTRACT No + * CM_ACTION_CONVERT No + */ + + if (ctx->action_type == CM_ACTION_RENAME || + ctx->action_type == CM_ACTION_EXTRACT || + ctx->action_type == CM_ACTION_CONVERT) { + flb_plg_error(ctx->ins, "action '%s' is not allowed for context '%s'", + ctx->action_str, ctx->context_str); + return -1; + } + + /* check that 'name' is the key set */ + if (!ctx->key) { + ctx->key = flb_sds_create("name"); + } + else if (strcasecmp(ctx->key, "name") != 0) { + flb_plg_error(ctx->ins, "context '%s' requires the name of the key to be 'name', no '%s'", + ctx->context_str, ctx->key); + return -1; + } + + context = CM_CONTEXT_OTEL_SCOPE_NAME; + } + else if (strcasecmp(ctx->context_str, "otel_scope_version") == 0) { + /* + * scope version, same as the name, it's restricted to specific actions, make sure the user + * cannot messed it up + * + * action allowed ? + * ----------------------------- + * CM_ACTION_INSERT Yes + * CM_ACTION_UPSERT Yes + * CM_ACTION_DELETE Yes + * CM_ACTION_RENAME No + * CM_ACTION_HASH Yes + * CM_ACTION_EXTRACT No + * CM_ACTION_CONVERT No + */ + + if (ctx->action_type == CM_ACTION_RENAME || + ctx->action_type == CM_ACTION_EXTRACT || + ctx->action_type == CM_ACTION_CONVERT) { + flb_plg_error(ctx->ins, "action '%s' is not allowed for context '%s'", + ctx->action_str, ctx->context_str); + return -1; + } + + /* check that 'version' is the key set */ + if (!ctx->key) { + ctx->key = flb_sds_create("version"); + } + else if (strcasecmp(ctx->key, "version") != 0) { + flb_plg_error(ctx->ins, "context '%s' requires the name of the key to be 'version', no '%s'", + ctx->context_str, ctx->key); + return -1; + } + context = CM_CONTEXT_OTEL_SCOPE_VERSION; + } + else if (strcasecmp(ctx->context_str, "otel_scope_attributes") == 0) { + context = CM_CONTEXT_OTEL_SCOPE_ATTR; + } + else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) { + } + else if (strcasecmp(ctx->context_str, "otel_scope_version") == 0) { + context = CM_CONTEXT_OTEL_SCOPE_VERSION; + } else { flb_plg_error(ctx->ins, "unknown logs context '%s'", ctx->context_str); return -1; @@ -177,7 +263,7 @@ static int check_action_requirements(struct content_modifier_ctx *ctx) /* these only requires a key, already validated (useless code) */ } else if (ctx->action_type == CM_ACTION_INSERT || ctx->action_type == CM_ACTION_UPSERT || - ctx->action_type == CM_ACTION_RENAME) { + ctx->action_type == CM_ACTION_RENAME) { if (!ctx->value) { flb_plg_error(ctx->ins, "value is required for action '%s'", ctx->action_str); diff --git a/plugins/processor_content_modifier/cm_logs.c b/plugins/processor_content_modifier/cm_logs.c index ba0d51c75b1..8a73088ef78 100644 --- a/plugins/processor_content_modifier/cm_logs.c +++ b/plugins/processor_content_modifier/cm_logs.c @@ -595,6 +595,134 @@ static int run_action_convert(struct content_modifier_ctx *ctx, return 0; } +static struct cfl_variant *otel_get_or_create_attributes(struct cfl_kvlist *kvlist) +{ + int ret; + struct cfl_list *head; + struct cfl_list *tmp; + struct cfl_kvpair *kvpair; + struct cfl_variant *val; + struct cfl_kvlist *kvlist_tmp; + + /* iterate resource to find the attributes field */ + cfl_list_foreach_safe(head, tmp, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (cfl_sds_len(kvpair->key) != 10) { + continue; + } + + if (strncmp(kvpair->key, "attributes", 10) == 0) { + val = kvpair->val; + if (val->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + return val; + } + } + + /* create an empty kvlist as the value of attributes */ + kvlist_tmp = cfl_kvlist_create(); + if (!kvlist_tmp) { + return NULL; + } + + /* create the attributes kvpair */ + ret = cfl_kvlist_insert_kvlist_s(kvlist, "attributes", 10, kvlist_tmp); + if (ret != 0) { + cfl_kvlist_destroy(kvlist_tmp); + return NULL; + } + + /* get the last kvpair from the list */ + kvpair = cfl_list_entry_last(&kvlist->list, struct cfl_kvpair, _head); + if (!kvpair) { + return NULL; + } + + return kvpair->val; +} + + +static struct cfl_variant *otel_get_attributes(int context, struct flb_mp_chunk_record *record) +{ + int key_len; + const char *key_buf; + struct cfl_list *head; + struct cfl_object *obj = NULL; + struct cfl_variant *val; + struct cfl_kvlist *kvlist; + struct cfl_kvpair *kvpair; + struct cfl_variant *var_attr; + + if (context == CM_CONTEXT_OTEL_RESOURCE_ATTR) { + key_buf = "resource"; + key_len = 8; + } + else if (context == CM_CONTEXT_OTEL_SCOPE_ATTR) { + key_buf = "scope"; + key_len = 5; + } + else { + return NULL; + } + + obj = record->cobj_record; + kvlist = obj->variant->data.as_kvlist; + cfl_list_foreach(head, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + + if (cfl_sds_len(kvpair->key) != key_len) { + continue; + } + + if (strncmp(kvpair->key, key_buf, key_len) == 0) { + val = kvpair->val; + if (val->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + var_attr = otel_get_or_create_attributes(val->data.as_kvlist); + if (!var_attr) { + return NULL; + } + + return var_attr; + } + } + + return NULL; +} + +static struct cfl_variant *otel_get_scope(struct flb_mp_chunk_record *record) +{ + struct cfl_list *head; + struct cfl_object *obj; + struct cfl_variant *val; + struct cfl_kvlist *kvlist; + struct cfl_kvpair *kvpair; + + obj = record->cobj_record; + kvlist = obj->variant->data.as_kvlist; + cfl_list_foreach(head, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + + if (cfl_sds_len(kvpair->key) != 5) { + continue; + } + + if (strncmp(kvpair->key, "scope", 5) == 0) { + val = kvpair->val; + if (val->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + return val; + } + } + + return NULL; +} int cm_logs_process(struct flb_processor_instance *ins, struct content_modifier_ctx *ctx, struct flb_mp_chunk_cobj *chunk_cobj, @@ -602,11 +730,23 @@ int cm_logs_process(struct flb_processor_instance *ins, int tag_len) { int ret = -1; + int record_type; struct flb_mp_chunk_record *record; struct cfl_object *obj = NULL; + struct cfl_object obj_static; + struct cfl_variant *var; /* Iterate records */ while ((ret = flb_mp_chunk_cobj_record_next(chunk_cobj, &record)) == FLB_MP_CHUNK_RECORD_OK) { + obj = NULL; + + /* Retrieve information about the record type */ + ret = flb_log_event_decoder_get_record_type(&record->event, &record_type); + if (ret != 0) { + flb_plg_error(ctx->ins, "record has invalid event type"); + continue; + } + /* retrieve the target cfl object */ if (ctx->context_type == CM_CONTEXT_LOG_METADATA) { obj = record->cobj_metadata; @@ -614,6 +754,41 @@ int cm_logs_process(struct flb_processor_instance *ins, else if (ctx->context_type == CM_CONTEXT_LOG_BODY) { obj = record->cobj_record; } + else if (ctx->context_type == CM_CONTEXT_OTEL_RESOURCE_ATTR && + record_type == FLB_LOG_EVENT_GROUP_START) { + var = otel_get_attributes(CM_CONTEXT_OTEL_RESOURCE_ATTR, record); + if (!var) { + continue; + } + + obj_static.type = CFL_VARIANT_KVLIST; + obj_static.variant = var; + obj = &obj_static; + } + else if (ctx->context_type == CM_CONTEXT_OTEL_SCOPE_ATTR && + record_type == FLB_LOG_EVENT_GROUP_START) { + + var = otel_get_attributes(CM_CONTEXT_OTEL_SCOPE_ATTR, record); + if (!var) { + continue; + } + + obj_static.type = CFL_VARIANT_KVLIST; + obj_static.variant = var; + obj = &obj_static; + } + else if ((ctx->context_type == CM_CONTEXT_OTEL_SCOPE_NAME || ctx->context_type == CM_CONTEXT_OTEL_SCOPE_VERSION) && + record_type == FLB_LOG_EVENT_GROUP_START) { + + var = otel_get_scope(record); + obj_static.type = CFL_VARIANT_KVLIST; + obj_static.variant = var; + obj = &obj_static; + } + + if (!obj) { + continue; + } /* the operation on top of the data type is unsupported */ if (obj->variant->type != CFL_VARIANT_KVLIST) {