diff --git a/plugins/processor_content_modifier/cm.c b/plugins/processor_content_modifier/cm.c index 7af7f6e9002..4a2cf444793 100644 --- a/plugins/processor_content_modifier/cm.c +++ b/plugins/processor_content_modifier/cm.c @@ -100,15 +100,15 @@ static int cb_process_traces(struct flb_processor_instance *ins, static struct flb_config_map config_map[] = { { - FLB_CONFIG_MAP_STR, "action", NULL, - 0, FLB_TRUE, offsetof(struct content_modifier_ctx, action_str), - "Action to perform over the content: insert, upsert, delete, rename or hash." + FLB_CONFIG_MAP_STR, "context", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str), + "Context where the action will be applied." }, { - FLB_CONFIG_MAP_STR, "context", NULL, - 0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str), - "Context to apply the action." + FLB_CONFIG_MAP_STR, "action", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, action_str), + "Action to perform over the content: insert, upsert, delete, rename or hash." }, { diff --git a/plugins/processor_content_modifier/cm.h b/plugins/processor_content_modifier/cm.h index 0fba1029c82..c6cae0b6b34 100644 --- a/plugins/processor_content_modifier/cm.h +++ b/plugins/processor_content_modifier/cm.h @@ -48,6 +48,11 @@ enum { CM_CONTEXT_LOG_METADATA, CM_CONTEXT_LOG_BODY, + CM_CONTEXT_OTEL_RESOURCE_ATTR, + CM_CONTEXT_OTEL_SCOPE_NAME, + CM_CONTEXT_OTEL_SCOPE_VERSION, + CM_CONTEXT_OTEL_SCOPE_ATTR, + /* Metrics */ CM_CONTEXT_METRIC_NAME, CM_CONTEXT_METRIC_DESCRIPTION, diff --git a/plugins/processor_content_modifier/cm_config.c b/plugins/processor_content_modifier/cm_config.c index cf46156eb3a..487977cb113 100644 --- a/plugins/processor_content_modifier/cm_config.c +++ b/plugins/processor_content_modifier/cm_config.c @@ -112,6 +112,92 @@ static int set_context(struct content_modifier_ctx *ctx) strcasecmp(ctx->context_str, "record") == 0) { context = CM_CONTEXT_LOG_BODY; } + /* + * OpenTelemetry contexts + * ---------------------- + */ + else if (strcasecmp(ctx->context_str, "otel_resource_attributes") == 0) { + context = CM_CONTEXT_OTEL_RESOURCE_ATTR; + } + else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) { + /* + * scope name is restricted to specific actions, make sure the user + * cannot messed it up + * + * action allowed ? + * ----------------------------- + * CM_ACTION_INSERT Yes + * CM_ACTION_UPSERT Yes + * CM_ACTION_DELETE Yes + * CM_ACTION_RENAME No + * CM_ACTION_HASH Yes + * CM_ACTION_EXTRACT No + * CM_ACTION_CONVERT No + */ + + if (ctx->action_type == CM_ACTION_RENAME || + ctx->action_type == CM_ACTION_EXTRACT || + ctx->action_type == CM_ACTION_CONVERT) { + flb_plg_error(ctx->ins, "action '%s' is not allowed for context '%s'", + ctx->action_str, ctx->context_str); + return -1; + } + + /* check that 'name' is the key set */ + if (!ctx->key) { + ctx->key = flb_sds_create("name"); + } + else if (strcasecmp(ctx->key, "name") != 0) { + flb_plg_error(ctx->ins, "context '%s' requires the name of the key to be 'name', no '%s'", + ctx->context_str, ctx->key); + return -1; + } + + context = CM_CONTEXT_OTEL_SCOPE_NAME; + } + else if (strcasecmp(ctx->context_str, "otel_scope_version") == 0) { + /* + * scope version, same as the name, it's restricted to specific actions, make sure the user + * cannot messed it up + * + * action allowed ? + * ----------------------------- + * CM_ACTION_INSERT Yes + * CM_ACTION_UPSERT Yes + * CM_ACTION_DELETE Yes + * CM_ACTION_RENAME No + * CM_ACTION_HASH Yes + * CM_ACTION_EXTRACT No + * CM_ACTION_CONVERT No + */ + + if (ctx->action_type == CM_ACTION_RENAME || + ctx->action_type == CM_ACTION_EXTRACT || + ctx->action_type == CM_ACTION_CONVERT) { + flb_plg_error(ctx->ins, "action '%s' is not allowed for context '%s'", + ctx->action_str, ctx->context_str); + return -1; + } + + /* check that 'version' is the key set */ + if (!ctx->key) { + ctx->key = flb_sds_create("version"); + } + else if (strcasecmp(ctx->key, "version") != 0) { + flb_plg_error(ctx->ins, "context '%s' requires the name of the key to be 'version', no '%s'", + ctx->context_str, ctx->key); + return -1; + } + context = CM_CONTEXT_OTEL_SCOPE_VERSION; + } + else if (strcasecmp(ctx->context_str, "otel_scope_attributes") == 0) { + context = CM_CONTEXT_OTEL_SCOPE_ATTR; + } + else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) { + } + else if (strcasecmp(ctx->context_str, "otel_scope_version") == 0) { + context = CM_CONTEXT_OTEL_SCOPE_VERSION; + } else { flb_plg_error(ctx->ins, "unknown logs context '%s'", ctx->context_str); return -1; @@ -177,7 +263,7 @@ static int check_action_requirements(struct content_modifier_ctx *ctx) /* these only requires a key, already validated (useless code) */ } else if (ctx->action_type == CM_ACTION_INSERT || ctx->action_type == CM_ACTION_UPSERT || - ctx->action_type == CM_ACTION_RENAME) { + ctx->action_type == CM_ACTION_RENAME) { if (!ctx->value) { flb_plg_error(ctx->ins, "value is required for action '%s'", ctx->action_str); diff --git a/plugins/processor_content_modifier/cm_logs.c b/plugins/processor_content_modifier/cm_logs.c index ba0d51c75b1..8a73088ef78 100644 --- a/plugins/processor_content_modifier/cm_logs.c +++ b/plugins/processor_content_modifier/cm_logs.c @@ -595,6 +595,134 @@ static int run_action_convert(struct content_modifier_ctx *ctx, return 0; } +static struct cfl_variant *otel_get_or_create_attributes(struct cfl_kvlist *kvlist) +{ + int ret; + struct cfl_list *head; + struct cfl_list *tmp; + struct cfl_kvpair *kvpair; + struct cfl_variant *val; + struct cfl_kvlist *kvlist_tmp; + + /* iterate resource to find the attributes field */ + cfl_list_foreach_safe(head, tmp, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (cfl_sds_len(kvpair->key) != 10) { + continue; + } + + if (strncmp(kvpair->key, "attributes", 10) == 0) { + val = kvpair->val; + if (val->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + return val; + } + } + + /* create an empty kvlist as the value of attributes */ + kvlist_tmp = cfl_kvlist_create(); + if (!kvlist_tmp) { + return NULL; + } + + /* create the attributes kvpair */ + ret = cfl_kvlist_insert_kvlist_s(kvlist, "attributes", 10, kvlist_tmp); + if (ret != 0) { + cfl_kvlist_destroy(kvlist_tmp); + return NULL; + } + + /* get the last kvpair from the list */ + kvpair = cfl_list_entry_last(&kvlist->list, struct cfl_kvpair, _head); + if (!kvpair) { + return NULL; + } + + return kvpair->val; +} + + +static struct cfl_variant *otel_get_attributes(int context, struct flb_mp_chunk_record *record) +{ + int key_len; + const char *key_buf; + struct cfl_list *head; + struct cfl_object *obj = NULL; + struct cfl_variant *val; + struct cfl_kvlist *kvlist; + struct cfl_kvpair *kvpair; + struct cfl_variant *var_attr; + + if (context == CM_CONTEXT_OTEL_RESOURCE_ATTR) { + key_buf = "resource"; + key_len = 8; + } + else if (context == CM_CONTEXT_OTEL_SCOPE_ATTR) { + key_buf = "scope"; + key_len = 5; + } + else { + return NULL; + } + + obj = record->cobj_record; + kvlist = obj->variant->data.as_kvlist; + cfl_list_foreach(head, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + + if (cfl_sds_len(kvpair->key) != key_len) { + continue; + } + + if (strncmp(kvpair->key, key_buf, key_len) == 0) { + val = kvpair->val; + if (val->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + var_attr = otel_get_or_create_attributes(val->data.as_kvlist); + if (!var_attr) { + return NULL; + } + + return var_attr; + } + } + + return NULL; +} + +static struct cfl_variant *otel_get_scope(struct flb_mp_chunk_record *record) +{ + struct cfl_list *head; + struct cfl_object *obj; + struct cfl_variant *val; + struct cfl_kvlist *kvlist; + struct cfl_kvpair *kvpair; + + obj = record->cobj_record; + kvlist = obj->variant->data.as_kvlist; + cfl_list_foreach(head, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + + if (cfl_sds_len(kvpair->key) != 5) { + continue; + } + + if (strncmp(kvpair->key, "scope", 5) == 0) { + val = kvpair->val; + if (val->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + return val; + } + } + + return NULL; +} int cm_logs_process(struct flb_processor_instance *ins, struct content_modifier_ctx *ctx, struct flb_mp_chunk_cobj *chunk_cobj, @@ -602,11 +730,23 @@ int cm_logs_process(struct flb_processor_instance *ins, int tag_len) { int ret = -1; + int record_type; struct flb_mp_chunk_record *record; struct cfl_object *obj = NULL; + struct cfl_object obj_static; + struct cfl_variant *var; /* Iterate records */ while ((ret = flb_mp_chunk_cobj_record_next(chunk_cobj, &record)) == FLB_MP_CHUNK_RECORD_OK) { + obj = NULL; + + /* Retrieve information about the record type */ + ret = flb_log_event_decoder_get_record_type(&record->event, &record_type); + if (ret != 0) { + flb_plg_error(ctx->ins, "record has invalid event type"); + continue; + } + /* retrieve the target cfl object */ if (ctx->context_type == CM_CONTEXT_LOG_METADATA) { obj = record->cobj_metadata; @@ -614,6 +754,41 @@ int cm_logs_process(struct flb_processor_instance *ins, else if (ctx->context_type == CM_CONTEXT_LOG_BODY) { obj = record->cobj_record; } + else if (ctx->context_type == CM_CONTEXT_OTEL_RESOURCE_ATTR && + record_type == FLB_LOG_EVENT_GROUP_START) { + var = otel_get_attributes(CM_CONTEXT_OTEL_RESOURCE_ATTR, record); + if (!var) { + continue; + } + + obj_static.type = CFL_VARIANT_KVLIST; + obj_static.variant = var; + obj = &obj_static; + } + else if (ctx->context_type == CM_CONTEXT_OTEL_SCOPE_ATTR && + record_type == FLB_LOG_EVENT_GROUP_START) { + + var = otel_get_attributes(CM_CONTEXT_OTEL_SCOPE_ATTR, record); + if (!var) { + continue; + } + + obj_static.type = CFL_VARIANT_KVLIST; + obj_static.variant = var; + obj = &obj_static; + } + else if ((ctx->context_type == CM_CONTEXT_OTEL_SCOPE_NAME || ctx->context_type == CM_CONTEXT_OTEL_SCOPE_VERSION) && + record_type == FLB_LOG_EVENT_GROUP_START) { + + var = otel_get_scope(record); + obj_static.type = CFL_VARIANT_KVLIST; + obj_static.variant = var; + obj = &obj_static; + } + + if (!obj) { + continue; + } /* the operation on top of the data type is unsupported */ if (obj->variant->type != CFL_VARIANT_KVLIST) {