Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_parser: Add parameter to nest parsed fields under #9828

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions plugins/filter_parser/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,53 @@ static int delete_parsers(struct filter_parser_ctx *ctx)
return c;
}

static int nest_raw_map(struct filter_parser_ctx *ctx,
char **buf,
size_t *size,
const flb_sds_t key)
{
msgpack_sbuffer sbuf;
msgpack_packer pk;
msgpack_unpacked outbuf_result;
msgpack_object obj;
msgpack_object_kv *kv;
const size_t key_len = flb_sds_len(key);
int ret = 0;

msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);

msgpack_unpacked_init(&outbuf_result);
ret = msgpack_unpack_next(&outbuf_result, *buf, *size, NULL);
if (ret != MSGPACK_UNPACK_SUCCESS) {
flb_plg_error(ctx->ins,
"Nest: failed to unpack msgpack data with error code %d",
ret);
msgpack_unpacked_destroy(&outbuf_result);
return -1;
}

/* Create a new map, unpacking map `buf` under the new `key` root key */
obj = outbuf_result.data;
if (obj.type == MSGPACK_OBJECT_MAP) {
msgpack_pack_map(&pk, 1);
msgpack_pack_str(&pk, key_len);
msgpack_pack_str_body(&pk, key, key_len);
msgpack_pack_map(&pk, obj.via.map.size);
for (unsigned x = 0; x < obj.via.map.size; ++x) {
kv = &obj.via.map.ptr[x];
msgpack_pack_object(&pk, kv->key);
msgpack_pack_object(&pk, kv->val);
}
flb_free(*buf);
*buf = sbuf.data;
*size = sbuf.size;
}

msgpack_unpacked_destroy(&outbuf_result);
return 0;
}

static int configure(struct filter_parser_ctx *ctx,
struct flb_filter_instance *f_ins,
struct flb_config *config)
Expand Down Expand Up @@ -301,6 +348,9 @@ static int cb_parser_filter(const void *data, size_t bytes,
}

if (out_buf != NULL && parse_ret >= 0) {
if (ctx->nest_under) {
nest_raw_map(ctx, &out_buf, &out_size, ctx->nest_under);
}
if (append_arr != NULL && append_arr_len > 0) {
char *new_buf = NULL;
int new_size;
Expand Down Expand Up @@ -440,6 +490,11 @@ static struct flb_config_map config_map[] = {
"Keep all other original fields in the parsed result. "
"If false, all other original fields will be removed."
},
{
FLB_CONFIG_MAP_STR, "Nest_Under", NULL,
0, FLB_TRUE, offsetof(struct filter_parser_ctx, nest_under),
"Specify field name to nest parsed records under."
},
{
FLB_CONFIG_MAP_DEPRECATED, "Unescape_key", NULL,
0, FLB_FALSE, 0,
Expand Down
1 change: 1 addition & 0 deletions plugins/filter_parser/filter_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct filter_parser_ctx {
int key_name_len;
int reserve_data;
int preserve_key;
flb_sds_t nest_under;
struct mk_list parsers;
struct flb_filter_instance *ins;
};
Expand Down
78 changes: 78 additions & 0 deletions tests/runtime/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,83 @@ void flb_test_filter_parser_reserve_on_preserve_on()
test_ctx_destroy(ctx);
}

void flb_test_filter_parser_nest_under_on()
{
int ret;
int bytes;
char *p, *output, *expected;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;
int filter_ffd;
struct flb_parser *parser;

struct flb_lib_out_cb cb;
cb.cb = callback_test;
cb.data = NULL;

clear_output();

ctx = flb_create();

/* Configure service */
flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace" "1", "Log_Level", "debug", NULL);

/* Input */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd,
"Tag", "test",
NULL);

/* Parser */
parser = flb_parser_create("json", "json", NULL,
FLB_FALSE,
NULL, NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, FLB_FALSE,
NULL, 0, NULL, ctx->config);
TEST_CHECK(parser != NULL);

/* Filter */
filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
TEST_CHECK(filter_ffd >= 0);
ret = flb_filter_set(ctx, filter_ffd,
"Match", "test",
"Key_Name", "to_parse",
"Nest_Under", "nest_key",
"Parser", "json",
NULL);
TEST_CHECK(ret == 0);

/* Output */
out_ffd = flb_output(ctx, (char *) "lib", &cb);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd,
"Match", "*",
"format", "json",
NULL);

/* Start the engine */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data */
p = "[1,{\"hello\":\"world\",\"some_object\":{\"foo\":\"bar\"},\"to_parse\":\"{\\\"key\\\":\\\"value\\\",\\\"object\\\":{\\\"a\\\":\\\"b\\\"}}\"}]";
bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
TEST_CHECK(bytes == strlen(p));

wait_with_timeout(1500, &output); /* waiting flush and ensuring data flush */
TEST_CHECK_(output != NULL, "Expected output to not be NULL");
if (output != NULL) {
/* check extra data was not preserved */
expected = "{\"nest_key\":{\"key\":\"value\",\"object\":{\"a\":\"b\"}}}";
TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain key one , got '%s'", output);
free(output);
}

flb_stop(ctx);
flb_destroy(ctx);
}

TEST_LIST = {
{"filter_parser_extract_fields", flb_test_filter_parser_extract_fields },
{"filter_parser_reserve_data_off", flb_test_filter_parser_reserve_data_off },
Expand All @@ -1313,6 +1390,7 @@ TEST_LIST = {
{"filter_parser_reserve_off_preserve_on", flb_test_filter_parser_reserve_off_preserve_on},
{"filter_parser_reserve_on_preserve_off", flb_test_filter_parser_reserve_on_preserve_off},
{"filter_parser_reserve_on_preserve_on", flb_test_filter_parser_reserve_on_preserve_on},
{"filter_parser_nest_under_on", flb_test_filter_parser_nest_under_on},
{NULL, NULL}
};