diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index 20d4a605e16..591604fbed4 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -207,7 +207,7 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, int event_type, char *unit_name); void flb_processor_unit_destroy(struct flb_processor_unit *pu); -int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, const char *v); +int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, struct cfl_variant *v); int flb_processors_load_from_config_format_group(struct flb_processor *proc, struct flb_cf_group *g); diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index 172d700d8e8..f8807704644 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -56,9 +56,48 @@ enum section { SECTION_INPUT, SECTION_FILTER, SECTION_OUTPUT, + SECTION_PROCESSOR, SECTION_OTHER, }; +static char *section_names[] = { + "env", + "include", + "service", + "pipeline", + "custom", + "input", + "filter", + "output", + "processor", + "other" +}; + +struct file_state { + /* file */ + flb_sds_t name; /* file name */ + flb_sds_t path; /* file root path */ + + /* parent file state */ + struct file_state *parent; +}; + +struct local_ctx { + int level; /* inclusion level */ + + struct cfl_list states; + + struct mk_list includes; + + int service_set; +}; + +/* yaml_* functions return 1 on success and 0 on failure. */ +enum status { + YAML_SUCCESS = 1, + YAML_FAILURE = 0 +}; + enum state { STATE_START, /* start state */ STATE_STREAM, /* start/end stream */ @@ -72,19 +111,14 @@ enum state { STATE_INCLUDE, /* 'includes' section */ STATE_OTHER, /* any other unknown section */ + STATE_CUSTOM, /* custom plugins */ STATE_PIPELINE, /* pipeline groups customs inputs, filters and outputs */ STATE_PLUGIN_INPUT, /* input plugins section */ STATE_PLUGIN_FILTER, /* filter plugins section */ STATE_PLUGIN_OUTPUT, /* output plugins section */ - STATE_CUSTOM, /* custom plugins */ - STATE_CUSTOM_SEQUENCE, - STATE_CUSTOM_KEY_VALUE_PAIR, - STATE_CUSTOM_KEY, - STATE_CUSTOM_VAL, - - STATE_PLUGIN_TYPE, + STATE_PLUGIN_START, STATE_PLUGIN_KEY, STATE_PLUGIN_VAL, STATE_PLUGIN_VAL_LIST, @@ -92,15 +126,8 @@ enum state { STATE_GROUP_KEY, STATE_GROUP_VAL, + STATE_INPUT_PROCESSORS, STATE_INPUT_PROCESSOR, - STATE_INPUT_PROCESSOR_LOGS_KEY, - STATE_INPUT_PROCESSOR_LOGS_VAL, - - STATE_INPUT_PROCESSOR_METRICS_KEY, - STATE_INPUT_PROCESSOR_METRICS_VAL, - - STATE_INPUT_PROCESSOR_TRACES_KEY, - STATE_INPUT_PROCESSOR_TRACES_VAL, /* environment variables */ STATE_ENV, @@ -109,71 +136,135 @@ enum state { STATE_STOP /* end state */ }; +/* parser state allocation flags */ +#define HAS_KEY (1 << 0) +#define HAS_KEYVALS (1 << 1) + struct parser_state { /* tokens state */ enum state state; + /* nesting level */ + int level; /* active section (if any) */ enum section section; - /* temporary key value pair */ - flb_sds_t key; - flb_sds_t val; - /* active section */ struct flb_cf_section *cf_section; - struct cfl_array *values; /* pointer to current values in a list. */ - /* active group */ struct flb_cf_group *cf_group; - /* active processor group: logs, metrics or traces */ - struct cfl_kvlist *cf_processor_kv; - struct cfl_array *cf_processor_type_array; - struct cfl_kvlist *cf_processor_type_list; + /* key value */ + flb_sds_t key; + /* section key/value list */ + struct cfl_kvlist *keyvals; + /* pointer to current values in a list. */ + struct cfl_array *values; + /* are we the owner of the values? */ + int allocation_flags; - /* file */ - flb_sds_t file; /* file name */ - flb_sds_t root_path; /* file root path */ + struct file_state *file; - /* caller file */ - flb_sds_t caller_file; /* caller file name */ - flb_sds_t caller_root_path; /* caller file root path */ + struct cfl_list _head; }; -struct local_ctx { - int level; /* inclusion level */ - - struct mk_list includes; - - int service_set; -}; +static struct parser_state *state_push(struct local_ctx *, enum state); +static struct parser_state *state_push_withvals(struct local_ctx *, + struct parser_state *, + enum state); +static struct parser_state *state_push_witharr(struct local_ctx *, + struct parser_state *, + enum state); +static struct parser_state *state_push_section(struct local_ctx *, enum state, + enum section); +static struct parser_state *state_push_key(struct local_ctx *, enum state, + const char *key); +static int state_create_section(struct flb_cf *, struct parser_state *, char *); +static int state_create_group(struct flb_cf *, struct parser_state *, char *); +static struct parser_state *state_pop(struct local_ctx *ctx); +static struct parser_state *state_create(struct file_state *parent, struct file_state *file); +static void state_destroy(struct parser_state *s); -/* yaml_* functions return 1 on success and 0 on failure. */ -enum status { - YAML_SUCCESS = 1, - YAML_FAILURE = 0 -}; static int read_config(struct flb_cf *cf, struct local_ctx *ctx, - char *caller_file, char *cfg_file); + struct file_state *parent, char *cfg_file); + +static char *state_str(enum state val) +{ + switch (val) { + case STATE_START: + return "start"; + case STATE_STREAM: + return "stream"; + case STATE_DOCUMENT: + return "document"; + case STATE_SECTION: + return "section"; + case STATE_SECTION_KEY: + return "section-key"; + case STATE_SECTION_VAL: + return "section-value"; + case STATE_SERVICE: + return "service"; + case STATE_INCLUDE: + return "include"; + case STATE_OTHER: + return "other"; + case STATE_CUSTOM: + return "custom"; + case STATE_PIPELINE: + return "pipeline"; + case STATE_PLUGIN_INPUT: + return "input"; + case STATE_PLUGIN_FILTER: + return "filter"; + case STATE_PLUGIN_OUTPUT: + return "output"; + case STATE_PLUGIN_START: + return "plugin-start"; + case STATE_PLUGIN_KEY: + return "plugin-key"; + case STATE_PLUGIN_VAL: + return "plugin-value"; + case STATE_PLUGIN_VAL_LIST: + return "plugin-values"; + case STATE_GROUP_KEY: + return "group-key"; + case STATE_GROUP_VAL: + return "group-val"; + case STATE_INPUT_PROCESSORS: + return "processors"; + case STATE_INPUT_PROCESSOR: + return "processor"; + case STATE_ENV: + return "env"; + case STATE_STOP: + return "stop"; + default: + return "unknown"; + } +} -static int add_section_type(struct flb_cf *cf, struct parser_state *s) +static int add_section_type(struct flb_cf *conf, struct parser_state *state) { - if (s->section == SECTION_INPUT) { - s->cf_section = flb_cf_section_create(cf, "INPUT", 0); + if (conf == NULL || state == NULL) { + return -1; } - else if (s->section == SECTION_FILTER) { - s->cf_section = flb_cf_section_create(cf, "FILTER", 0); + + if (state->section == SECTION_INPUT) { + state->cf_section = flb_cf_section_create(conf, "INPUT", 0); + } + else if (state->section == SECTION_FILTER) { + state->cf_section = flb_cf_section_create(conf, "FILTER", 0); } - else if (s->section == SECTION_OUTPUT) { - s->cf_section = flb_cf_section_create(cf, "OUTPUT", 0); + else if (state->section == SECTION_OUTPUT) { + state->cf_section = flb_cf_section_create(conf, "OUTPUT", 0); } - else if (s->section == SECTION_CUSTOM) { - s->cf_section = flb_cf_section_create(cf, "customs", 0); + else if (state->section == SECTION_CUSTOM) { + state->cf_section = flb_cf_section_create(conf, "customs", 0); } - if (!s->cf_section) { + if (!state->cf_section) { return -1; } @@ -211,164 +302,80 @@ static char *event_type_str(yaml_event_t *event) } } -static char *state_str(enum state val) +static char *state_get_last(struct local_ctx *ctx) { - char* ret; - switch(val) { - case STATE_START: - ret = "start"; - break; - case STATE_STREAM: - ret = "stream"; - break; - case STATE_DOCUMENT: - ret = "document"; - break; - case STATE_SECTION: - ret = "section"; - break; - case STATE_SECTION_KEY: - ret = "section-key"; - break; - case STATE_SECTION_VAL: - ret = "section-val"; - break; - case STATE_SERVICE: - ret = "service"; - break; - case STATE_INCLUDE: - ret = "include"; - break; - case STATE_OTHER: - ret = "other"; - break; - case STATE_PIPELINE: - ret = "pipeline"; - break; - case STATE_PLUGIN_INPUT: - ret = "plugin-input"; - break; - case STATE_PLUGIN_FILTER: - ret = "plugin-filter"; - break; - case STATE_PLUGIN_OUTPUT: - ret = "plugin-output"; - break; - case STATE_CUSTOM: - ret = "custom"; - break; - case STATE_CUSTOM_SEQUENCE: - ret = "custom-sequence"; - break; - case STATE_CUSTOM_KEY_VALUE_PAIR: - ret = "custom-key-value-pair"; - break; - case STATE_CUSTOM_KEY: - ret = "custom-key"; - break; - case STATE_CUSTOM_VAL: - ret = "custom-val"; - break; - case STATE_PLUGIN_TYPE: - ret = "plugin-type"; - break; - case STATE_PLUGIN_KEY: - ret = "plugin-key"; - break; - case STATE_PLUGIN_VAL: - ret = "plugin-val"; - break; - case STATE_PLUGIN_VAL_LIST: - ret = "plugin-val-list"; - break; - case STATE_GROUP_KEY: - ret = "group-key"; - break; - case STATE_GROUP_VAL: - ret = "group-val"; - break; - case STATE_INPUT_PROCESSOR: - ret = "input-processor"; - break; - case STATE_INPUT_PROCESSOR_LOGS_KEY: - ret = "input-processor-logs-key"; - break; - case STATE_INPUT_PROCESSOR_LOGS_VAL: - ret = "input-processor-logs-val"; - break; - case STATE_INPUT_PROCESSOR_METRICS_KEY: - ret = "input-processor-metrics-key"; - break; - case STATE_INPUT_PROCESSOR_METRICS_VAL: - ret = "input-processor-metrics-val"; - break; - case STATE_INPUT_PROCESSOR_TRACES_KEY: - ret = "input-processor-traces-key"; - break; - case STATE_INPUT_PROCESSOR_TRACES_VAL: - ret = "input-processor-traces-val"; - break; - case STATE_ENV: - ret = "env"; - break; - case STATE_STOP: - ret = "stop"; - break; + struct flb_slist_entry *entry; - default: - ret = "UNKNOWN"; + entry = mk_list_entry_last(&ctx->includes, struct flb_slist_entry, _head); + + if (entry == NULL) { + return NULL; } - return ret; + return entry->str; } -static char *get_last_included_file(struct local_ctx *ctx) +static void yaml_error_event(struct local_ctx *ctx, struct parser_state *state, + yaml_event_t *event) { - struct flb_slist_entry *e; + struct flb_slist_entry *entry; - e = mk_list_entry_last(&ctx->includes, struct flb_slist_entry, _head); - return e->str; -} + if (event == NULL) { + flb_error("[config] YAML error found but with no state or event"); + return; + } -static void yaml_error_event(struct local_ctx *ctx, struct parser_state *s, - yaml_event_t *event) -{ - struct flb_slist_entry *e; + if (state == NULL) { + flb_error("[config] YAML error found but with no state, line %zu, column %zu: " + "unexpected event '%s' (%d).", + event->start_mark.line + 1, event->start_mark.column, + event_type_str(event), event->type); + return; + } - e = mk_list_entry_last(&ctx->includes, struct flb_slist_entry, _head); + entry = mk_list_entry_last(&ctx->includes, struct flb_slist_entry, _head); + + if (entry == NULL) { + flb_error("[config] YAML error found (no file info), line %zu, column %zu: " + "unexpected event '%s' (%d) in state '%s' (%d).", + event->start_mark.line + 1, event->start_mark.column, + event_type_str(event), event->type, state_str(state->state), state->state); + return; + } flb_error("[config] YAML error found in file \"%s\", line %zu, column %zu: " "unexpected event '%s' (%d) in state '%s' (%d).", - e->str, event->start_mark.line + 1, event->start_mark.column, - event_type_str(event), event->type, state_str(s->state), s->state); + entry->str, event->start_mark.line + 1, event->start_mark.column, + event_type_str(event), event->type, state_str(state->state), state->state); } -static void yaml_error_definition(struct local_ctx *ctx, struct parser_state *s, +static void yaml_error_definition(struct local_ctx *ctx, struct parser_state *state, yaml_event_t *event, char *value) { flb_error("[config] YAML error found in file \"%s\", line %zu, column %zu: " "duplicated definition of '%s'", - s->file, event->start_mark.line + 1, event->start_mark.column, + state->file->name, event->start_mark.line + 1, event->start_mark.column, value); } -static void yaml_error_plugin_category(struct local_ctx *ctx, struct parser_state *s, +static void yaml_error_plugin_category(struct local_ctx *ctx, struct parser_state *state, yaml_event_t *event, char *value) { flb_error("[config] YAML error found in file \"%s\", line %zu, column %zu: " "the pipeline component '%s' is not valid. Try one of these values: " "customs, inputs, filters or outputs.", - s->file, event->start_mark.line + 1, event->start_mark.column, + state->file->name, event->start_mark.line + 1, event->start_mark.column, value); } static int is_file_included(struct local_ctx *ctx, const char *path) { struct mk_list *head; - struct flb_slist_entry *e; + struct flb_slist_entry *entry; mk_list_foreach(head, &ctx->includes) { - e = mk_list_entry(head, struct flb_slist_entry, _head); - if (strcmp(e->str, path) == 0) { + entry = mk_list_entry(head, struct flb_slist_entry, _head); + + if (strcmp(entry->str, path) == 0) { return FLB_TRUE; } } @@ -377,19 +384,23 @@ static int is_file_included(struct local_ctx *ctx, const char *path) } #ifndef _WIN32 -static int read_glob(struct flb_cf *cf, struct local_ctx *ctx, +static int read_glob(struct flb_cf *conf, struct local_ctx *ctx, struct parser_state *state, const char *path) { int ret = -1; glob_t glb; - char tmp[PATH_MAX]; + char tmp[PATH_MAX+1]; const char *glb_path; - size_t i; + size_t idx; int ret_glb = -1; - if (state->root_path && path[0] != '/') { - snprintf(tmp, PATH_MAX, "%s/%s", state->root_path, path); + if (state->file->path && path[0] != '/') { + ret = snprintf(tmp, PATH_MAX, "%s/%s", state->file->path, path); + + if (ret > PATH_MAX) { + return -1; + } glb_path = tmp; } else { @@ -397,6 +408,7 @@ static int read_glob(struct flb_cf *cf, struct local_ctx *ctx, } ret_glb = glob(glb_path, GLOB_NOSORT, NULL, &glb); + if (ret_glb != 0) { switch(ret_glb){ case GLOB_NOSPACE: @@ -414,9 +426,10 @@ static int read_glob(struct flb_cf *cf, struct local_ctx *ctx, return ret; } - for (i = 0; i < glb.gl_pathc; i++) { - ret = read_config(cf, ctx, state->file, glb.gl_pathv[i]); - if (ret < 0) { + for (idx = 0; idx < glb.gl_pathc; idx++) { + ret = read_config(conf, ctx, state->file, glb.gl_pathv[idx]); + + if (ret < 0) { break; } } @@ -425,14 +438,29 @@ static int read_glob(struct flb_cf *cf, struct local_ctx *ctx, return ret; } #else -static int read_glob(struct flb_cf *cf, struct parser_state *ctx, const char *path) +static char *dirname(char *path) +{ + char *ptr; + + + ptr = strrchr(path, '\\'); + + if (ptr == NULL) { + return path; + } + *ptr++='\0'; + return path; +} + +static int read_glob(struct flb_cf *conf, struct local_ctx *ctx, + struct parser_state *state, const char *path) { char *star, *p0, *p1; char pattern[MAX_PATH]; char buf[MAX_PATH]; int ret; struct stat st; - HANDLE h; + HANDLE hnd; WIN32_FIND_DATA data; if (strlen(path) > MAX_PATH - 1) { @@ -440,6 +468,7 @@ static int read_glob(struct flb_cf *cf, struct parser_state *ctx, const char *pa } star = strchr(path, '*'); + if (star == NULL) { return -1; } @@ -465,8 +494,9 @@ static int read_glob(struct flb_cf *cf, struct parser_state *ctx, const char *pa memcpy(pattern, path, (p1 - path)); pattern[p1 - path] = '\0'; - h = FindFirstFileA(pattern, &data); - if (h == INVALID_HANDLE_VALUE) { + hnd = FindFirstFileA(pattern, &data); + + if (hnd == INVALID_HANDLE_VALUE) { return 0; } @@ -488,200 +518,341 @@ static int read_glob(struct flb_cf *cf, struct parser_state *ctx, const char *pa if (FAILED(StringCchCatA(buf, MAX_PATH, data.cFileName))) { continue; } + if (FAILED(StringCchCatA(buf, MAX_PATH, p1))) { continue; } if (strchr(p1, '*')) { - read_glob(cf, ctx, buf); /* recursive */ + read_glob(conf, ctx, state, buf); /* recursive */ continue; } ret = stat(buf, &st); + if (ret == 0 && (st.st_mode & S_IFMT) == S_IFREG) { - if (read_config(cf, ctx, data.cFileName, buf) < 0) { + + if (read_config(conf, ctx, state, buf) < 0) { return -1; } } - } while (FindNextFileA(h, &data) != 0); + } while (FindNextFileA(hnd, &data) != 0); - FindClose(h); + FindClose(hnd); return 0; } #endif -static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, - struct parser_state *s, yaml_event_t *event) +static void print_current_state(struct local_ctx *ctx, struct parser_state *state, + yaml_event_t *event) { - int len; - int ret; - char *value; - struct flb_kv *kv; - char *last_included = get_last_included_file(ctx); - - switch (s->state) { - case STATE_START: - switch (event->type) { - case YAML_STREAM_START_EVENT: - s->state = STATE_STREAM; - break; - default: - yaml_error_event(ctx, s, event); - return YAML_FAILURE; - } - break; + flb_error("%*s%s->%s", state->level*2, "", state_str(state->state), + event_type_str(event)); +} - case STATE_STREAM: - switch (event->type) { - case YAML_DOCUMENT_START_EVENT: - s->state = STATE_DOCUMENT; +static void print_current_properties(struct parser_state *state) +{ + struct cfl_list *head; + struct cfl_kvpair *prop; + struct cfl_variant *var; + int idx; + + flb_error("%*s[%s] PROPERTIES:", state->level*2, "", section_names[state->section]); + + cfl_list_foreach(head, &state->keyvals->list) { + prop = cfl_list_entry(head, struct cfl_kvpair, _head); + switch (prop->val->type) { + case CFL_VARIANT_STRING: + flb_error("%*s%s: %s", (state->level+2)*2, "", prop->key, prop->val->data.as_string); break; - case YAML_STREAM_END_EVENT: - s->state = STATE_STOP; /* all done */ + case CFL_VARIANT_ARRAY: + flb_error("%*s%s: [", (state->level+2)*2, "", prop->key); + for (idx = 0; idx < prop->val->data.as_array->entry_count; idx++) { + var = cfl_array_fetch_by_index(prop->val->data.as_array, idx); + flb_error("%*s%s", (state->level+3)*2, "", var->data.as_string); + } + flb_error("%*s]", (state->level+2)*2, ""); break; - default: - yaml_error_event(ctx, s, event); - return YAML_FAILURE; } - break; + } +} - case STATE_DOCUMENT: - switch (event->type) { - case YAML_MAPPING_START_EVENT: - s->state = STATE_SECTION; - break; - case YAML_DOCUMENT_END_EVENT: - s->state = STATE_STREAM; - break; - default: - yaml_error_event(ctx, s, event); +static struct parser_state *get_current_state(struct local_ctx *ctx) +{ + struct parser_state *state; + + if (cfl_list_size(&ctx->states) <= 0) { + return NULL; + } + state = cfl_list_entry_last(&ctx->states, struct parser_state, _head); + return state; +} + +static enum status state_copy_into_config_group(struct parser_state *state, struct flb_cf_group *cf_group) +{ + struct cfl_list *head; + struct cfl_kvpair *kvp; + struct cfl_variant *var; + struct cfl_variant *varr; + struct cfl_array *arr; + struct cfl_array *carr; + struct cfl_kvlist *copy; + int idx; + + if (cf_group == NULL) { + flb_error("no group for processor properties"); + return YAML_FAILURE; + } + + varr = cfl_kvlist_fetch(cf_group->properties, state->key); + + if (varr == NULL) { + arr = cfl_array_create(1); + + if (arr == NULL) { + flb_error("unable to allocate array"); return YAML_FAILURE; } - break; - /* - * 'customs' - * -------- - */ - case STATE_CUSTOM: - switch (event->type) { - case YAML_SEQUENCE_START_EVENT: - s->state = STATE_PLUGIN_TYPE; - break; + cfl_array_resizable(arr, CFL_TRUE); - case YAML_SEQUENCE_END_EVENT: - s->state = STATE_SECTION; - break; - default: - yaml_error_event(ctx, s, event); + if (cfl_kvlist_insert_array(cf_group->properties, state->key, arr) < 0) { + cfl_array_destroy(arr); + flb_error("unable to insert into array"); return YAML_FAILURE; } - break; + } + else { + arr = varr->data.as_array; + } - case STATE_CUSTOM_SEQUENCE: - switch (event->type) { - case YAML_SCALAR_EVENT: - value = (char *)event->data.scalar.value; - len = strlen(value); - if (len == 0) { - yaml_error_event(ctx, s, event); + copy = cfl_kvlist_create(); + + if (copy == NULL) { + cfl_array_destroy(arr); + flb_error("unable to allocate kvlist"); + return YAML_FAILURE; + } + + cfl_list_foreach(head, &state->keyvals->list) { + kvp = cfl_list_entry(head, struct cfl_kvpair, _head); + switch (kvp->val->type) { + case CFL_VARIANT_STRING: + + if (cfl_kvlist_insert_string(copy, kvp->key, kvp->val->data.as_string) < 0) { + flb_error("unable to allocate kvlist"); + cfl_kvlist_destroy(copy); return YAML_FAILURE; } + break; + case CFL_VARIANT_ARRAY: + carr = cfl_array_create(kvp->val->data.as_array->entry_count); - /* create the 'customs' section */ - s->cf_section = flb_cf_section_create(cf, "customs", 0); - if (!s->cf_section) { + if (carr) { + flb_error("unable to allocate array"); + cfl_kvlist_destroy(copy); return YAML_FAILURE; } + for (idx = 0; idx < kvp->val->data.as_array->entry_count; idx++) { + var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx); - /* value is the 'custom plugin name', create a section instance */ - if (flb_cf_section_property_add(cf, s->cf_section->properties, - "name", 4, - value, len) < 0) { - return YAML_FAILURE; + if (var == NULL) { + cfl_array_destroy(arr); + flb_error("unable to fetch from array by index"); + return YAML_FAILURE; + } + switch (var->type) { + case CFL_VARIANT_STRING: + + if (cfl_array_append_string(carr, var->data.as_string) < 0) { + flb_error("unable to append string"); + cfl_kvlist_destroy(copy); + cfl_array_destroy(carr); + return YAML_FAILURE; + } + break; + default: + cfl_array_destroy(arr); + flb_error("unable to copy value for property"); + cfl_kvlist_destroy(copy); + cfl_array_destroy(carr); + return YAML_FAILURE; + } } - /* next state are key value pairs for the custom plugin*/ - s->state = STATE_CUSTOM_KEY_VALUE_PAIR; - break; - case YAML_MAPPING_START_EVENT: - break; - case YAML_MAPPING_END_EVENT: - break; - case YAML_SEQUENCE_END_EVENT: - s->state = STATE_SECTION; - break; - default: - yaml_error_event(ctx, s, event); - return YAML_FAILURE; - } - break; - case STATE_CUSTOM_KEY_VALUE_PAIR: - switch(event->type) { - case YAML_MAPPING_START_EVENT: - s->state = STATE_CUSTOM_KEY; - break; - case YAML_MAPPING_END_EVENT: - s->state = STATE_CUSTOM; - break; - case YAML_SEQUENCE_END_EVENT: + if (cfl_kvlist_insert_array(copy, kvp->key, carr) < 0) { + cfl_array_destroy(arr); + flb_error("unabelt to insert into array"); + flb_error("unable to insert array into kvlist"); + } break; - default: - yaml_error_event(ctx, s, event); + flb_error("unknown value type for properties: %d", kvp->val->type); + cfl_kvlist_destroy(copy); return YAML_FAILURE; } - break; + } - case STATE_CUSTOM_KEY: - switch(event->type) { - case YAML_SCALAR_EVENT: - s->state = STATE_CUSTOM_VAL; - value = (char *) event->data.scalar.value; - s->key = flb_sds_create(value); + if (cfl_array_append_kvlist(arr, copy) < 0) { + flb_error("unable to insert array into kvlist"); + cfl_kvlist_destroy(copy); + return YAML_FAILURE; + } + return YAML_SUCCESS; +} + +static enum status state_copy_into_properties(struct parser_state *state, struct flb_cf *conf, struct cfl_kvlist *properties) +{ + struct cfl_list *head; + struct cfl_kvpair *kvp; + struct cfl_variant *var; + struct cfl_array *arr; + int idx; + + cfl_list_foreach(head, &state->keyvals->list) { + kvp = cfl_list_entry(head, struct cfl_kvpair, _head); + switch (kvp->val->type) { + case CFL_VARIANT_STRING: + var = flb_cf_section_property_add(conf, + properties, + kvp->key, + cfl_sds_len(kvp->key), + kvp->val->data.as_string, + cfl_sds_len(kvp->val->data.as_string)); + + if (var == NULL) { + flb_error("unable to add variant value property"); + return YAML_FAILURE; + } break; - case YAML_MAPPING_START_EVENT: - s->state = STATE_CUSTOM; + case CFL_VARIANT_ARRAY: + arr = flb_cf_section_property_add_list(conf, properties, + kvp->key, cfl_sds_len(kvp->key)); + + if (arr == NULL) { + flb_error("unable to add property list"); + return YAML_FAILURE; + } + for (idx = 0; idx < kvp->val->data.as_array->entry_count; idx++) { + var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx); + + if (var == NULL) { + flb_error("unable to retrieve from array by index"); + return YAML_FAILURE; + } + switch (var->type) { + case CFL_VARIANT_STRING: + + if (cfl_array_append_string(arr, var->data.as_string) < 0) { + flb_error("unable to append string to array"); + return YAML_FAILURE; + } + break; + default: + flb_error("unable to copy value for property"); + return YAML_FAILURE; + } + } break; - case YAML_MAPPING_END_EVENT: - s->state = STATE_CUSTOM_SEQUENCE; + default: + flb_error("unknown value type for properties: %d", kvp->val->type); + return YAML_FAILURE; + } + } + return YAML_SUCCESS; +} + +static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, + yaml_event_t *event) +{ + struct parser_state *state; + enum status status; + int ret; + char *value; + struct flb_kv *keyval; + char *last_included; + + last_included = state_get_last(ctx); + + if (last_included == NULL) { + last_included = "**unknown**"; + } + + state = get_current_state(ctx); + + if (state == NULL) { + flb_error("unable to parse yaml: no state"); + return YAML_FAILURE; + } + print_current_state(ctx, state, event); + + switch (state->state) { + case STATE_START: + switch (event->type) { + case YAML_STREAM_START_EVENT: + state = state_push(ctx, STATE_STREAM); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + break; + case YAML_NO_EVENT: + state->state = STATE_STOP; break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; - case STATE_CUSTOM_VAL: - switch(event->type) { - case YAML_SCALAR_EVENT: - s->state = STATE_CUSTOM_KEY; - value = (char *) event->data.scalar.value; - s->val = flb_sds_create(value); + case STATE_STREAM: + switch (event->type) { + case YAML_DOCUMENT_START_EVENT: + state = state_push(ctx, STATE_DOCUMENT); - /* register key/value pair as a property */ - flb_cf_section_property_add(cf, s->cf_section->properties, - s->key, flb_sds_len(s->key), - s->val, flb_sds_len(s->val)); - flb_sds_destroy(s->key); - flb_sds_destroy(s->val); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } break; - case YAML_MAPPING_START_EVENT: /* start a new group */ - s->state = STATE_GROUP_KEY; - s->cf_group = flb_cf_group_create(cf, s->cf_section, - s->key, flb_sds_len(s->key)); - flb_sds_destroy(s->key); - if (!s->cf_group) { + case YAML_STREAM_END_EVENT: + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_DOCUMENT: + switch (event->type) { + case YAML_MAPPING_START_EVENT: + state = state_push(ctx, STATE_SECTION); + + if (state == NULL) { + flb_error("unable to allocate state"); return YAML_FAILURE; } break; + case YAML_DOCUMENT_END_EVENT: + state = state_pop(ctx); + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; - /* end of 'customs' */ /* * 'includes' @@ -692,19 +863,31 @@ static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, case YAML_SEQUENCE_START_EVENT: break; case YAML_SEQUENCE_END_EVENT: - s->state = STATE_SECTION; + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_SECTION) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } break; case YAML_SCALAR_EVENT: value = (char *) event->data.scalar.value; - flb_debug("[config yaml] including: %s", value); + flb_error("[config yaml] including: %s", value); + if (strchr(value, '*') != NULL) { - ret = read_glob(cf, ctx, s, value); + ret = read_glob(conf, ctx, state, value); } else { - ret = read_config(cf, ctx, s->file, value); + ret = read_config(conf, ctx, state->file, value); } + if (ret == -1) { - flb_error("[config] including file '%s' at %s:%zu", + flb_error("[config] including file '%s' at %s:%zu", value, last_included, event->start_mark.line + 1); return YAML_FAILURE; @@ -712,95 +895,174 @@ static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, ctx->level++; break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; /* end of 'includes' */ + /* + * 'customs' + * -------- + */ + case STATE_CUSTOM: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + break; + case YAML_MAPPING_START_EVENT: + state = state_push_withvals(ctx, state, STATE_PLUGIN_START); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + if (add_section_type(conf, state) == -1) { + flb_error("unable to add section type"); + return YAML_FAILURE; + } + break; + case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + /* end of 'customs' */ + case STATE_PIPELINE: switch (event->type) { case YAML_SCALAR_EVENT: value = (char *)event->data.scalar.value; + if (strcasecmp(value, "inputs") == 0) { - s->state = STATE_PLUGIN_INPUT; - s->section = SECTION_INPUT; + state = state_push_section(ctx, STATE_PLUGIN_INPUT, SECTION_INPUT); } else if (strcasecmp(value, "filters") == 0) { - s->state = STATE_PLUGIN_FILTER; - s->section = SECTION_FILTER; + state = state_push_section(ctx, STATE_PLUGIN_FILTER, SECTION_FILTER); } else if (strcasecmp(value, "outputs") == 0) { - s->state = STATE_PLUGIN_OUTPUT; - s->section = SECTION_OUTPUT; + state = state_push_section(ctx, STATE_PLUGIN_OUTPUT, SECTION_OUTPUT); } else { - yaml_error_plugin_category(ctx, s, event, value); + yaml_error_plugin_category(ctx, state, event, value); + return YAML_FAILURE; + } + + if (state == NULL) { + flb_error("unable to allocate state"); return YAML_FAILURE; } break; case YAML_MAPPING_START_EVENT: break; case YAML_MAPPING_END_EVENT: - s->state = STATE_SECTION; + state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; case STATE_SECTION: - s->section = SECTION_OTHER; switch (event->type) { case YAML_SCALAR_EVENT: value = (char *)event->data.scalar.value; + if (strcasecmp(value, "env") == 0) { - s->state = STATE_ENV; - s->section = SECTION_ENV; + state = state_push_section(ctx, STATE_ENV, SECTION_ENV); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } } else if (strcasecmp(value, "pipeline") == 0) { - s->state = STATE_PIPELINE; - s->section = SECTION_PIPELINE; + state = state_push_section(ctx, STATE_PIPELINE, SECTION_PIPELINE); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } } else if (strcasecmp(value, "service") == 0) { + if (ctx->service_set) { - yaml_error_definition(ctx, s, event, value); + yaml_error_definition(ctx, state, event, value); + return YAML_FAILURE; + } + + state = state_push_section(ctx, STATE_SERVICE, SECTION_SERVICE); + + if (state == NULL) { + flb_error("unable to allocate state"); return YAML_FAILURE; } - s->state = STATE_SERVICE; - s->section = SECTION_SERVICE; - s->cf_section = flb_cf_section_create(cf, value, 0); - if (!s->cf_section) { + + if (state_create_section(conf, state, value) == -1) { + flb_error("unable to allocate section: %s", value); return YAML_FAILURE; } ctx->service_set = 1; } else if (strcasecmp(value, "customs") == 0) { - s->state = STATE_CUSTOM; - s->section = SECTION_CUSTOM; + state = state_push_section(ctx, STATE_CUSTOM, SECTION_CUSTOM); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } } else if (strcasecmp(value, "includes") == 0) { - s->state = STATE_INCLUDE; - s->section = SECTION_INCLUDE; + state = state_push_section(ctx, STATE_INCLUDE, SECTION_INCLUDE); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } } else { /* any other main section definition (e.g: similar to STATE_SERVICE) */ - s->state = STATE_OTHER; - s->cf_section = flb_cf_section_create(cf, value, 0); - if (!s->cf_section) { + state = state_push(ctx, STATE_OTHER); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + + if (state_create_section(conf, state, value) == -1) { + flb_error("unable to allocate section: %s", value); return YAML_FAILURE; } } break; case YAML_MAPPING_END_EVENT: - s->state = STATE_DOCUMENT; + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; case YAML_DOCUMENT_END_EVENT: - s->state = STATE_STREAM; + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; @@ -811,13 +1073,28 @@ static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, case STATE_OTHER: switch(event->type) { case YAML_MAPPING_START_EVENT: - s->state = STATE_SECTION_KEY; + state = state_push(ctx, STATE_SECTION_KEY); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } break; case YAML_MAPPING_END_EVENT: - s->state = STATE_SECTION; + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_SECTION) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; @@ -825,15 +1102,35 @@ static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, case STATE_SECTION_KEY: switch(event->type) { case YAML_SCALAR_EVENT: - s->state = STATE_SECTION_VAL; value = (char *) event->data.scalar.value; - s->key = flb_sds_create(value); + state = state_push_key(ctx, STATE_SECTION_VAL, value); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } break; case YAML_MAPPING_END_EVENT: - s->state = STATE_SECTION; + state = state_pop(ctx); + switch (state->state) { + case STATE_SERVICE: + case STATE_ENV: + case STATE_OTHER: + break; + default: + printf("BAD STATE FOR SECTION KEY POP=%s\n", state_str(state->state)); + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; @@ -841,35 +1138,49 @@ static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, case STATE_SECTION_VAL: switch(event->type) { case YAML_SCALAR_EVENT: - s->state = STATE_SECTION_KEY; value = (char *) event->data.scalar.value; - s->val = flb_sds_create(value); /* Check if the incoming k/v pair set a config environment variable */ - if (s->section == SECTION_ENV) { - kv = flb_cf_env_property_add(cf, - s->key, flb_sds_len(s->key), - s->val, flb_sds_len(s->val)); - if (kv == NULL) { + if (state->section == SECTION_ENV) { + keyval = flb_cf_env_property_add(conf, + state->key, flb_sds_len(state->key), + value, strlen(value)); + + if (keyval == NULL) { + flb_error("unable to add key value"); return YAML_FAILURE; } } else { + /* register key/value pair as a property */ - if (s->cf_section == NULL) { + if (state->cf_section == NULL) { + flb_error("no section to register key value to"); return YAML_FAILURE; } - if (flb_cf_section_property_add(cf, s->cf_section->properties, - s->key, flb_sds_len(s->key), - s->val, flb_sds_len(s->val)) < 0) { + + if (flb_cf_section_property_add(conf, state->cf_section->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) < 0) { + flb_error("unable to add property"); return YAML_FAILURE; } } - flb_sds_destroy(s->key); - flb_sds_destroy(s->val); + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_SECTION_KEY) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; @@ -880,57 +1191,127 @@ static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, case STATE_PLUGIN_OUTPUT: switch(event->type) { case YAML_SEQUENCE_START_EVENT: - s->state = STATE_PLUGIN_TYPE; break; case YAML_SEQUENCE_END_EVENT: - break; - case YAML_SCALAR_EVENT: - s->state = STATE_SECTION; + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; case YAML_MAPPING_START_EVENT: - s->state = STATE_PLUGIN_TYPE; + state = state_push_withvals(ctx, state, STATE_PLUGIN_START); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + + if (add_section_type(conf, state) == -1) { + flb_error("unable to add section type"); + return YAML_FAILURE; + } + break; + case YAML_SCALAR_EVENT: + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_SECTION) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } break; case YAML_MAPPING_END_EVENT: - s->state = STATE_SECTION_KEY; + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_SECTION_KEY) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; - case STATE_PLUGIN_TYPE: + case STATE_PLUGIN_START: switch(event->type) { case YAML_SCALAR_EVENT: - /* register the section by type */ - ret = add_section_type(cf, s); - if (ret == -1) { + /* Here is where we process all the plugin properties for customs, pipelines + * and processors. + */ + state = state_push_key(ctx, STATE_PLUGIN_VAL, (char *) event->data.scalar.value); + + if (state == NULL) { + flb_error("unable to allocate state"); return YAML_FAILURE; } - - /* the next state is the keys of the properties of the plugin. */ - s->state = STATE_PLUGIN_KEY; break; - case YAML_MAPPING_START_EVENT: - ret = add_section_type(cf, s); - if (ret == -1) { + case YAML_MAPPING_END_EVENT: + print_current_properties(state); + + if (state->section == SECTION_PROCESSOR) { + status = state_copy_into_config_group(state, state->cf_group); + + if (status != YAML_SUCCESS) { + return status; + } + } + else { + status = state_copy_into_properties(state, conf, state->cf_section->properties); + + if (status != YAML_SUCCESS) { + return status; + } + } + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); return YAML_FAILURE; } - s->state = STATE_PLUGIN_KEY; break; - case YAML_MAPPING_END_EVENT: + case YAML_SEQUENCE_START_EVENT: /* start a new group */ + + if (strcmp(state->key, "processors") == 0) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + + if (state->key == NULL) { + flb_error("no key"); + return YAML_FAILURE; + } + + state = state_push_witharr(ctx, state, STATE_PLUGIN_VAL_LIST); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } break; case YAML_SEQUENCE_END_EVENT: - if (s->section == SECTION_CUSTOM) { - /* 'customs' is a top level. So we get back to 'section'. */ - s->state = STATE_SECTION; - } - else { - s->state = STATE_PIPELINE; + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; @@ -938,336 +1319,324 @@ static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, case STATE_PLUGIN_KEY: switch(event->type) { case YAML_SCALAR_EVENT: - s->state = STATE_PLUGIN_VAL; - value = (char *) event->data.scalar.value; - s->key = flb_sds_create(value); + /* Here is where we process all the plugin properties for customs, pipelines + * and processors. + */ + state = state_push_key(ctx, STATE_PLUGIN_VAL, (char *) event->data.scalar.value); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } break; case YAML_MAPPING_START_EVENT: - s->state = STATE_PLUGIN_TYPE; + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_PLUGIN_START) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } break; case YAML_MAPPING_END_EVENT: - s->state = STATE_PLUGIN_TYPE; + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_PLUGIN_START) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } break; case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; - case STATE_INPUT_PROCESSOR: + case STATE_PLUGIN_VAL: switch(event->type) { - case YAML_MAPPING_START_EVENT: - break; - case YAML_MAPPING_END_EVENT: - s->state = STATE_PLUGIN_KEY; - break; - case YAML_SCALAR_EVENT: - /* remove 'processors' key, not longer needed */ - if (s->key) { - flb_sds_destroy(s->key); - s->key = NULL; - } - /* Check if we are entering a 'logs', 'metrics' or 'traces' section */ - value = (char *) event->data.scalar.value; - if (strcasecmp(value, "logs") == 0) { - /* logs state */ - s->state = STATE_INPUT_PROCESSOR_LOGS_KEY; + case YAML_SCALAR_EVENT: - /* create the array for definitions found under 'log' */ - s->cf_processor_type_array = cfl_array_create(1); - cfl_array_resizable(s->cf_processor_type_array, CFL_TRUE); + /* register key/value pair as a property */ + if (cfl_kvlist_insert_string(state->keyvals, state->key, (char *)event->data.scalar.value) < 0) { + flb_error("unable to insert string"); + return YAML_FAILURE; + } - cfl_kvlist_insert_array(s->cf_group->properties, "logs", s->cf_processor_type_array); - } - else if (strcasecmp(value, "metrics") == 0) { - /* metrics state */ - s->state = STATE_INPUT_PROCESSOR_METRICS_KEY; + state = state_pop(ctx); - /* create the array for definitions found under 'log' */ - s->cf_processor_type_array = cfl_array_create(1); - cfl_array_resizable(s->cf_processor_type_array, CFL_TRUE); + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + case YAML_SEQUENCE_START_EVENT: /* start a new group */ + state = state_push_witharr(ctx, state, STATE_PLUGIN_VAL_LIST); - cfl_kvlist_insert_array(s->cf_group->properties, "metrics", s->cf_processor_type_array); - } - else if (strcasecmp(value, "traces") == 0) { - /* metrics state */ - s->state = STATE_INPUT_PROCESSOR_TRACES_KEY; + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + break; + case YAML_MAPPING_START_EVENT: - /* create the array for definitions found under 'log' */ - s->cf_processor_type_array = cfl_array_create(1); - cfl_array_resizable(s->cf_processor_type_array, CFL_TRUE); + if (strcmp(state->key, "processors") == 0) { + state = state_push(ctx, STATE_INPUT_PROCESSORS); - cfl_kvlist_insert_array(s->cf_group->properties, "traces", s->cf_processor_type_array); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; } - else { - flb_error("[config] unknown processor '%s'", value); - yaml_error_event(ctx, s, event); + + if (state_create_group(conf, state, "processors") == YAML_FAILURE) { return YAML_FAILURE; } break; - default: - yaml_error_event(ctx, s, event); + } + + state = state_push(ctx, STATE_GROUP_KEY); + + if (state == NULL) { + flb_error("unable to allocate state"); return YAML_FAILURE; - }; - break; - case STATE_INPUT_PROCESSOR_LOGS_KEY: - switch(event->type) { - case YAML_SEQUENCE_START_EVENT: - break; - case YAML_SEQUENCE_END_EVENT: - s->state = STATE_INPUT_PROCESSOR; - break; - case YAML_MAPPING_START_EVENT: - s->cf_processor_type_list = cfl_kvlist_create(); - cfl_array_append_kvlist(s->cf_processor_type_array, s->cf_processor_type_list); - break; - case YAML_MAPPING_END_EVENT: - break; - case YAML_SCALAR_EVENT: - /* Check if we are entering a 'logs', 'metrics' or 'traces' section */ - value = (char *) event->data.scalar.value; - s->key = flb_sds_create(value); - s->state = STATE_INPUT_PROCESSOR_LOGS_VAL; - break; - default: - yaml_error_event(ctx, s, event); + } + /* create group */ + state->values = flb_cf_section_property_add_list(conf, + state->cf_section->properties, + state->key, flb_sds_len(state->key)); + + if (state->values == NULL) { + flb_error("no values"); return YAML_FAILURE; - }; - break; + } - case STATE_INPUT_PROCESSOR_LOGS_VAL: - switch(event->type) { - case YAML_SEQUENCE_START_EVENT: - s->state = STATE_INPUT_PROCESSOR_LOGS_KEY; - break; - case YAML_SEQUENCE_END_EVENT: - break; - case YAML_MAPPING_START_EVENT: - break; - case YAML_MAPPING_END_EVENT: - break; - case YAML_SCALAR_EVENT: - value = (char *) event->data.scalar.value; - if (!s->cf_processor_type_list || !s->key || !value) { - s->state = STATE_INPUT_PROCESSOR; - break; - } - cfl_kvlist_insert_string(s->cf_processor_type_list, s->key, value); - flb_sds_destroy(s->key); - s->key = NULL; - s->state = STATE_INPUT_PROCESSOR_LOGS_KEY; - break; - default: - yaml_error_event(ctx, s, event); + state->cf_group = flb_cf_group_create(conf, state->cf_section, state->key, strlen(state->key)); + + if (state->cf_group == NULL) { + flb_error("unable to create group"); return YAML_FAILURE; - }; - break; + } + break; + case YAML_SEQUENCE_END_EVENT: /* end of group */ + state = state_pop(ctx); - case STATE_INPUT_PROCESSOR_METRICS_KEY: - switch(event->type) { - case YAML_SEQUENCE_START_EVENT: - break; - case YAML_SEQUENCE_END_EVENT: - s->state = STATE_INPUT_PROCESSOR; - break; - case YAML_MAPPING_START_EVENT: - s->cf_processor_type_list = cfl_kvlist_create(); - cfl_array_append_kvlist(s->cf_processor_type_array, s->cf_processor_type_list); - break; - case YAML_MAPPING_END_EVENT: - break; - case YAML_SCALAR_EVENT: - value = (char *) event->data.scalar.value; - s->key = flb_sds_create(value); - s->state = STATE_INPUT_PROCESSOR_METRICS_VAL; - break; - default: - yaml_error_event(ctx, s, event); + if (state == NULL) { + flb_error("no state left"); return YAML_FAILURE; - }; + } + + if (state->state != STATE_PLUGIN_KEY) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + if (state->state != STATE_PLUGIN_KEY) { + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + case YAML_MAPPING_END_EVENT: + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } break; - case STATE_INPUT_PROCESSOR_METRICS_VAL: + case STATE_PLUGIN_VAL_LIST: switch(event->type) { - case YAML_SEQUENCE_START_EVENT: - s->state = STATE_INPUT_PROCESSOR_METRICS_KEY; - break; - case YAML_SEQUENCE_END_EVENT: - break; - case YAML_MAPPING_START_EVENT: - break; - case YAML_MAPPING_END_EVENT: - break; - case YAML_SCALAR_EVENT: - value = (char *) event->data.scalar.value; - cfl_kvlist_insert_string(s->cf_processor_type_list, s->key, value); - flb_sds_destroy(s->key); - s->key = NULL; - s->val = NULL; - s->state = STATE_INPUT_PROCESSOR_METRICS_KEY; - break; - default: - yaml_error_event(ctx, s, event); + case YAML_SCALAR_EVENT: + + if (state->values == NULL) { + flb_error("unable to add values to list"); return YAML_FAILURE; - }; + } + + if (cfl_array_append_string(state->values, (char *)event->data.scalar.value) < 0) { + flb_error("unable to add values to list"); + return YAML_FAILURE; + } + break; + case YAML_SEQUENCE_END_EVENT: + + /* register key/value pair as a property */ + if (cfl_kvlist_insert_array(state->keyvals, state->key, state->values) < 0) { + flb_error("unable to insert key values"); + return YAML_FAILURE; + } + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } break; - case STATE_INPUT_PROCESSOR_TRACES_KEY: + case STATE_INPUT_PROCESSORS: switch(event->type) { - case YAML_SEQUENCE_START_EVENT: - break; - case YAML_SEQUENCE_END_EVENT: - s->state = STATE_INPUT_PROCESSOR; - break; case YAML_MAPPING_START_EVENT: - s->cf_processor_type_list = cfl_kvlist_create(); - cfl_array_append_kvlist(s->cf_processor_type_array, s->cf_processor_type_list); break; case YAML_MAPPING_END_EVENT: + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; case YAML_SCALAR_EVENT: + + /* Check if we are entering a 'logs', 'metrics' or 'traces' section */ value = (char *) event->data.scalar.value; - s->key = flb_sds_create(value); - s->state = STATE_INPUT_PROCESSOR_TRACES_VAL; + + if (strcasecmp(value, "logs") == 0) { + /* logs state */ + state = state_push_key(ctx, STATE_INPUT_PROCESSOR, "logs"); + } + else if (strcasecmp(value, "metrics") == 0) { + /* metrics state */ + state = state_push_key(ctx, STATE_INPUT_PROCESSOR, "metrics"); + } + else if (strcasecmp(value, "traces") == 0) { + /* metrics state */ + state = state_push_key(ctx, STATE_INPUT_PROCESSOR, "traces"); + } + else { + flb_error("[config] unknown processor '%s'", value); + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; }; break; - case STATE_INPUT_PROCESSOR_TRACES_VAL: + case STATE_INPUT_PROCESSOR: switch(event->type) { case YAML_SEQUENCE_START_EVENT: - s->state = STATE_INPUT_PROCESSOR_TRACES_KEY; break; case YAML_SEQUENCE_END_EVENT: + + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; case YAML_MAPPING_START_EVENT: + + state = state_push_withvals(ctx, state, STATE_PLUGIN_START); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + state->section = SECTION_PROCESSOR; break; case YAML_MAPPING_END_EVENT: - break; - case YAML_SCALAR_EVENT: - value = (char *) event->data.scalar.value; - cfl_kvlist_insert_string(s->cf_processor_type_list, s->key, value); - flb_sds_destroy(s->key); - s->key = NULL; - s->val = NULL; - s->state = STATE_INPUT_PROCESSOR_TRACES_KEY; + return YAML_FAILURE; break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; }; break; - case STATE_PLUGIN_VAL: + + /* groups: a group is a sub-section and here we handle the key/value pairs. */ + case STATE_GROUP_KEY: switch(event->type) { case YAML_SCALAR_EVENT: - s->state = STATE_PLUGIN_KEY; + /* grab current value (key) */ value = (char *) event->data.scalar.value; - s->val = flb_sds_create(value); - /* register key/value pair as a property */ - if (flb_cf_section_property_add(cf, s->cf_section->properties, - s->key, flb_sds_len(s->key), - s->val, flb_sds_len(s->val)) < 0) { - return YAML_FAILURE; - } - if (cfl_kvlist_count(s->cf_section->properties) <= 0) { - return YAML_FAILURE; - } - flb_sds_destroy(s->key); - flb_sds_destroy(s->val); - s->key = NULL; - s->val = NULL; - break; - case YAML_SEQUENCE_START_EVENT: /* start a new group */ - if (strcmp(s->key, "processors") == 0) { - yaml_error_event(ctx, s, event); - return YAML_FAILURE; - } - s->state = STATE_GROUP_KEY; - s->cf_group = flb_cf_group_create(cf, s->cf_section, - s->key, flb_sds_len(s->key)); - flb_sds_destroy(s->key); - if (!s->cf_group) { + state = state_push_key(ctx, STATE_GROUP_VAL, value); + + if (state == NULL) { + flb_error("unable to allocate state"); return YAML_FAILURE; } break; - case YAML_SEQUENCE_END_EVENT: /* end of group */ - s->state = STATE_PLUGIN_KEY; - break; case YAML_MAPPING_START_EVENT: - /* create group */ - s->cf_group = flb_cf_group_create(cf, s->cf_section, s->key, strlen(s->key)); - - /* Special handling for input processor */ - if (strcmp(s->key, "processors") == 0) { - s->state = STATE_INPUT_PROCESSOR; - break; - } - - s->state = STATE_GROUP_KEY; - s->values = flb_cf_section_property_add_list(cf, - s->cf_section->properties, - s->key, flb_sds_len(s->key)); - if (s->values == NULL) { - return YAML_FAILURE; - } - flb_sds_destroy(s->key); - s->key = NULL; break; case YAML_MAPPING_END_EVENT: - s->state = STATE_PLUGIN_KEY; - break; - default: - yaml_error_event(ctx, s, event); - return YAML_FAILURE; - } - break; - case STATE_PLUGIN_VAL_LIST: - switch(event->type) { - case YAML_SCALAR_EVENT: - if (s->values == NULL) { + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); return YAML_FAILURE; } - cfl_array_append_string(s->values, (char *)event->data.scalar.value); - break; - case YAML_SEQUENCE_END_EVENT: - s->values = NULL; - s->state = STATE_PLUGIN_KEY; - break; - default: - yaml_error_event(ctx, s, event); - return YAML_FAILURE; - } - break; - /* groups: a group is a sub-section and here we handle the key/value pairs */ - case STATE_GROUP_KEY: - switch(event->type) { - case YAML_SCALAR_EVENT: - /* next state */ - s->state = STATE_GROUP_VAL; + /* This is also the end of the plugin values mapping. + * So we pop an additional state off the stack. + */ + state = state_pop(ctx); - /* grab current value (key) */ - value = (char *) event->data.scalar.value; - s->key = flb_sds_create(value); - break; - case YAML_MAPPING_START_EVENT: - break; - case YAML_MAPPING_END_EVENT: - s->state = STATE_PLUGIN_KEY; - break; - case YAML_SEQUENCE_END_EVENT: - s->state = STATE_PLUGIN_KEY; - s->cf_group = NULL; + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; @@ -1275,22 +1644,25 @@ static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, case STATE_GROUP_VAL: switch(event->type) { case YAML_SCALAR_EVENT: - s->state = STATE_GROUP_KEY; value = (char *) event->data.scalar.value; - s->val = flb_sds_create(value); /* add the kv pair to the active group properties */ - flb_cf_section_property_add(cf, s->cf_group->properties, - s->key, flb_sds_len(s->key), - s->val, flb_sds_len(s->val)); - flb_sds_destroy(s->key); - flb_sds_destroy(s->val); - s->key = NULL; - s->val = NULL; + if (flb_cf_section_property_add(conf, state->cf_group->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) == NULL) { + flb_error("unable to add property"); + return YAML_FAILURE; + } + state = state_pop(ctx); + + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } break; default: - yaml_error_event(ctx, s, event); + yaml_error_event(ctx, state, event); return YAML_FAILURE; } break; @@ -1302,151 +1674,333 @@ static int consume_event(struct flb_cf *cf, struct local_ctx *ctx, return YAML_SUCCESS; } -static char *get_real_path(char *file, char *path, size_t size) +static struct parser_state *state_start(struct local_ctx *ctx, struct file_state *file) { - int len; - char *p; - char *end; + struct parser_state *state; -#ifdef _MSC_VER - p = _fullpath(path, file, size); -#else - p = realpath(file, path); -#endif + state = state_create(NULL, file); - if (!p) { - len = strlen(file); - if (len > size) { - return NULL; - } - memcpy(path, file, len); - path[len] = '\0'; + if (state != NULL) { + cfl_list_add(&state->_head, &ctx->states); } - /* lookup path ending and truncate */ -#ifdef _MSC_VER - end = strrchr(path, '\\'); -#else - end = strrchr(path, '/'); -#endif + return state; +} - if (end) { - end++; - *end = '\0'; +static struct parser_state *state_push(struct local_ctx *ctx, enum state state_num) +{ + struct parser_state *last = NULL; + struct parser_state *state; + + if (cfl_list_size(&ctx->states) <= 0) { + return NULL; } - return path; + last = cfl_list_entry_last(&ctx->states, struct parser_state, _head); + + if (last == NULL) { + return NULL; + } + + state = state_create(last->file, last->file); + + if (state == NULL) { + return NULL; + } + state->section = last->section; + state->keyvals = last->keyvals; + state->cf_section = last->cf_section; + state->cf_group = last->cf_group; + state->values = last->values; + state->file = last->file; + state->state = state_num; + state->level = last->level + 1; + state->key = last->key; + + cfl_list_add(&state->_head, &ctx->states); + return state; } -static void state_destroy(struct parser_state *s) +static struct parser_state *state_push_section(struct local_ctx *ctx, + enum state state_num, + enum section section) { - if (s->caller_file) { - flb_sds_destroy(s->caller_file); + struct parser_state *state; + + state = state_push(ctx, state_num); + + if (state == NULL) { + return NULL; } - if (s->caller_root_path) { - flb_sds_destroy(s->caller_root_path); + state->section = section; + + return state; +} + +static struct parser_state *state_push_key(struct local_ctx *ctx, + enum state state_num, + const char *key) +{ + struct parser_state *state; + flb_sds_t skey; + + if (key == NULL) { + return NULL; } - if (s->file) { - flb_sds_destroy(s->file); + skey = flb_sds_create(key); + + if (skey == NULL) { + return NULL; } - if (s->root_path) { - flb_sds_destroy(s->root_path); + state = state_push(ctx, state_num); + + if (state == NULL) { + flb_sds_destroy(skey); + return NULL; } - flb_free(s); + + state->key = skey; + state->allocation_flags |= HAS_KEY; + return state; } -static struct parser_state *state_create(char *caller_file, char *file) +static struct parser_state *state_push_withvals(struct local_ctx *ctx, + struct parser_state *parent, + enum state state_num) { - int ret; - char *p; - char file_path[PATH_MAX + 1] = {0}; - char caller_path[PATH_MAX + 1] = {0}; - struct parser_state *s; - struct stat st; + struct parser_state *state; + struct cfl_kvlist *kvlist; + + kvlist = cfl_kvlist_create(); - if (!file) { + if (kvlist == NULL) { return NULL; } - /* allocate context */ - s = flb_calloc(1, sizeof(struct parser_state)); - if (!s) { - flb_errno(); + state = state_push(ctx, state_num); + + if (state == NULL) { + cfl_kvlist_destroy(kvlist); return NULL; } - /* resolve real path for caller file and target file */ -#ifndef FLB_HAVE_STATIC_CONF - if (caller_file) { - p = get_real_path(caller_file, caller_path, PATH_MAX + 1); - if (!p) { - state_destroy(s); - return NULL; - } - s->caller_file = flb_sds_create(caller_file); - s->caller_root_path = flb_sds_create(caller_path); + state->keyvals = kvlist; + state->allocation_flags |= HAS_KEYVALS; + + return state; +} + +static struct parser_state *state_push_witharr(struct local_ctx *ctx, + struct parser_state *parent, + enum state state_num) +{ + struct parser_state *state; + + parent->values = cfl_array_create(4); + + if (parent->values == NULL) { + flb_error("parent has no values"); + return NULL; } - else { - s->caller_file = flb_sds_create(s->file); - s->caller_root_path = flb_sds_create(s->root_path); + + cfl_array_resizable(parent->values, CFL_TRUE); + + state = state_push(ctx, state_num); + + return state; +} + +static int state_create_section(struct flb_cf *conf, struct parser_state *state, char *name) +{ + + if (state == NULL || conf == NULL || name == NULL) { + return -1; + } + + state->cf_section = flb_cf_section_create(conf, name, 0); + + if (state->cf_section == NULL) { + return -1; + } + + return 0; +} + +static int state_create_group(struct flb_cf *conf, struct parser_state *state, char *name) +{ + if (state == NULL || conf == NULL || name == NULL) { + return -1; + } + + state->cf_group = flb_cf_group_create(conf, state->cf_section, + "processors", strlen("processors")); + + if (state->cf_group == NULL) { + return -1; + } + + return YAML_SUCCESS; +} + +static struct parser_state *state_pop(struct local_ctx *ctx) +{ + struct parser_state *last; + + if (ctx == NULL) { + return NULL; + } + + if (cfl_list_size(&ctx->states) <= 0) { + return NULL; + } + + last = cfl_list_entry_last(&ctx->states, struct parser_state, _head); + cfl_list_del(&last->_head); + + if (last->allocation_flags & HAS_KEY) { + flb_sds_destroy(last->key); } - /* check if the file exists */ - ret = stat(file, &st); - if (ret == 0) { - p = get_real_path(file, file_path, PATH_MAX + 1); - s->file = flb_sds_create(file); - s->root_path = flb_sds_create(file_path); + if (last->allocation_flags & HAS_KEYVALS) { + cfl_kvlist_destroy(last->keyvals); } - else if (errno == ENOENT && caller_file && s->caller_root_path != NULL) { - snprintf(file_path, PATH_MAX, "%s/%s", s->caller_root_path, file); - s->file = flb_sds_create(file_path); + + state_destroy(last); + + if (cfl_list_size(&ctx->states) <= 0) { + return NULL; + } + + return cfl_list_entry_last(&ctx->states, struct parser_state, _head); +} + +static void state_destroy(struct parser_state *s) +{ + flb_free(s); +} + +static struct parser_state *state_create(struct file_state *parent, struct file_state *file) +{ + struct parser_state *state; + + /* allocate context */ + state = flb_calloc(1, sizeof(struct parser_state)); + + if (!state) { + flb_errno(); + return NULL; } + + state->file = file; +#ifndef FLB_HAVE_STATIC_CONF + + if (parent) { + state->file->parent = parent; + } + +#else + + s->file->name = flb_sds_create("***static***"); + s->file->path = flb_sds_create("***static***"); + #endif - return s; + return state; } -static int read_config(struct flb_cf *cf, struct local_ctx *ctx, - char *caller_file, char *cfg_file) +static int read_config(struct flb_cf *conf, struct local_ctx *ctx, + struct file_state *parent, char *cfg_file) { int ret; int status; int code = 0; - char *file; struct parser_state *state; + flb_sds_t include_dir = NULL; + flb_sds_t include_file = NULL; yaml_parser_t parser; yaml_event_t event; FILE *fh; + struct file_state fstate; + + if (parent && cfg_file[0] != '/') { + + include_dir = flb_sds_create_size(strlen(cfg_file) + strlen(parent->path)); + + if (include_dir == NULL) { + flb_error("unable to create filename"); + return -1; + } + + if (flb_sds_printf(&include_dir, "%s/%s", parent->path, cfg_file) == NULL) { + flb_error("unable to create full filename"); + return -1; + } + + } + else { + + include_dir = flb_sds_create(cfg_file); + + if (include_dir == NULL) { + flb_error("unable to create filename"); + return -1; + } + } + + include_file = flb_sds_create(include_dir); + + if (include_file == NULL) { + flb_error("unable to create include filename"); + flb_sds_destroy(include_dir); + return -1; + } + + fstate.name = basename(include_dir); + fstate.path = dirname(include_dir); + + fstate.parent = parent; + + state = state_start(ctx, &fstate); - state = state_create(caller_file, cfg_file); if (!state) { + flb_error("unable to push initial include file state: %s", cfg_file); + flb_sds_destroy(include_dir); + flb_sds_destroy(include_file); return -1; } - file = state->file; /* check if this file has been included before */ - ret = is_file_included(ctx, file); + ret = is_file_included(ctx, include_file); + if (ret) { - flb_error("[config] file '%s' is already included", file); + flb_error("[config] file '%s' is already included", cfg_file); + flb_sds_destroy(include_dir); + flb_sds_destroy(include_file); state_destroy(state); return -1; } - fh = fopen(file, "r"); + flb_error("============ %s ============", cfg_file); + fh = fopen(include_file, "r"); + if (!fh) { flb_errno(); + flb_sds_destroy(include_dir); + flb_sds_destroy(include_file); state_destroy(state); return -1; } /* add file to the list of included files */ - ret = flb_slist_add(&ctx->includes, file); + ret = flb_slist_add(&ctx->includes, include_file); + if (ret == -1) { - flb_error("[config] could not register file %s", file); + flb_error("[config] could not register file %s", cfg_file); fclose(fh); + flb_sds_destroy(include_dir); + flb_sds_destroy(include_file); state_destroy(state); return -1; } @@ -1457,63 +2011,50 @@ static int read_config(struct flb_cf *cf, struct local_ctx *ctx, do { status = yaml_parser_parse(&parser, &event); + if (status == YAML_FAILURE) { - flb_error("[config] invalid YAML format in file %s", file); + flb_error("[config] invalid YAML format in file %s", cfg_file); code = -1; goto done; } - status = consume_event(cf, ctx, state, &event); + + status = consume_event(conf, ctx, &event); + if (status == YAML_FAILURE) { + flb_error("yaml error"); code = -1; goto done; } + yaml_event_delete(&event); + state = cfl_list_entry_last(&ctx->states, struct parser_state, _head); + } while (state->state != STATE_STOP); + flb_error("=============================="); done: + if (code == -1) { yaml_event_delete(&event); } yaml_parser_delete(&parser); - state_destroy(state); + state_pop(ctx); fclose(fh); ctx->level--; + flb_sds_destroy(include_file); + flb_sds_destroy(include_dir); + return code; } -static int local_init(struct local_ctx *ctx, char *file) +static int local_init(struct local_ctx *ctx) { - char *end; - char path[PATH_MAX + 1] = {0}; - /* reset the state */ memset(ctx, '\0', sizeof(struct local_ctx)); - -#ifndef FLB_HAVE_STATIC_CONF - char *p; - - if (file) { -#ifdef _MSC_VER - p = _fullpath(path, file, PATH_MAX + 1); -#else - p = realpath(file, path); -#endif - if (!p) { - return -1; - } - } -#endif - - /* lookup path ending and truncate */ - end = strrchr(path, '/'); - if (end) { - end++; - *end = '\0'; - } - + cfl_list_init(&ctx->states); ctx->level = 0; flb_slist_create(&ctx->includes); @@ -1525,34 +2066,37 @@ static void local_exit(struct local_ctx *ctx) flb_slist_destroy(&ctx->includes); } -struct flb_cf *flb_cf_yaml_create(struct flb_cf *cf, char *file_path, +struct flb_cf *flb_cf_yaml_create(struct flb_cf *conf, char *file_path, char *buf, size_t size) { int ret; struct local_ctx ctx; - if (!cf) { - cf = flb_cf_create(); - if (!cf) { + if (!conf) { + conf = flb_cf_create(); + + if (!conf) { return NULL; } } /* initialize the parser state */ - ret = local_init(&ctx, file_path); + ret = local_init(&ctx); + if (ret == -1) { - flb_cf_destroy(cf); + flb_cf_destroy(conf); return NULL; } /* process the entry poing config file */ - ret = read_config(cf, &ctx, NULL, file_path); + ret = read_config(conf, &ctx, NULL, file_path); + if (ret == -1) { - flb_cf_destroy(cf); + flb_cf_destroy(conf); local_exit(&ctx); return NULL; } local_exit(&ctx); - return cf; + return conf; } diff --git a/src/config_format/flb_config_format.c b/src/config_format/flb_config_format.c index af4f56b9d27..8ae8477475e 100644 --- a/src/config_format/flb_config_format.c +++ b/src/config_format/flb_config_format.c @@ -449,6 +449,10 @@ struct flb_cf_group *flb_cf_group_create(struct flb_cf *cf, struct flb_cf_sectio /* initialize lists */ g->properties = cfl_kvlist_create(); + if (g->properties == NULL) { + flb_free(g); + return NULL; + } /* determinate type by name */ if (len <= 0) { @@ -458,6 +462,7 @@ struct flb_cf_group *flb_cf_group_create(struct flb_cf *cf, struct flb_cf_sectio /* create a NULL terminated name */ g->name = flb_sds_create_len(name, len); if (!g->name) { + cfl_kvlist_destroy(g->properties); flb_free(g); return NULL; } diff --git a/src/flb_processor.c b/src/flb_processor.c index c1ece6f6de3..3a4f578e5e3 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -42,6 +42,7 @@ static int acquire_lock(pthread_mutex_t *lock, result = pthread_mutex_lock(lock); if (result != 0) { + if (result == EAGAIN) { retry_count++; @@ -75,6 +76,7 @@ static int release_lock(pthread_mutex_t *lock, result = pthread_mutex_unlock(lock); if (result != 0) { + if (result == EAGAIN) { retry_count++; @@ -110,10 +112,12 @@ struct flb_processor *flb_processor_create(struct flb_config *config, struct flb_processor *proc; proc = flb_calloc(1, sizeof(struct flb_processor)); + if (!proc) { flb_errno(); return NULL; } + proc->config = config; proc->is_active = FLB_FALSE; proc->data = source_plugin_instance; @@ -165,13 +169,16 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, /* allocate and initialize processor unit context */ pu = flb_calloc(1, sizeof(struct flb_processor_unit)); + if (!pu) { flb_errno(); return NULL; } + pu->parent = proc; pu->event_type = event_type; pu->name = flb_sds_create(unit_name); + if (!pu->name) { flb_free(pu); return NULL; @@ -191,6 +198,7 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, if (f) { /* create an instance of the filter */ f_ins = flb_filter_new(config, unit_name, NULL); + if (!f_ins) { pthread_mutex_destroy(&pu->lock); flb_sds_destroy(pu->name); @@ -264,15 +272,34 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, return pu; } -int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, const char *v) +int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, struct cfl_variant *v) { + struct cfl_variant *val; + int i; + int ret; + if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) { - return flb_filter_set_property(pu->ctx, k, v); + + if (v->type == CFL_VARIANT_STRING) { + return flb_filter_set_property(pu->ctx, k, v->data.as_string); + } + else if (v->type == CFL_VARIANT_ARRAY) { + + for (i = 0; i < v->data.as_array->entry_count; i++) { + val = v->data.as_array->entries[i]; + ret = flb_filter_set_property(pu->ctx, k, val->data.as_string); + + if (ret == -1) { + return ret; + } + } + return 0; + } } return flb_processor_instance_set_property( (struct flb_processor_instance *) pu->ctx, - k, v); + k, v->data.as_string); } void flb_processor_unit_destroy(struct flb_processor_unit *pu) @@ -308,7 +335,8 @@ int flb_processor_unit_init(struct flb_processor_unit *pu) if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) { ret = flb_filter_init(proc->config, pu->ctx); - if (ret == -1) { + + if (ret == -1) { flb_error("[processor] error initializing unit filter %s", pu->name); return -1; } @@ -343,7 +371,8 @@ int flb_processor_init(struct flb_processor *proc) mk_list_foreach(head, &proc->logs) { pu = mk_list_entry(head, struct flb_processor_unit, _head); ret = flb_processor_unit_init(pu); - if (ret == -1) { + + if (ret == -1) { return -1; } count++; @@ -352,7 +381,8 @@ int flb_processor_init(struct flb_processor *proc) mk_list_foreach(head, &proc->metrics) { pu = mk_list_entry(head, struct flb_processor_unit, _head); ret = flb_processor_unit_init(pu); - if (ret == -1) { + + if (ret == -1) { return -1; } count++; @@ -361,7 +391,8 @@ int flb_processor_init(struct flb_processor *proc) mk_list_foreach(head, &proc->traces) { pu = mk_list_entry(head, struct flb_processor_unit, _head); ret = flb_processor_unit_init(pu); - if (ret == -1) { + + if (ret == -1) { return -1; } count++; @@ -469,7 +500,8 @@ int flb_processor_run(struct flb_processor *proc, * */ if (ret == FLB_FILTER_MODIFIED) { - /* release intermediate buffer */ + + /* release intermediate buffer */ if (cur_buf != data) { flb_free(cur_buf); } @@ -582,7 +614,8 @@ int flb_processor_run(struct flb_processor *proc, } } else if (type == FLB_PROCESSOR_METRICS) { - if (p_ins->p->cb_process_metrics != NULL) { + + if (p_ins->p->cb_process_metrics != NULL) { ret = p_ins->p->cb_process_metrics(p_ins, (struct cmt *) cur_buf, tag, @@ -598,7 +631,8 @@ int flb_processor_run(struct flb_processor *proc, } } else if (type == FLB_PROCESSOR_TRACES) { - if (p_ins->p->cb_process_traces != NULL) { + + if (p_ins->p->cb_process_traces != NULL) { ret = p_ins->p->cb_process_traces(p_ins, (struct ctrace *) cur_buf, tag, @@ -680,7 +714,8 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s for (i = 0; i < array->entry_count; i++) { /* every entry in the array must be a map */ tmp = array->entries[i]; - if (tmp->type != CFL_VARIANT_KVLIST) { + + if (tmp->type != CFL_VARIANT_KVLIST) { return -1; } @@ -688,7 +723,8 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s /* get the processor name, this is a mandatory config field */ tmp = cfl_kvlist_fetch(kvlist, "name"); - if (!tmp) { + + if (!tmp) { flb_error("processor configuration don't have a 'name' defined"); return -1; } @@ -696,7 +732,8 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s /* create the processor unit and load all the properties */ name = tmp->data.as_string; pu = flb_processor_unit_create(proc, type, name); - if (!pu) { + + if (!pu) { flb_error("cannot create '%s' processor unit", name); return -1; } @@ -704,28 +741,29 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s /* iterate list of properties and set each one (skip name) */ cfl_list_foreach(head, &kvlist->list) { pair = cfl_list_entry(head, struct cfl_kvpair, _head); - if (strcmp(pair->key, "name") == 0) { + + if (strcmp(pair->key, "name") == 0) { continue; } - if (pair->val->type != CFL_VARIANT_STRING) { - continue; - } /* If filter plugin in processor unit has its own match rule, * we must release the pre-allocated '*' match at first. */ if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) { - if (strcmp(pair->key, "match") == 0) { + + if (strcmp(pair->key, "match") == 0) { f_ins = (struct flb_filter_instance *)pu->ctx; - if (f_ins->match != NULL) { + + if (f_ins->match != NULL) { flb_sds_destroy(f_ins->match); f_ins->match = NULL; } } } - ret = flb_processor_unit_set_property(pu, pair->key, pair->val->data.as_string); - if (ret == -1) { + ret = flb_processor_unit_set_property(pu, pair->key, pair->val); + + if (ret == -1) { flb_error("cannot set property '%s' for processor '%s'", pair->key, name); return -1; } @@ -744,9 +782,11 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str /* logs */ val = cfl_kvlist_fetch(g->properties, "logs"); + if (val) { ret = load_from_config_format_group(proc, FLB_PROCESSOR_LOGS, val); - if (ret == -1) { + + if (ret == -1) { flb_error("failed to load 'logs' processors"); return -1; } @@ -754,9 +794,11 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str /* metrics */ val = cfl_kvlist_fetch(g->properties, "metrics"); + if (val) { ret = load_from_config_format_group(proc, FLB_PROCESSOR_METRICS, val); - if (ret == -1) { + + if (ret == -1) { flb_error("failed to load 'metrics' processors"); return -1; } @@ -766,7 +808,8 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str val = cfl_kvlist_fetch(g->properties, "traces"); if (val) { ret = load_from_config_format_group(proc, FLB_PROCESSOR_TRACES, val); - if (ret == -1) { + + if (ret == -1) { flb_error("failed to load 'traces' processors"); return -1; } @@ -805,6 +848,7 @@ int flb_processor_instance_set_property(struct flb_processor_instance *ins, len = strlen(k); tmp = flb_env_var_translate(ins->config->env, v); + if (!tmp) { return -1; } @@ -815,7 +859,8 @@ int flb_processor_instance_set_property(struct flb_processor_instance *ins, else if (prop_key_check("log_level", k, len) == 0 && tmp) { ret = flb_log_get_level_str(tmp); flb_sds_destroy(tmp); - if (ret == -1) { + + if (ret == -1) { return -1; } ins->log_level = ret; @@ -826,8 +871,10 @@ int flb_processor_instance_set_property(struct flb_processor_instance *ins, * map it directly to avoid an extra memory allocation. */ kv = flb_kv_item_create(&ins->properties, (char *) k, NULL); - if (!kv) { - if (tmp) { + + if (!kv) { + + if (tmp) { flb_sds_destroy(tmp); } return -1; @@ -860,6 +907,7 @@ struct flb_processor_instance *flb_processor_instance_create( mk_list_foreach(head, &config->processor_plugins) { plugin = mk_list_entry(head, struct flb_processor_plugin, _head); + if (strcasecmp(plugin->name, name) == 0) { break; } @@ -871,6 +919,7 @@ struct flb_processor_instance *flb_processor_instance_create( } instance = flb_calloc(1, sizeof(struct flb_filter_instance)); + if (!instance) { flb_errno(); return NULL; @@ -954,7 +1003,8 @@ int flb_processor_instance_check_properties( * instance in question. */ config_map = flb_config_map_create(config, p->config_map); - if (!config_map) { + + if (!config_map) { flb_error("[native processor] error loading config map for '%s' plugin", p->name); return -1; @@ -965,8 +1015,10 @@ int flb_processor_instance_check_properties( ret = flb_config_map_properties_check(ins->p->name, &ins->properties, ins->config_map); - if (ret == -1) { - if (config->program_name) { + + if (ret == -1) { + + if (config->program_name) { flb_helper("try the command: %s -F %s -h\n", config->program_name, ins->p->name); } diff --git a/tests/internal/config_format_yaml.c b/tests/internal/config_format_yaml.c index 126cb4f0899..dac39f7d24c 100644 --- a/tests/internal/config_format_yaml.c +++ b/tests/internal/config_format_yaml.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -10,8 +11,22 @@ #include "flb_tests_internal.h" -#define FLB_000 FLB_TESTS_DATA_PATH "/data/config_format/yaml/fluent-bit.yaml" -#define FLB_001 FLB_TESTS_DATA_PATH "/data/config_format/yaml/issue_7559.yaml" +#define FLB_TESTS_CONF_PATH FLB_TESTS_DATA_PATH "/data/config_format/yaml" +#define FLB_000 FLB_TESTS_CONF_PATH "/fluent-bit.yaml" +#define FLB_001 FLB_TESTS_CONF_PATH "/issue_7559.yaml" + +/* + * Configurations to test: + * * basic single input to single output + * * basic single input to single output with a filter + * * includes + * * slist + * * conf parsers + * * yaml parsers + * * customs + * * service + * * env + */ /* data/config_format/fluent-bit.yaml */ void test_basic() @@ -20,6 +35,8 @@ void test_basic() struct flb_cf *cf; struct flb_cf_section *s; struct flb_cf_group *g; + struct cfl_variant *v; + int idx = 0; cf = flb_cf_yaml_create(NULL, FLB_000, NULL, 0); TEST_CHECK(cf != NULL); @@ -45,6 +62,69 @@ void test_basic() TEST_CHECK(mk_list_size(&cf->outputs) == 2); TEST_CHECK(mk_list_size(&cf->others) == 1); + /* check inputs */ + idx = 0; + mk_list_foreach(head, &cf->inputs) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + switch (idx) { + case 0: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "dummy") == 0); + break; + case 1: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "tail") == 0); + v = flb_cf_section_property_get(cf, s, "path"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "./test.log") == 0); + break; + case 2: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "tail") == 0); + v = flb_cf_section_property_get(cf, s, "path"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "./test.log") == 0); + break; + } + idx++; + } + + /* check outputs */ + idx = 0; + mk_list_foreach(head, &cf->outputs) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + switch (idx) { + case 0: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "stdout") == 0); + break; + case 1: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "stdout") == 0); + break; + } + idx++; + } + + /* check filters */ + idx = 0; + mk_list_foreach(head, &cf->filters) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + switch (idx) { + case 0: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "record_modifier") == 0); + break; + } + idx++; + } + /* groups */ s = flb_cf_section_get_by_name(cf, "input"); TEST_CHECK(s != NULL); @@ -87,8 +167,114 @@ void test_customs_section() flb_cf_destroy(cf); } +void test_slist_even() +{ + struct flb_cf *cf; + struct flb_cf_section *s; + struct cfl_variant *v; + struct mk_list *head; + + cf = flb_cf_yaml_create(NULL, FLB_TESTS_CONF_PATH "/pipelines/slist/even.yaml", NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of inputs */ + if(!TEST_CHECK(mk_list_size(&cf->inputs) == 1)) { + TEST_MSG("Section number error. Got=%d expect=1", mk_list_size(&cf->inputs)); + } + + mk_list_foreach(head, &cf->inputs) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + TEST_CHECK(s != NULL); + + v = flb_cf_section_property_get(cf, s, "success_header"); + TEST_CHECK(v->type == CFL_VARIANT_ARRAY); + if (!TEST_CHECK(v->data.as_array->entry_count == 2)) { + TEST_MSG("Section number error. Got=%lud expect=2", v->data.as_array->entry_count); + } + } + + flb_cf_dump(cf); + flb_cf_destroy(cf); +} + +void test_slist_odd() +{ + struct flb_cf *cf; + struct flb_cf_section *s; + struct cfl_variant *v; + struct mk_list *head; + + cf = flb_cf_yaml_create(NULL, FLB_TESTS_CONF_PATH "/pipelines/slist/odd.yaml", NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of inputs */ + if(!TEST_CHECK(mk_list_size(&cf->inputs) == 1)) { + TEST_MSG("Section number error. Got=%d expect=1", mk_list_size(&cf->inputs)); + } + + mk_list_foreach(head, &cf->inputs) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + TEST_CHECK(s != NULL); + + v = flb_cf_section_property_get(cf, s, "success_header"); + TEST_CHECK(v->type == CFL_VARIANT_ARRAY); + if (!TEST_CHECK(v->data.as_array->entry_count == 3)) { + TEST_MSG("Section number error. Got=%lud expect=3", v->data.as_array->entry_count); + } + } + + flb_cf_dump(cf); + flb_cf_destroy(cf); +} + +void test_parser_conf() +{ + struct flb_cf *cf; + struct flb_config *config; + int ret; + int cnt; + + cf = flb_cf_yaml_create(NULL, FLB_TESTS_CONF_PATH "/parsers/parsers-conf.yaml", NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + config = flb_config_init(); + TEST_CHECK(config != NULL); + config->conf_path = flb_strdup(FLB_TESTS_CONF_PATH "/parsers/"); + + // count the parsers registered automatically by fluent-bit + cnt = mk_list_size(&config->parsers); + // load the parsers from the configuration + ret = flb_config_load_config_format(config, cf); + if (ret != 0) { + exit(EXIT_FAILURE);; + } + + /* Total number of inputs */ + if(!TEST_CHECK(mk_list_size(&config->parsers) == cnt+1)) { + TEST_MSG("Section number error. Got=%d expect=%d", + mk_list_size(&config->parsers), + cnt+1); + } + + flb_cf_dump(cf); + flb_cf_destroy(cf); + flb_config_exit(config); +} + TEST_LIST = { { "basic" , test_basic}, { "customs section", test_customs_section}, + { "slist odd", test_slist_odd}, + { "slist even", test_slist_even}, + { "parsers file conf", test_parser_conf}, { 0 } }; diff --git a/tests/internal/data/config_format/yaml/fluent-bit.yaml b/tests/internal/data/config_format/yaml/fluent-bit.yaml index eee5a907629..49894552fc6 100644 --- a/tests/internal/data/config_format/yaml/fluent-bit.yaml +++ b/tests/internal/data/config_format/yaml/fluent-bit.yaml @@ -5,25 +5,25 @@ includes: - service.yaml customs: - - ${observability}: - api_key: zyJUb2tlbklEItoiY2ZlMTcx + - name: ${observability} + api_key: zyJUb2tlbklEItoiY2ZlMTcx pipeline: inputs: - - tail: - path: ./test.log - parser: json - read_from_head: true - - tail: - path: ./test.log - parser: json - read_from_head: true + - name: tail + path: ./test.log + parser: json + read_from_head: true + - name: tail + path: ./test.log + parser: json + read_from_head: true filters: - - record_modifier: - match: "*" - record: powered_by calyptia + - name: record_modifier + match: "*" + record: powered_by calyptia outputs: - - stdout: - match: "*" + - name: stdout + match: "*" diff --git a/tests/internal/data/config_format/yaml/parsers/parsers-conf.yaml b/tests/internal/data/config_format/yaml/parsers/parsers-conf.yaml new file mode 100644 index 00000000000..6421a8a1f85 --- /dev/null +++ b/tests/internal/data/config_format/yaml/parsers/parsers-conf.yaml @@ -0,0 +1,3 @@ +--- +service: + parsers_file: parsers.conf diff --git a/tests/internal/data/config_format/yaml/parsers/parsers.conf b/tests/internal/data/config_format/yaml/parsers/parsers.conf new file mode 100644 index 00000000000..9f3b6b33102 --- /dev/null +++ b/tests/internal/data/config_format/yaml/parsers/parsers.conf @@ -0,0 +1,6 @@ +[PARSER] + Name docker + Format json + Time_Key time + Time_Format %Y-%m-%dT%H:%M:%S.%L + Time_Keep On diff --git a/tests/internal/data/config_format/yaml/pipelines/slist/even.yaml b/tests/internal/data/config_format/yaml/pipelines/slist/even.yaml new file mode 100644 index 00000000000..5d5b7c46f3e --- /dev/null +++ b/tests/internal/data/config_format/yaml/pipelines/slist/even.yaml @@ -0,0 +1,7 @@ +--- +pipeline: + inputs: + - name: http + success_header: + - foo bar + - bar foo diff --git a/tests/internal/data/config_format/yaml/pipelines/slist/odd.yaml b/tests/internal/data/config_format/yaml/pipelines/slist/odd.yaml new file mode 100644 index 00000000000..a7d2058c9a5 --- /dev/null +++ b/tests/internal/data/config_format/yaml/pipelines/slist/odd.yaml @@ -0,0 +1,8 @@ +--- +pipeline: + inputs: + - name: http + success_header: + - foo bar + - bar foo + - foobar barfoo diff --git a/tests/internal/processor.c b/tests/internal/processor.c index 9a8952052ea..679fd131b8d 100644 --- a/tests/internal/processor.c +++ b/tests/internal/processor.c @@ -67,11 +67,20 @@ static void processor() size_t mp_size; void *out_buf = NULL; size_t out_size; + flb_sds_t hostname_prop_key; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = NULL, + }; printf("\n\n"); flb_init_env(); + hostname_prop_key = flb_sds_create("hostname monox"); + TEST_CHECK(hostname_prop_key != NULL); + var.data.as_string = hostname_prop_key; + config = flb_config_init(); TEST_CHECK(config != NULL); @@ -84,7 +93,7 @@ static void processor() pu = flb_processor_unit_create(proc, FLB_PROCESSOR_LOGS, "modify"); TEST_CHECK(pu != NULL); - ret = flb_processor_unit_set_property(pu, "add", "hostname monox"); + ret = flb_processor_unit_set_property(pu, "add", &var); TEST_CHECK(ret == 0); pu = flb_processor_unit_create(proc, FLB_PROCESSOR_LOGS, "stdout"); @@ -107,6 +116,7 @@ static void processor() flb_processor_destroy(proc); flb_config_exit(config); + flb_sds_destroy(hostname_prop_key); } TEST_LIST = {