diff --git a/include/fluent-bit/flb_config_map.h b/include/fluent-bit/flb_config_map.h index c644df5330b..aa81da53af7 100644 --- a/include/fluent-bit/flb_config_map.h +++ b/include/fluent-bit/flb_config_map.h @@ -23,6 +23,9 @@ #include #include #include +#include +#include +#include #include /* Configuration types */ @@ -47,6 +50,9 @@ #define FLB_CONFIG_MAP_SLIST_3 43 /* split up to 3 nodes + remaining data */ #define FLB_CONFIG_MAP_SLIST_4 44 /* split up to 4 nodes + remaining data */ +#define FLB_CONFIG_MAP_KVLIST 50 /* key/value list */ +#define FLB_CONFIG_MAP_ARRAY 51 /* array */ + #define FLB_CONFIG_MAP_MULT 1 struct flb_config_map_val { @@ -57,7 +63,10 @@ struct flb_config_map_val { size_t s_num; /* FLB_CONFIG_MAP_SIZE */ flb_sds_t str; /* FLB_CONFIG_MAP_STR */ struct mk_list *list; /* FLB_CONFIG_MAP_CLIST and FLB_CONFIG_MAP_SLIST */ + struct cfl_array *array; /* FLB_CONFIG_MAP_ARRAY */ + struct cfl_kvlist *kvlist; /* FLB_CONFIG_MAP_KVLIST */ } val; + int type; struct mk_list *mult; struct mk_list _head; /* Link to list if this entry is a 'multiple' entry */ }; @@ -105,5 +114,6 @@ struct mk_list *flb_config_map_create(struct flb_config *config, void flb_config_map_destroy(struct mk_list *list); int flb_config_map_expected_values(int type); int flb_config_map_set(struct mk_list *properties, struct mk_list *map, void *context); +int flb_config_map_set_from_kvlist(struct cfl_kvlist *properties, struct mk_list *map, void *context); #endif diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index 1ad0d0ae813..8ff17fb40fd 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -166,7 +166,7 @@ struct flb_processor_instance { void *context; /* Instance local context */ void *data; struct flb_processor_plugin *p; /* original plugin */ - struct mk_list properties; /* config properties */ + struct cfl_kvlist *properties; /* config properties */ struct mk_list *config_map; /* configuration map */ struct flb_log_event_decoder *log_decoder; @@ -242,7 +242,10 @@ int flb_processor_instance_check_properties( int flb_processor_instance_set_property( struct flb_processor_instance *ins, - const char *k, const char *v); + const char *k, struct cfl_variant *v); + +int flb_processor_instance_set_property_variant(struct flb_processor_instance *ins, + struct cfl_kvpair *pair); const char *flb_processor_instance_get_property( const char *key, @@ -255,7 +258,7 @@ static inline int flb_processor_instance_config_map_set( struct flb_processor_instance *ins, void *context) { - return flb_config_map_set(&ins->properties, ins->config_map, context); + return flb_config_map_set_from_kvlist(ins->properties, ins->config_map, context); } #endif diff --git a/plugins/processor_content_modifier/cm.c b/plugins/processor_content_modifier/cm.c index f4e127595d8..898cded0ee2 100644 --- a/plugins/processor_content_modifier/cm.c +++ b/plugins/processor_content_modifier/cm.c @@ -104,6 +104,12 @@ static struct flb_config_map config_map[] = { "Action to perform over the content: insert, upsert, delete, rename or hash." }, + { + FLB_CONFIG_MAP_ARRAY, "actions", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, actions_list), + "Actions to perform over the content: insert, upsert, delete, rename or hash." + }, + { FLB_CONFIG_MAP_STR, "context", NULL, 0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str), diff --git a/plugins/processor_content_modifier/cm.h b/plugins/processor_content_modifier/cm.h index 0fba1029c82..6bb4a028c0e 100644 --- a/plugins/processor_content_modifier/cm.h +++ b/plugins/processor_content_modifier/cm.h @@ -61,39 +61,46 @@ enum { }; struct cm_actions { - /* - * Based on the type, we either register a key/value pair or a - * single string value - */ - union { - struct cfl_kv *kv; - cfl_sds_t str; - } value; + flb_sds_t key; + flb_sds_t value; /* Link to struct proc_attr_rules->rules */ struct cfl_list _head; }; +struct content_modifier_action { + int type; + int context_type; + int converted_type; + flb_sds_t key; + flb_sds_t value; + struct flb_regex *pattern; + + struct mk_list _head; +}; + struct content_modifier_ctx { int telemetry_type; - /* Type of action (e.g. ..._ACTION_DELETE, ..._ACTION_INSERT )*/ - int action_type; + // /* Type of action (e.g. ..._ACTION_DELETE, ..._ACTION_INSERT )*/ + // int action_type; - /* Context where the action is applied */ - int context_type; + // /* Context where the action is applied */ + // int context_type; - /* CFL_VARIANT numerical type representation of converted_type_str */ - int converted_type; + // /* CFL_VARIANT numerical type representation of converted_type_str */ + // int converted_type; /* public configuration properties */ - flb_sds_t action_str; /* converted to action_type */ - flb_sds_t context_str; /* converted to context_type */ - flb_sds_t pattern; /* pattern to create 'regex' context */ - flb_sds_t converted_type_str; /* converted_type */ - flb_sds_t key; /* target key */ - flb_sds_t value; /* used for any value */ - struct flb_regex *regex; /* regular expression context created from 'pattern' */ + struct cfl_array *actions_list; /* list of several actions */ + flb_sds_t action_str; /* converted to action_type */ + struct mk_list actions; + flb_sds_t key; /* target key */ + flb_sds_t value; /* used for any value */ + flb_sds_t context_str; /* converted to context_type */ + flb_sds_t pattern; /* pattern to create 'regex' context */ + flb_sds_t converted_type_str; /* converted_type */ + // struct flb_regex *regex; /* regular expression context created from 'pattern' */ /* processor instance reference */ struct flb_processor_instance *ins; diff --git a/plugins/processor_content_modifier/cm_config.c b/plugins/processor_content_modifier/cm_config.c index cf46156eb3a..c9635f97bf5 100644 --- a/plugins/processor_content_modifier/cm_config.c +++ b/plugins/processor_content_modifier/cm_config.c @@ -23,67 +23,73 @@ #include "cm.h" -static int set_action(struct content_modifier_ctx *ctx) +static int action_set_action(struct content_modifier_ctx *ctx, + struct content_modifier_action *action, + const char *action_str) { - if (strcasecmp(ctx->action_str, "insert") == 0) { - ctx->action_type = CM_ACTION_INSERT; + if (strcasecmp(action_str, "insert") == 0) { + action->type = CM_ACTION_INSERT; } - else if (strcasecmp(ctx->action_str, "upsert") == 0) { - ctx->action_type = CM_ACTION_UPSERT; + else if (strcasecmp(action_str, "upsert") == 0) { + action->type = CM_ACTION_UPSERT; } - else if (strcasecmp(ctx->action_str, "delete") == 0) { - ctx->action_type = CM_ACTION_DELETE; + else if (strcasecmp(action_str, "delete") == 0) { + action->type = CM_ACTION_DELETE; } - else if (strcasecmp(ctx->action_str, "rename") == 0) { - ctx->action_type = CM_ACTION_RENAME; + else if (strcasecmp(action_str, "rename") == 0) { + action->type = CM_ACTION_RENAME; } - else if (strcasecmp(ctx->action_str, "hash") == 0) { - ctx->action_type = CM_ACTION_HASH; + else if (strcasecmp(action_str, "hash") == 0) { + action->type = CM_ACTION_HASH; } - else if (strcasecmp(ctx->action_str, "extract") == 0) { - ctx->action_type = CM_ACTION_EXTRACT; + else if (strcasecmp(action_str, "extract") == 0) { + action->type = CM_ACTION_EXTRACT; } - else if (strcasecmp(ctx->action_str, "convert") == 0) { - ctx->action_type = CM_ACTION_CONVERT; + else if (strcasecmp(action_str, "convert") == 0) { + action->type = CM_ACTION_CONVERT; } else { - flb_plg_error(ctx->ins, "unknown action '%s'", ctx->action_str); + flb_plg_error(ctx->ins, "unknown action '%s'", action_str); return -1; } return 0; } -static int set_converted_type(struct content_modifier_ctx *ctx) +static int action_set_converted_type(struct content_modifier_ctx *ctx, + struct content_modifier_action *act, + const char *converted_type_str) { int type = -1; - if (!ctx->converted_type_str) { - ctx->converted_type = -1; + if (!converted_type_str) { + act->converted_type = -1; } - if (strcasecmp(ctx->converted_type_str, "string") == 0) { + if (strcasecmp(converted_type_str, "string") == 0) { type = CFL_VARIANT_STRING; } - else if (strcasecmp(ctx->converted_type_str, "boolean") == 0) { + else if (strcasecmp(converted_type_str, "boolean") == 0) { type = CFL_VARIANT_BOOL; } - else if (strcasecmp(ctx->converted_type_str, "int") == 0) { + else if (strcasecmp(converted_type_str, "int") == 0) { type = CFL_VARIANT_INT; } - else if (strcasecmp(ctx->converted_type_str, "double") == 0) { + else if (strcasecmp(converted_type_str, "double") == 0) { type = CFL_VARIANT_DOUBLE; } else { - flb_plg_error(ctx->ins, "unsupported converted_type '%s'", ctx->converted_type_str); + flb_plg_error(ctx->ins, "unsupported converted_type '%s'", converted_type_str); return -1; } - ctx->converted_type = type; + act->converted_type = type; return 0; } -static int set_context(struct content_modifier_ctx *ctx) +static int action_set_context(struct content_modifier_ctx *ctx, + struct content_modifier_action *act, + const char *context_str) { int context = CM_CONTEXT_UNDEFINED; int event_type; @@ -99,106 +105,101 @@ static int set_context(struct content_modifier_ctx *ctx) * specific context of the Telemetry type. */ if (event_type == FLB_PROCESSOR_LOGS) { - if (ctx->context_str == NULL) { + if (context_str == NULL) { /* if no context is set, use the log body */ context = CM_CONTEXT_LOG_BODY; } - else if (strcasecmp(ctx->context_str, "metadata") == 0 || - strcasecmp(ctx->context_str, "attributes") == 0) { + else if (strcasecmp(context_str, "metadata") == 0 || + strcasecmp(context_str, "attributes") == 0) { context = CM_CONTEXT_LOG_METADATA; } - else if (strcasecmp(ctx->context_str, "body") == 0 || - strcasecmp(ctx->context_str, "message") == 0 || - strcasecmp(ctx->context_str, "record") == 0) { + else if (strcasecmp(context_str, "body") == 0 || + strcasecmp(context_str, "message") == 0 || + strcasecmp(context_str, "record") == 0) { context = CM_CONTEXT_LOG_BODY; } else { - flb_plg_error(ctx->ins, "unknown logs context '%s'", ctx->context_str); + flb_plg_error(ctx->ins, "unknown logs context '%s'", context_str); return -1; } } else if (event_type == FLB_PROCESSOR_METRICS) { - if (ctx->context_str == NULL) { + if (context_str == NULL) { /* if no context is set, use labels */ context = CM_CONTEXT_METRIC_LABELS; } - else if (strcasecmp(ctx->context_str, "name") == 0) { + else if (strcasecmp(context_str, "name") == 0) { context = CM_CONTEXT_METRIC_NAME; } - else if (strcasecmp(ctx->context_str, "description") == 0) { + else if (strcasecmp(context_str, "description") == 0) { context = CM_CONTEXT_METRIC_DESCRIPTION; } - else if (strcasecmp(ctx->context_str, "labels") == 0 || - strcasecmp(ctx->context_str, "attributes") == 0) { + else if (strcasecmp(context_str, "labels") == 0 || + strcasecmp(context_str, "attributes") == 0) { context = CM_CONTEXT_METRIC_LABELS; } else { - flb_plg_error(ctx->ins, "unknown metrics context '%s'", ctx->context_str); + flb_plg_error(ctx->ins, "unknown metrics context '%s'", context_str); return -1; } } else if (event_type == FLB_PROCESSOR_TRACES) { - if (ctx->context_str == NULL) { + if (context_str == NULL) { /* if no context is set, use span attributes */ context = CM_CONTEXT_TRACE_SPAN_ATTRIBUTES; } - else if (strcasecmp(ctx->context_str, "span_name") == 0) { + else if (strcasecmp(context_str, "span_name") == 0) { context = CM_CONTEXT_TRACE_SPAN_NAME; } - else if (strcasecmp(ctx->context_str, "span_kind") == 0) { + else if (strcasecmp(context_str, "span_kind") == 0) { context = CM_CONTEXT_TRACE_SPAN_KIND; } - else if (strcasecmp(ctx->context_str, "span_status") == 0) { + else if (strcasecmp(context_str, "span_status") == 0) { context = CM_CONTEXT_TRACE_SPAN_STATUS; } - else if (strcasecmp(ctx->context_str, "span_attributes") == 0) { + else if (strcasecmp(context_str, "span_attributes") == 0) { context = CM_CONTEXT_TRACE_SPAN_ATTRIBUTES; } else { - flb_plg_error(ctx->ins, "unknown traces context '%s'", ctx->context_str); + flb_plg_error(ctx->ins, "unknown traces context '%s'", context_str); return -1; } } - ctx->context_type = context; + act->context_type = context; return 0; } -static int check_action_requirements(struct content_modifier_ctx *ctx) +static int check_action_requirements(struct content_modifier_ctx *ctx, + struct content_modifier_action *act) { int ret; - if (!ctx->key) { - flb_plg_error(ctx->ins, "key is required for action '%s'", ctx->action_str); + if (!act->key) { + flb_plg_error(ctx->ins, "key is required for action"); return -1; } - if (ctx->action_type == CM_ACTION_DELETE || ctx->action_type == CM_ACTION_HASH) { + if (act->type == CM_ACTION_DELETE || act->type == CM_ACTION_HASH) { /* these only requires a key, already validated (useless code) */ } - else if (ctx->action_type == CM_ACTION_INSERT || ctx->action_type == CM_ACTION_UPSERT || - ctx->action_type == CM_ACTION_RENAME) { + else if (act->type == CM_ACTION_INSERT || act->type == CM_ACTION_UPSERT || + act->type == CM_ACTION_RENAME) { - if (!ctx->value) { - flb_plg_error(ctx->ins, "value is required for action '%s'", ctx->action_str); + if (!act->value) { + flb_plg_error(ctx->ins, "value is required for action"); return -1; } } - else if (ctx->action_type == CM_ACTION_EXTRACT) { - if (!ctx->pattern) { + else if (act->type == CM_ACTION_EXTRACT) { + if (!act->pattern) { flb_plg_error(ctx->ins, "for 'extract' action, a regular expression in 'pattern' is required"); return -1; } } - else if (ctx->action_type == CM_ACTION_CONVERT) { - if (!ctx->converted_type_str) { - flb_plg_error(ctx->ins, "converted_type is required for action '%s'", ctx->action_str); - return -1; - } - - ret = set_converted_type(ctx); - if (ret == -1) { - flb_plg_error(ctx->ins, "cannot set converted_type '%s'", ctx->converted_type_str); + else if (act->type == CM_ACTION_CONVERT) { + if (act->converted_type == -1) { + flb_plg_error(ctx->ins, "converted_type is required for action"); return -1; } } @@ -210,7 +211,18 @@ struct content_modifier_ctx *cm_config_create(struct flb_processor_instance *ins { int ret; + int idx; struct content_modifier_ctx *ctx; + struct mk_list *head; + struct flb_config_map_val *entry; + struct cfl_kvlist *action; + struct cfl_variant *op; + struct cfl_variant *key; + struct cfl_variant *value; + struct cfl_variant *pattern; + struct cfl_variant *context; + struct cfl_variant *converted_type; + struct content_modifier_action *act; /* Create plugin instance context */ ctx = flb_calloc(1, sizeof(struct content_modifier_ctx)); @@ -227,50 +239,144 @@ struct content_modifier_ctx *cm_config_create(struct flb_processor_instance *ins return NULL; } - if (!ctx->action_str) { + if (!ctx->action_str && (ctx->actions_list == NULL || ctx->actions_list->entry_count == 0)) { flb_plg_error(ctx->ins, "no 'action' defined"); flb_free(ctx); return NULL; } - /* process the 'action' configuration */ - ret = set_action(ctx); - if (ret == -1) { - flb_free(ctx); - return NULL; - } + mk_list_init(&ctx->actions); - /* process the 'context' where the action will be applied */ - ret = set_context(ctx); - if (ret == -1) { - flb_free(ctx); - return NULL; - } + if (ctx->action_str) { + act = flb_calloc(1, sizeof(struct content_modifier_action)); + if (act == NULL) { + flb_plg_error(ctx->ins, "unable to allocate memory for action"); + flb_free(ctx); + return NULL; + } + + act->converted_type = -1; + + /* process the 'action' configuration */ + ret = action_set_action(ctx, act, ctx->action_str); + if (ret == -1) { + flb_free(act); + flb_free(ctx); + return NULL; + } - /* Pattern */ - if (ctx->pattern) { - ctx->regex = flb_regex_create(ctx->pattern); - if (!ctx->regex) { - flb_plg_error(ctx->ins, "invalid regex pattern '%s'", ctx->pattern); + /* process the 'context' where the action will be applied */ + ret = action_set_context(ctx, act, ctx->context_str); + if (ret == -1) { + flb_free(act); flb_free(ctx); return NULL; } + + /* Pattern */ + if (ctx->pattern) { + act->pattern = flb_regex_create(ctx->pattern); + if (!act->pattern) { + flb_plg_error(ctx->ins, "invalid regex pattern '%s'", ctx->pattern); + flb_free(ctx); + return NULL; + } + } + + if (ctx->key) { + act->key = flb_sds_create(ctx->key); + } + + if (ctx->value) { + act->key = flb_sds_create(ctx->value); + } + + if (ctx->converted_type_str) { + action_set_converted_type(ctx, act, ctx->converted_type_str); + } + + /* Certain actions needs extra configuration, e.g: insert -> requires a key and a value */ + ret = check_action_requirements(ctx, act); + if (ret == -1) { + flb_free(ctx); + flb_free(act); + return NULL; + } + + mk_list_add(&act->_head, &ctx->actions); } + else { + for (idx = 0; idx < ctx->actions_list->entry_count; idx++) { + action = ctx->actions_list->entries[idx]->data.as_kvlist; - /* Certain actions needs extra configuration, e.g: insert -> requires a key and a value */ - ret = check_action_requirements(ctx); - if (ret == -1) { - flb_free(ctx); - return NULL; + act = flb_calloc(1, sizeof(struct content_modifier_action)); + if (act == NULL) { + flb_free(ctx); + return NULL; + } + + op = cfl_kvlist_fetch(action, "action"); + if (op == NULL) { + flb_free(ctx); + flb_free(act); + return NULL; + } + action_set_action(ctx, act, op->data.as_string); + + context = cfl_kvlist_fetch(action, "context"); + + if (context) { + action_set_context(ctx, act, context->data.as_string); + } + else { + action_set_context(ctx, act, NULL); + } + + key = cfl_kvlist_fetch(action, "key"); + if (key) { + act->key = flb_sds_create(key->data.as_string); + } + + value = cfl_kvlist_fetch(action, "value"); + if (value) { + act->value = flb_sds_create(value->data.as_string); + } + + pattern = cfl_kvlist_fetch(action, "pattern"); + if (pattern) { + act->pattern = flb_regex_create(pattern->data.as_string); + if (!act->pattern) { + flb_plg_error(ctx->ins, "invalid regex pattern '%s'", pattern); + flb_free(ctx); + return NULL; + } + } + + converted_type = cfl_kvlist_fetch(action, "converted_type"); + if (converted_type) { + action_set_converted_type(ctx, act, converted_type->data.as_string); + } + + /* Certain actions needs extra configuration, e.g: insert -> requires a key and a value */ + ret = check_action_requirements(ctx, act); + if (ret == -1) { + flb_free(ctx); + flb_free(act); + return NULL; + } + + mk_list_add(&act->_head, &ctx->actions); + } } + return ctx; } void cm_config_destroy(struct content_modifier_ctx *ctx) { - if (ctx->regex) { - flb_regex_destroy(ctx->regex); - } + // if (ctx->pattern) { + // flb_regex_destroy(ctx->pattern); + // } flb_free(ctx); } diff --git a/plugins/processor_content_modifier/cm_logs.c b/plugins/processor_content_modifier/cm_logs.c index 2e5d2d61e66..7ad8d301591 100644 --- a/plugins/processor_content_modifier/cm_logs.c +++ b/plugins/processor_content_modifier/cm_logs.c @@ -550,48 +550,54 @@ int cm_logs_process(struct flb_processor_instance *ins, int ret = -1; struct flb_mp_chunk_record *record; struct cfl_object *obj = NULL; + struct mk_list *head; + struct content_modifier_action *act; /* Iterate records */ while ((ret = flb_mp_chunk_cobj_record_next(chunk_cobj, &record)) == FLB_MP_CHUNK_RECORD_OK) { - /* retrieve the target cfl object */ - if (ctx->context_type == CM_CONTEXT_LOG_METADATA) { - obj = record->cobj_metadata; - } - else if (ctx->context_type == CM_CONTEXT_LOG_BODY) { - obj = record->cobj_record; - } + mk_list_foreach(head, &ctx->actions) { + act = mk_list_entry(head, struct content_modifier_action, _head); - /* the operation on top of the data type is unsupported */ - if (obj->variant->type != CFL_VARIANT_KVLIST) { - cfl_object_destroy(obj); - return -1; - } + /* retrieve the target cfl object */ + if (act->context_type == CM_CONTEXT_LOG_METADATA) { + obj = record->cobj_metadata; + } + else if (act->context_type == CM_CONTEXT_LOG_BODY) { + obj = record->cobj_record; + } - /* process the action */ - if (ctx->action_type == CM_ACTION_INSERT) { - ret = run_action_insert(ctx, obj, tag, tag_len, ctx->key, ctx->value); - } - else if (ctx->action_type == CM_ACTION_UPSERT) { - ret = run_action_upsert(ctx, obj, tag, tag_len, ctx->key, ctx->value); - } - else if (ctx->action_type == CM_ACTION_DELETE) { - ret = run_action_delete(ctx, obj, tag, tag_len, ctx->key); - } - else if (ctx->action_type == CM_ACTION_RENAME) { - ret = run_action_rename(ctx, obj, tag, tag_len, ctx->key, ctx->value); - } - else if (ctx->action_type == CM_ACTION_HASH) { - ret = run_action_hash(ctx, obj, tag, tag_len, ctx->key); - } - else if (ctx->action_type == CM_ACTION_EXTRACT) { - ret = run_action_extract(ctx, obj, tag, tag_len, ctx->key, ctx->regex); - } - else if (ctx->action_type == CM_ACTION_CONVERT) { - ret = run_action_convert(ctx, obj, tag, tag_len, ctx->key, ctx->converted_type); - } + /* the operation on top of the data type is unsupported */ + if (obj->variant->type != CFL_VARIANT_KVLIST) { + cfl_object_destroy(obj); + return -1; + } - if (ret != 0) { - return FLB_PROCESSOR_FAILURE; + /* process the action */ + if (act->type == CM_ACTION_INSERT) { + ret = run_action_insert(ctx, obj, tag, tag_len, act->key, act->value); + } + else if (act->type == CM_ACTION_UPSERT) { + ret = run_action_upsert(ctx, obj, tag, tag_len, act->key, act->value); + } + else if (act->type == CM_ACTION_DELETE) { + ret = run_action_delete(ctx, obj, tag, tag_len, act->key); + } + else if (act->type == CM_ACTION_RENAME) { + ret = run_action_rename(ctx, obj, tag, tag_len, act->key, act->value); + } + else if (act->type == CM_ACTION_HASH) { + ret = run_action_hash(ctx, obj, tag, tag_len, act->key); + } + else if (act->type == CM_ACTION_EXTRACT) { + ret = run_action_extract(ctx, obj, tag, tag_len, act->key, act->pattern); + } + else if (act->type == CM_ACTION_CONVERT) { + ret = run_action_convert(ctx, obj, tag, tag_len, act->key, act->converted_type); + } + + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } } } diff --git a/plugins/processor_content_modifier/cm_traces.c b/plugins/processor_content_modifier/cm_traces.c index 24783ba37fc..b97189845a1 100644 --- a/plugins/processor_content_modifier/cm_traces.c +++ b/plugins/processor_content_modifier/cm_traces.c @@ -666,32 +666,38 @@ int cm_traces_process(struct flb_processor_instance *ins, const char *tag, int tag_len) { int ret = -1; + struct mk_list *head; + struct content_modifier_action *act; - /* process the action */ - if (ctx->action_type == CM_ACTION_INSERT) { - ret = traces_insert_attributes(ctx, traces_context, ctx->key, ctx->value); - } - else if (ctx->action_type == CM_ACTION_UPSERT) { - ret = traces_upsert_attributes(ctx, traces_context, ctx->key, ctx->value); - } - else if (ctx->action_type == CM_ACTION_DELETE) { - ret = traces_delete_attributes(ctx, traces_context, ctx->key); - } - else if (ctx->action_type == CM_ACTION_RENAME) { - ret = traces_rename_attributes(ctx, traces_context, ctx->key, ctx->value); - } - else if (ctx->action_type == CM_ACTION_HASH) { - ret = traces_hash_attributes(ctx, traces_context, ctx->key); - } - else if (ctx->action_type == CM_ACTION_EXTRACT) { - ret = traces_extract_attributes(ctx, traces_context, ctx->key, ctx->regex); - } - else if (ctx->action_type == CM_ACTION_CONVERT) { - ret = traces_convert_attributes(ctx, traces_context, ctx->key, ctx->converted_type); - } + mk_list_foreach(head, &ctx->actions) { + act = mk_list_entry(head, struct content_modifier_action, _head); - if (ret != 0) { - return FLB_PROCESSOR_FAILURE; + /* process the action */ + if (act->type == CM_ACTION_INSERT) { + ret = traces_insert_attributes(ctx, traces_context, act->key, act->value); + } + else if (act->type == CM_ACTION_UPSERT) { + ret = traces_upsert_attributes(ctx, traces_context, act->key, act->value); + } + else if (act->type == CM_ACTION_DELETE) { + ret = traces_delete_attributes(ctx, traces_context, act->key); + } + else if (act->type == CM_ACTION_RENAME) { + ret = traces_rename_attributes(ctx, traces_context, act->key, act->value); + } + else if (act->type == CM_ACTION_HASH) { + ret = traces_hash_attributes(ctx, traces_context, act->key); + } + else if (act->type == CM_ACTION_EXTRACT) { + ret = traces_extract_attributes(ctx, traces_context, act->key, act->pattern); + } + else if (act->type == CM_ACTION_CONVERT) { + ret = traces_convert_attributes(ctx, traces_context, act->key, act->converted_type); + } + + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } } return FLB_PROCESSOR_SUCCESS; diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index 6581692a422..f07aa3fc5bb 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -122,6 +122,8 @@ enum state { STATE_PLUGIN_KEY, STATE_PLUGIN_VAL, STATE_PLUGIN_VAL_LIST, + STATE_PLUGIN_VAL_LIST_MAP_KEY, + STATE_PLUGIN_VAL_LIST_MAP_VAL, STATE_GROUP_KEY, STATE_GROUP_VAL, @@ -181,6 +183,7 @@ static struct parser_state *state_push_key(struct local_ctx *, enum state, const char *key); static int state_create_section(struct flb_cf *, struct parser_state *, char *); static int state_create_group(struct flb_cf *, struct parser_state *, char *); +static struct cfl_kvlist *state_grab_keyvals(struct parser_state *state); static struct parser_state *state_pop(struct local_ctx *ctx); static struct parser_state *state_create(struct file_state *parent, struct file_state *file); static void state_destroy(struct parser_state *s); @@ -228,6 +231,10 @@ static char *state_str(enum state val) return "plugin-value"; case STATE_PLUGIN_VAL_LIST: return "plugin-values"; + case STATE_PLUGIN_VAL_LIST_MAP_KEY: + return "plugin-values-map-key"; + case STATE_PLUGIN_VAL_LIST_MAP_VAL: + return "plugin-values-map-val"; case STATE_GROUP_KEY: return "group-key"; case STATE_GROUP_VAL: @@ -654,31 +661,18 @@ static enum status state_copy_into_config_group(struct parser_state *state, stru return YAML_FAILURE; } - for (idx = 0; idx < kvp->val->data.as_array->entry_count; idx++) { - var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx); + // for (idx = 0; idx < kvp->val->data.as_array->entry_count; idx++) { + while(kvp->val->data.as_array->entry_count > 0) { + var = cfl_array_fetch_by_index(kvp->val->data.as_array, 0); if (var == NULL) { cfl_array_destroy(arr); flb_error("unable to fetch from array by index"); return YAML_FAILURE; } - switch (var->type) { - case CFL_VARIANT_STRING: - - if (cfl_array_append_string(carr, var->data.as_string) < 0) { - flb_error("unable to append string"); - cfl_kvlist_destroy(copy); - cfl_array_destroy(carr); - return YAML_FAILURE; - } - break; - default: - cfl_array_destroy(arr); - flb_error("unable to copy value for property"); - cfl_kvlist_destroy(copy); - cfl_array_destroy(carr); - return YAML_FAILURE; - } + cfl_array_append(carr, var); + kvp->val->data.as_array->entries[0] = NULL; + cfl_array_remove_by_index(kvp->val->data.as_array, 0); } if (cfl_kvlist_insert_array(copy, kvp->key, carr) < 0) { @@ -771,6 +765,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, int ret; char *value; struct flb_kv *keyval; + struct cfl_kvlist *kvlist; char *last_included; last_included = state_get_last(ctx); @@ -1509,6 +1504,116 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } break; + case YAML_MAPPING_START_EVENT: + state = state_push_withvals(ctx, state, STATE_PLUGIN_VAL_LIST_MAP_KEY); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_PLUGIN_VAL_LIST_MAP_KEY: + switch(event->type) { + case YAML_MAPPING_END_EVENT: + kvlist = state->keyvals; + if (kvlist == NULL) { + flb_error("no map values"); + return YAML_FAILURE; + } + + state = state_pop(ctx); + + if (cfl_array_append_kvlist(state->values, kvlist) < 0) { + flb_error("unable to add list to values"); + return YAML_FAILURE; + } + + flb_error("[keyval] stuff %p into %p", kvlist, state->values); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + case YAML_SCALAR_EVENT: + if (state->keyvals == NULL) { + flb_error("unable to add key to list map"); + return YAML_FAILURE; + } + + state = state_push_key(ctx, STATE_PLUGIN_VAL_LIST_MAP_VAL, + (char *)event->data.scalar.value); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_PLUGIN_VAL_LIST_MAP_VAL: + switch(event->type) { + /* + case YAML_MAPPING_END_EVENT: + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_PLUGIN_VAL_LIST_MAP_KEY) { + flb_error("incorrect previous state"); + return YAML_FAILURE; + } + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_PLUGIN_VAL_LIST) { + flb_error("incorrect previous state"); + return YAML_FAILURE; + } + + break; + */ + case YAML_SCALAR_EVENT: + if (state->keyvals == NULL) { + flb_error("unable to add value to list map"); + return YAML_FAILURE; + } + + /* register key/value pair as a property */ + ret = cfl_kvlist_insert_string(state->keyvals, state->key, + (char *)event->data.scalar.value); + if (ret < 0) { + flb_error("unable to insert string"); + return YAML_FAILURE; + } + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + break; + default: yaml_error_event(ctx, state, event); return YAML_FAILURE; @@ -1845,6 +1950,23 @@ static int state_create_group(struct flb_cf *conf, struct parser_state *state, c return YAML_SUCCESS; } +static struct cfl_kvlist *state_grab_keyvals(struct parser_state *state) +{ + struct cfl_kvlist *kvlist; + + if (state->allocation_flags & HAS_KEYVALS) { + flb_info("[keyval] grab: %p", state->keyvals); + kvlist = state->keyvals; + state->allocation_flags &= ~HAS_KEYVALS; + state->keyvals = NULL; + + return kvlist; + } + + flb_error("[keyvals] no keyvals!"); + return NULL; +} + static struct parser_state *state_pop(struct local_ctx *ctx) { struct parser_state *last; @@ -1864,8 +1986,9 @@ static struct parser_state *state_pop(struct local_ctx *ctx) flb_sds_destroy(last->key); } - if (last->allocation_flags & HAS_KEYVALS) { - cfl_kvlist_destroy(last->keyvals); + if (last->keyvals && last->allocation_flags & HAS_KEYVALS) { + flb_error("[keyval] free: %p", last->keyvals); + // cfl_kvlist_destroy(last->keyvals); } state_destroy(last); diff --git a/src/flb_config.c b/src/flb_config.c index 54249fa8d33..8504e043e4a 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -801,35 +801,53 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf, if (type == FLB_CF_CUSTOM) { if (kv->val->type == CFL_VARIANT_STRING) { ret = flb_custom_set_property(ins, kv->key, kv->val->data.as_string); - } else if (kv->val->type == CFL_VARIANT_ARRAY) { + } + else if (kv->val->type == CFL_VARIANT_ARRAY) { for (i = 0; i < kv->val->data.as_array->entry_count; i++) { val = kv->val->data.as_array->entries[i]; ret = flb_custom_set_property(ins, kv->key, val->data.as_string); } } + else { + flb_error("[config] unable to set type: %d", kv->val->type); + ret = -1; + break; + } } else if (type == FLB_CF_INPUT) { - if (kv->val->type == CFL_VARIANT_STRING) { + if (kv->val->type == CFL_VARIANT_STRING) { ret = flb_input_set_property(ins, kv->key, kv->val->data.as_string); - } else if (kv->val->type == CFL_VARIANT_ARRAY) { + } + else if (kv->val->type == CFL_VARIANT_ARRAY) { for (i = 0; i < kv->val->data.as_array->entry_count; i++) { val = kv->val->data.as_array->entries[i]; ret = flb_input_set_property(ins, kv->key, val->data.as_string); } } + else { + flb_error("[config] unable to set type: %d", kv->val->type); + ret = -1; + break; + } } else if (type == FLB_CF_FILTER) { - if (kv->val->type == CFL_VARIANT_STRING) { + if (kv->val->type == CFL_VARIANT_STRING) { ret = flb_filter_set_property(ins, kv->key, kv->val->data.as_string); - } else if (kv->val->type == CFL_VARIANT_ARRAY) { + } + else if (kv->val->type == CFL_VARIANT_ARRAY) { for (i = 0; i < kv->val->data.as_array->entry_count; i++) { val = kv->val->data.as_array->entries[i]; ret = flb_filter_set_property(ins, kv->key, val->data.as_string); } } + else { + flb_error("[config] unable to set type: %d", kv->val->type); + ret = -1; + break; + } } else if (type == FLB_CF_OUTPUT) { - if (kv->val->type == CFL_VARIANT_STRING) { + if (kv->val->type == CFL_VARIANT_STRING) { ret = flb_output_set_property(ins, kv->key, kv->val->data.as_string); } else if (kv->val->type == CFL_VARIANT_ARRAY) { for (i = 0; i < kv->val->data.as_array->entry_count; i++) { @@ -837,6 +855,11 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf, ret = flb_output_set_property(ins, kv->key, val->data.as_string); } } + else { + flb_error("[config] unable to set type: %d", kv->val->type); + ret = -1; + break; + } } if (ret == -1) { diff --git a/src/flb_config_map.c b/src/flb_config_map.c index 463f542889c..9ae095bccbd 100644 --- a/src/flb_config_map.c +++ b/src/flb_config_map.c @@ -810,6 +810,311 @@ int flb_config_map_set(struct mk_list *properties, struct mk_list *map, void *co m_list = (struct mk_list **) (base + m->offset); *m_list = m->value.val.list; } + else { + flb_error("[config map#2] unknown type: %d", m->type); + return -1; + } + } + } + + return 0; +} + +/* + * Function used by plugins that needs to populate their context structure with the + * configuration properties already mapped. + */ +int flb_config_map_set_from_kvlist(struct cfl_kvlist *properties, struct mk_list *map, void *context) +{ + int ret; + int len; + char *base; + char *m_bool; + int *m_i_num; + double *m_d_num; + size_t *m_s_num; + flb_sds_t *m_str; + struct mk_list *head; + struct mk_list *m_head; + struct mk_list **m_list; + struct mk_list *list; + struct cfl_array **m_array; + struct cfl_kvlist **m_kvlist; + struct flb_config_map *m = NULL; + struct flb_config_map_val *entry = NULL; + struct cfl_list *chead = NULL; + struct cfl_kvpair *pair = NULL; + + base = context; + + /* Link 'already processed default values' into the caller context */ + mk_list_foreach(m_head, map) { + m = mk_list_entry(m_head, struct flb_config_map, _head); + + /* + * If the map type allows multiple entries, the user context is a pointer + * for a linked list. We just point their structure to our pre-processed + * list of entries. + */ + if (m->flags & FLB_CONFIG_MAP_MULT && m->set_property == FLB_TRUE) { + m_list = (struct mk_list **) (base + m->offset); + *m_list = m->value.mult; + continue; + } + + /* + * If no default value exists or the map will not write to the user + * context.. skip it. + */ + if (!m->def_value || m->set_property == FLB_FALSE) { + continue; + } + + /* + * If a property set by the user will override the default value, just + * do not put the default value into the context since it will be replaced + * later. + */ + // ret = properties_override_default(properties, m->name); + // if (ret == FLB_TRUE) { + // continue; + // } + + /* All the following steps are direct writes to the user context */ + if (m->type == FLB_CONFIG_MAP_STR) { + m_str = (char **) (base + m->offset); + *m_str = m->value.val.str; + } + else if (m->type == FLB_CONFIG_MAP_INT) { + m_i_num = (int *) (base + m->offset); + *m_i_num = m->value.val.i_num; + } + else if (m->type == FLB_CONFIG_MAP_DOUBLE) { + m_d_num = (double *) (base + m->offset); + *m_d_num = m->value.val.d_num; + } + else if (m->type == FLB_CONFIG_MAP_SIZE) { + m_s_num = (size_t *) (base + m->offset); + *m_s_num = m->value.val.s_num; + } + else if (m->type == FLB_CONFIG_MAP_TIME) { + m_i_num = (int *) (base + m->offset); + *m_i_num = m->value.val.s_num; + } + else if (m->type == FLB_CONFIG_MAP_BOOL) { + m_bool = (char *) (base + m->offset); + *m_bool = m->value.val.boolean; + } + else if (m->type >= FLB_CONFIG_MAP_CLIST || + m->type <= FLB_CONFIG_MAP_SLIST_4) { + m_list = (struct mk_list **) (base + m->offset); + *m_list = m->value.val.list; + } + else { + flb_error("[config map#0] unknown type: %d", m->type); + return -1; + } + } + + /* + * Iterate all properties coming from the configuration reader. If a property overrides + * a default value already set in the previous step, just link to the new value. + */ + cfl_list_foreach(chead, &properties->list) { + pair = cfl_list_entry(chead, struct cfl_kvpair, _head); + + if (pair == NULL || pair->key == NULL || pair->val == NULL) { + continue; + } + + mk_list_foreach(m_head, map) { + m = mk_list_entry(m_head, struct flb_config_map, _head); + if (flb_sds_len(pair->key) != flb_sds_len(m->name)) { + m = NULL; + continue; + } + + if (strncasecmp(pair->key, m->name, flb_sds_len(m->name)) == 0) { + break; + } + m = NULL; + continue; + } + + if (!m || m->set_property == FLB_FALSE) { + continue; + } + + /* Check if the map allows multiple entries */ + if (m->flags & FLB_CONFIG_MAP_MULT) { + /* Create node */ + entry = flb_calloc(1, sizeof(struct flb_config_map_val)); + if (!entry) { + flb_errno(); + return -1; + } + + entry->type = m->type; + + /* Populate value */ + if (m->type == FLB_CONFIG_MAP_STR) { + if (pair->val->type != CFL_VARIANT_STRING) { + flb_error("[config map] non-matching type: %s", pair->key); + return -1; + } + entry->val.str = flb_sds_create(pair->val->data.as_string); + } + else if (m->type == FLB_CONFIG_MAP_INT) { + switch (pair->val->type) { + case CFL_VARIANT_STRING: + entry->val.i_num = atoi(pair->val->data.as_string); + break; + case CFL_VARIANT_INT: + entry->val.i_num = pair->val->data.as_int64; + break; + default: + flb_error("[config map] non-matching type: %s", pair->key); + return -1; + } + } + else if (m->type == FLB_CONFIG_MAP_DOUBLE) { + switch (pair->val->type) { + case CFL_VARIANT_STRING: + entry->val.i_num = atoi(pair->val->data.as_string); + break; + case CFL_VARIANT_INT: + entry->val.d_num = (double)pair->val->data.as_int64; + break; + case CFL_VARIANT_DOUBLE: + entry->val.d_num = pair->val->data.as_double; + break; + default: + flb_error("[config map] non-matching type: %s", pair->key); + return -1; + } + } + else if (m->type == FLB_CONFIG_MAP_SIZE) { + entry->val.s_num = flb_utils_size_to_bytes(pair->val->data.as_string); + } + else if (m->type == FLB_CONFIG_MAP_TIME) { + entry->val.i_num = flb_utils_time_to_seconds(pair->val->data.as_string); + } + else if (m->type == FLB_CONFIG_MAP_BOOL) { + ret = flb_utils_bool(pair->val->data.as_string); + if (ret == -1) { + flb_free(entry); + flb_error("[config map] invalid value for boolean property '%s=%s'", + m->name, pair->val->data.as_string); + return -1; + } + entry->val.boolean = ret; + } + else if (m->type >= FLB_CONFIG_MAP_CLIST && + m->type <= FLB_CONFIG_MAP_SLIST_4) { + + list = parse_string_map_to_list(m, pair->val->data.as_string); + if (!list) { + flb_error("[config map] cannot parse list of values '%s'", + pair->val->data.as_string); + flb_free(entry); + return -1; + } + entry->val.list = list; + + /* Validate the number of entries are the minimum expected */ + len = mk_list_size(list); + ret = check_list_size(list, m->type); + if (ret == -1) { + flb_error("[config map] property '%s' expects %i values " + "(only %i were found)", + pair->val->data.as_string, + flb_config_map_expected_values(m->type), len); + /* + * Register the entry anyways, so on exit the resources will + * be released + */ + mk_list_add(&entry->_head, m->value.mult); + return -1; + } + } + else { + flb_error("[config map#1] unknown type: %d", m->type); + return -1; + } + + /* Add entry to the map 'mult' list tail */ + mk_list_add(&entry->_head, m->value.mult); + + /* Override user context */ + m_list = (struct mk_list **) (base + m->offset); + *m_list = m->value.mult; + } + else if (map != NULL) { + /* Direct write to user context */ + if (m->type == FLB_CONFIG_MAP_STR) { + if (pair->val->type != CFL_VARIANT_STRING) { + flb_error("[config map] unmatched type: %s", pair->key); + return -1; + } + m_str = (char **) (base + m->offset); + *m_str = pair->val->data.as_string; + } + else if (m->type == FLB_CONFIG_MAP_INT) { + m_i_num = (int *) (base + m->offset); + *m_i_num = atoi(pair->val->data.as_string); + } + else if (m->type == FLB_CONFIG_MAP_DOUBLE) { + m_d_num = (double *) (base + m->offset); + *m_d_num = atof(pair->val->data.as_string); + } + else if (m->type == FLB_CONFIG_MAP_BOOL) { + m_bool = (char *) (base + m->offset); + ret = flb_utils_bool(pair->val->data.as_string); + if (ret == -1) { + flb_error("[config map] invalid value for boolean property '%s=%s'", + m->name, pair->val->data.as_string); + return -1; + } + *m_bool = ret; + } + else if (m->type == FLB_CONFIG_MAP_SIZE) { + m_s_num = (size_t *) (base + m->offset); + *m_s_num = flb_utils_size_to_bytes(pair->val->data.as_string); + } + else if (m->type == FLB_CONFIG_MAP_TIME) { + m_i_num = (int *) (base + m->offset); + *m_i_num = flb_utils_time_to_seconds(pair->val->data.as_string); + } + else if (m->type >= FLB_CONFIG_MAP_CLIST && + m->type <= FLB_CONFIG_MAP_SLIST_4) { + list = parse_string_map_to_list(m, pair->val->data.as_string); + if (!list) { + flb_error("[config map] cannot parse list of values '%s'", pair->val->data.as_string); + flb_free(entry); + return -1; + } + + if (m->value.val.list) { + destroy_map_val(m->type, &m->value); + } + + m->value.val.list = list; + m_list = (struct mk_list **) (base + m->offset); + *m_list = m->value.val.list; + } + else if (m->type == FLB_CONFIG_MAP_KVLIST) { + m_kvlist = (struct cfl_kvlist **) (base + m->offset); + *m_kvlist = m->value.val.kvlist; + } + else if (m->type == FLB_CONFIG_MAP_ARRAY) { + m->value.val.array = pair->val->data.as_array; + m_array = (struct cfl_array **) (base + m->offset); + *m_array = m->value.val.array; + } + else { + flb_error("[config map#2] unknown type: %d", m->type); + return -1; + } } } diff --git a/src/flb_processor.c b/src/flb_processor.c index a7a6e2b20d9..f9e7e3edb12 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -300,7 +300,7 @@ int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k return flb_processor_instance_set_property( (struct flb_processor_instance *) pu->ctx, - k, v->data.as_string); + k, v); } void flb_processor_unit_destroy(struct flb_processor_unit *pu) @@ -840,13 +840,6 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str return 0; } - - - - - - - static inline int prop_key_check(const char *key, const char *kv, int k_len) { int len; @@ -861,18 +854,18 @@ static inline int prop_key_check(const char *key, const char *kv, int k_len) } int flb_processor_instance_set_property(struct flb_processor_instance *ins, - const char *k, const char *v) + const char *k, struct cfl_variant *v) { int len; int ret; - flb_sds_t tmp; - struct flb_kv *kv; + flb_sds_t tmp = NULL; len = strlen(k); - tmp = flb_env_var_translate(ins->config->env, v); - - if (!tmp) { - return -1; + if (v->type == CFL_VARIANT_STRING) { + tmp = flb_env_var_translate(ins->config->env, v->data.as_string); + if (!tmp) { + return -1; + } } if (prop_key_check("alias", k, len) == 0 && tmp) { @@ -888,30 +881,37 @@ int flb_processor_instance_set_property(struct flb_processor_instance *ins, ins->log_level = ret; } else { - /* - * Create the property, we don't pass the value since we will - * map it directly to avoid an extra memory allocation. - */ - kv = flb_kv_item_create(&ins->properties, (char *) k, NULL); - - if (!kv) { - - if (tmp) { - flb_sds_destroy(tmp); - } - return -1; + if (v->type == CFL_VARIANT_STRING) { + cfl_kvlist_insert_string(ins->properties, k, tmp); + } + else { + cfl_kvlist_insert(ins->properties, k, v); } - kv->val = tmp; } return 0; } +int flb_processor_instance_set_property_variant(struct flb_processor_instance *ins, + struct cfl_kvpair *pair) +{ + return cfl_kvlist_insert(ins->properties, pair->key, pair->val); +} + const char *flb_processor_instance_get_property( const char *key, struct flb_processor_instance *ins) { - return flb_kv_get_key_value(key, &ins->properties); + struct cfl_variant *var; + + var = cfl_kvlist_fetch(ins->properties, key); + if (var == NULL) { + return NULL; + } + if (var->type == CFL_VARIANT_STRING) { + return var->data.as_string; + } + return NULL; } struct flb_processor_instance *flb_processor_instance_create(struct flb_config *config, @@ -961,7 +961,7 @@ struct flb_processor_instance *flb_processor_instance_create(struct flb_config * instance->data = data; instance->log_level = -1; - mk_list_init(&instance->properties); + instance->properties = cfl_kvlist_create(); instance->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); if (instance->log_encoder == NULL) { @@ -1005,7 +1005,6 @@ int flb_processor_instance_check_properties( struct flb_processor_instance *ins, struct flb_config *config) { - int ret = 0; struct mk_list *config_map; struct flb_processor_plugin *p = ins->p; @@ -1024,18 +1023,18 @@ int flb_processor_instance_check_properties( ins->config_map = config_map; /* Validate incoming properties against config map */ - ret = flb_config_map_properties_check(ins->p->name, - &ins->properties, - ins->config_map); - - if (ret == -1) { - - if (config->program_name) { - flb_helper("try the command: %s -F %s -h\n", - config->program_name, ins->p->name); - } - return -1; - } + // ret = flb_config_map_properties_check_kvlist(ins->p->name, + // ins->properties, + // ins->config_map); + + // if (ret == -1) { + + // if (config->program_name) { + // flb_helper("try the command: %s -F %s -h\n", + // config->program_name, ins->p->name); + // } + // return -1; + // } } return 0; @@ -1113,7 +1112,7 @@ void flb_processor_instance_destroy(struct flb_processor_instance *ins) } /* release properties */ - flb_kv_release(&ins->properties); + cfl_kvlist_destroy(ins->properties); /* Remove metrics */ #ifdef FLB_HAVE_METRICS