From dedc5402ef95bd5368a407ac7ff0e7ab3d3fa5f2 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 3 Jan 2023 17:55:19 -0800 Subject: [PATCH] filter_ecs: all fixes from December 2022 in these PRs: https://github.com/fluent/fluent-bit/pull/6614 1.9 based branch Signed-off-by: Wesley Pettit --- plugins/filter_ecs/ecs.c | 191 +++++++++++++++++++++++++++---------- plugins/filter_ecs/ecs.h | 22 +++-- tests/runtime/filter_ecs.c | 168 +++++++++++++++++++++++++++++++- 3 files changed, 321 insertions(+), 60 deletions(-) diff --git a/plugins/filter_ecs/ecs.c b/plugins/filter_ecs/ecs.c index ff7874cba05..19438dbfa54 100644 --- a/plugins/filter_ecs/ecs.c +++ b/plugins/filter_ecs/ecs.c @@ -315,7 +315,8 @@ static flb_sds_t parse_id_from_arn(const char *arn, int len) /* * This deserializes the msgpack metadata buf to msgpack_object - * which can be used with flb_ra_translate in the main filter callback + * and then uses that msgpack_object to construct a list of key value pairs + * ready to attach to log records */ static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx, struct flb_ecs_metadata_buffer *meta) @@ -324,6 +325,11 @@ static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx, msgpack_object root; size_t off = 0; int ret; + struct flb_ecs_metadata_key *metadata_key = NULL; + flb_sds_t val = NULL; + struct mk_list *tmp; + struct mk_list *head; + struct flb_ecs_metadata_keypair *keypair = NULL; msgpack_unpacked_init(&result); ret = msgpack_unpack_next(&result, meta->buf, meta->size, &off); @@ -341,24 +347,73 @@ static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx, return -1; } - meta->unpacked = result; - meta->obj = root; + /* create list of metadata key value pairs */ + mk_list_init(&meta->metadata_keypairs); + meta->keypairs_len = 0; + meta->last_used_time = time(NULL); - meta->free_packer = FLB_TRUE; + + mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) { + metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head); 
+ val = flb_ra_translate(metadata_key->ra, NULL, 0, + root, NULL); + if (!val) { + flb_plg_info(ctx->ins, "Translation failed for %s : %s for container ID: %s", + metadata_key->key, metadata_key->template, meta->id); + /* keep trying other keys*/ + continue; + } + + if (flb_sds_len(val) == 0) { + flb_plg_info(ctx->ins, "Translation failed for %s : %s for container ID: %s", + metadata_key->key, metadata_key->template, meta->id); + flb_sds_destroy(val); + /* keep trying other keys*/ + continue; + } + + keypair = flb_calloc(1, sizeof(struct flb_ecs_metadata_keypair)); + if (!keypair) { + flb_errno(); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(val); + return -1; + } + + keypair->key = metadata_key->key; + keypair->val = val; + + mk_list_add(&keypair->_head, &meta->metadata_keypairs); + meta->keypairs_len += 1; + } + + msgpack_unpacked_destroy(&result); return 0; } static void flb_ecs_metadata_buffer_destroy(struct flb_ecs_metadata_buffer *meta) { + struct flb_ecs_metadata_keypair *keypair = NULL; + struct mk_list *tmp; + struct mk_list *head; + if (meta) { flb_free(meta->buf); - if (meta->free_packer == FLB_TRUE) { - msgpack_unpacked_destroy(&meta->unpacked); - } if (meta->id) { flb_sds_destroy(meta->id); } + if (mk_list_is_set( &meta->metadata_keypairs) == 0) { + mk_list_foreach_safe(head, tmp, &meta->metadata_keypairs) { + keypair = mk_list_entry(head, struct flb_ecs_metadata_keypair, _head); + /* only need to free val. 
key is ref to flb_ecs_metadata_key.key*/ + if (keypair->val) { + flb_sds_destroy(keypair->val); + } + mk_list_del(&keypair->_head); + flb_free(keypair); + } + } flb_free(meta); } } @@ -1325,7 +1380,7 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx, /* get metadata for this container */ ret = flb_hash_get(ctx->container_hash_table, container_short_id, flb_sds_len(container_short_id), - (void *) metadata_buffer, &size); + (void **) metadata_buffer, &size); if (ret == -1) { /* try fetch metadata */ @@ -1339,7 +1394,7 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx, /* get from hash table */ ret = flb_hash_get(ctx->container_hash_table, container_short_id, flb_sds_len(container_short_id), - (void *) metadata_buffer, &size); + (void **) metadata_buffer, &size); } flb_sds_destroy(container_short_id); @@ -1369,14 +1424,14 @@ static int is_tag_marked_failed(struct flb_filter_ecs *ctx, const char *tag, int tag_len) { int ret; - int val = 0; + int *val = NULL; size_t val_size; ret = flb_hash_get(ctx->failed_metadata_request_tags, tag, tag_len, - (void *) &val, &val_size); + (void **) &val, &val_size); if (ret != -1) { - if (val >= FLB_ECS_FILTER_METADATA_RETRIES) { + if (*val > ctx->agent_endpoint_retries) { return FLB_TRUE; } } @@ -1389,11 +1444,12 @@ static void mark_tag_failed(struct flb_filter_ecs *ctx, { int ret; int *val = NULL; + int *new_val = NULL; size_t val_size; ret = flb_hash_get(ctx->failed_metadata_request_tags, tag, tag_len, - (void *) val, &val_size); + (void **) &val, &val_size); if (ret == -1) { /* hash table copies memory to new heap block */ @@ -1409,16 +1465,29 @@ static void mark_tag_failed(struct flb_filter_ecs *ctx, /* hash table will contain a copy */ flb_free(val); } else { + /* + * val is memory returned from hash table + * if we simply update the value here and call flb_hash_add + * it first frees the old memory (which is what we passed it) + * then tries to copy over the memory we passed in to a new location + * flb_hash 
stores all entries as if they were strings, so we also + * can't simply increment the value returned by flb_hash_get + */ + new_val = flb_malloc(sizeof(int)); + if (!new_val) { + flb_errno(); + return; + } /* increment number of failed metadata requests for this tag */ - *val = *val + 1; + *new_val = *val + 1; flb_hash_add(ctx->failed_metadata_request_tags, tag, tag_len, - val, sizeof(int)); + new_val, sizeof(int)); flb_plg_info(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. " "This might be because the logs for this tag do not come from an ECS Task Container. " "This plugin will retry metadata requests at most %d times total for this tag.", - tag, *val, FLB_ECS_FILTER_METADATA_RETRIES); - + tag, *new_val, ctx->agent_endpoint_retries); + flb_free(new_val); } } @@ -1448,9 +1517,8 @@ static int cb_ecs_filter(const void *data, size_t bytes, msgpack_object_kv *kv; struct mk_list *tmp; struct mk_list *head; - struct flb_ecs_metadata_key *metadata_key; - struct flb_ecs_metadata_buffer *metadata_buffer; - flb_sds_t val; + struct flb_ecs_metadata_keypair *keypair = NULL; + struct flb_ecs_metadata_buffer *metadata_buffer = NULL; /* First check that the static cluster metadata has been retrieved */ if (ctx->has_cluster_metadata == FLB_FALSE) { @@ -1467,7 +1535,7 @@ static int cb_ecs_filter(const void *data, size_t bytes, if (check == FLB_TRUE) { flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. " "Will not attempt to retry the metadata request. 
Will attach cluster metadata only.", - tag, FLB_ECS_FILTER_METADATA_RETRIES); + tag, ctx->agent_endpoint_retries); } if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE) { @@ -1521,7 +1589,7 @@ static int cb_ecs_filter(const void *data, size_t bytes, flb_time_append_to_msgpack(&tm, &tmp_pck, 0); /* new record map size is old size + the new keys we will add */ - total_records = obj->via.map.size + ctx->metadata_keys_len; + total_records = obj->via.map.size + metadata_buffer->keypairs_len; msgpack_pack_map(&tmp_pck, total_records); /* iterate through the old record map and add it to the new buffer */ @@ -1532,28 +1600,21 @@ static int cb_ecs_filter(const void *data, size_t bytes, } /* append new keys */ - mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) { - metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head); - val = flb_ra_translate(metadata_key->ra, NULL, 0, - metadata_buffer->obj, NULL); - if (!val) { - flb_plg_info(ctx->ins, "Translation failed for %s : %s", - metadata_key->key, metadata_key->template); - msgpack_unpacked_destroy(&result); - msgpack_sbuffer_destroy(&tmp_sbuf); - return FLB_FILTER_NOTOUCH; + if (metadata_buffer->keypairs_len > 0) { + mk_list_foreach_safe(head, tmp, &metadata_buffer->metadata_keypairs) { + keypair = mk_list_entry(head, struct flb_ecs_metadata_keypair, _head); + + len = flb_sds_len(keypair->key); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + keypair->key, + len); + len = flb_sds_len(keypair->val); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + keypair->val, + len); } - len = flb_sds_len(metadata_key->key); - msgpack_pack_str(&tmp_pck, len); - msgpack_pack_str_body(&tmp_pck, - metadata_key->key, - len); - len = flb_sds_len(val); - msgpack_pack_str(&tmp_pck, len); - msgpack_pack_str_body(&tmp_pck, - val, - len); - flb_sds_destroy(val); } } msgpack_unpacked_destroy(&result); @@ -1590,6 +1651,7 @@ static void flb_filter_ecs_destroy(struct 
flb_filter_ecs *ctx) struct mk_list *head; struct flb_ecs_metadata_key *metadata_key; struct flb_ecs_metadata_buffer *buf; + struct flb_ecs_metadata_keypair *keypair = NULL; if (ctx) { if (ctx->ecs_upstream) { @@ -1609,18 +1671,32 @@ static void flb_filter_ecs_destroy(struct flb_filter_ecs *ctx) } if (ctx->cluster_meta_buf.buf) { flb_free(ctx->cluster_meta_buf.buf); - msgpack_unpacked_destroy(&ctx->cluster_meta_buf.unpacked); + if (mk_list_is_set(&ctx->cluster_meta_buf.metadata_keypairs) == 0) { + mk_list_foreach_safe(head, tmp, &ctx->cluster_meta_buf.metadata_keypairs) { + keypair = mk_list_entry(head, struct flb_ecs_metadata_keypair, _head); + /* only need to free val. key is ref to flb_ecs_metadata_key.key*/ + if (keypair->val) { + flb_sds_destroy(keypair->val); + } + mk_list_del(&keypair->_head); + flb_free(keypair); + } + } } - mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) { - metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head); - mk_list_del(&metadata_key->_head); - flb_ecs_metadata_key_destroy(metadata_key); + if (mk_list_is_set(&ctx->metadata_keys) == 0) { + mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) { + metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head); + mk_list_del(&metadata_key->_head); + flb_ecs_metadata_key_destroy(metadata_key); + } } - mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) { - buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head); - mk_list_del(&buf->_head); - flb_hash_del(ctx->container_hash_table, buf->id); - flb_ecs_metadata_buffer_destroy(buf); + if (mk_list_is_set(&ctx->metadata_buffers) == 0) { + mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) { + buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head); + mk_list_del(&buf->_head); + flb_hash_del(ctx->container_hash_table, buf->id); + flb_ecs_metadata_buffer_destroy(buf); + } } if (ctx->container_hash_table) { flb_hash_destroy(ctx->container_hash_table); @@ -1694,6 +1770,15 @@ static struct 
flb_config_map config_map[] = { "Defaults to 51678" }, + { + FLB_CONFIG_MAP_INT, "agent_endpoint_retries", FLB_ECS_FILTER_METADATA_RETRIES, + 0, FLB_TRUE, offsetof(struct flb_filter_ecs, agent_endpoint_retries), + "Number of retries for failed metadata requests to ECS Agent Introspection " + "endpoint. The most common cause of failed metadata requests is that the " + "container the metadata request was made for is not part of an ECS Task. " + "Check if you have non-task containers and docker dual logging enabled." + }, + {0} }; diff --git a/plugins/filter_ecs/ecs.h b/plugins/filter_ecs/ecs.h index e6bc0eba796..52438a0853c 100644 --- a/plugins/filter_ecs/ecs.h +++ b/plugins/filter_ecs/ecs.h @@ -30,7 +30,7 @@ #define FLB_ECS_FILTER_PORT "51678" #define FLB_ECS_FILTER_CLUSTER_PATH "/v1/metadata" #define FLB_ECS_FILTER_TASK_PATH_FORMAT "/v1/tasks?dockerid=%s" -#define FLB_ECS_FILTER_METADATA_RETRIES 2 +#define FLB_ECS_FILTER_METADATA_RETRIES "3" /* * Kubernetes recommends not running more than 110 pods per node @@ -49,21 +49,29 @@ struct flb_ecs_metadata_key { struct mk_list _head; }; +/* metadata processed into KV pair ready to add to log record */ +struct flb_ecs_metadata_keypair { + /* key is just a reference to the flb_ecs_metadata_key.key */ + flb_sds_t key; + flb_sds_t val; + + struct mk_list _head; +}; + struct flb_ecs_metadata_buffer { /* msgpack_sbuffer */ char *buf; size_t size; - /* unpacked object to use with flb_ra_translate */ - msgpack_unpacked unpacked; - msgpack_object obj; - int free_packer; - /* the hash table only stores a pointer- we need the list to track and free these */ struct mk_list _head; /* we clean up the memory for these once ecs_meta_cache_ttl has expired */ time_t last_used_time; + /* List of processed metadata keys and values */ + struct mk_list metadata_keypairs; + int keypairs_len; + /* * To remove from the hash table on TTL expiration, we need the ID * While we use a TTL hash, it won't clean up the memory, so we have a separate 
routine for that @@ -109,6 +117,8 @@ struct flb_filter_ecs { flb_sds_t ecs_host; int ecs_port; + int agent_endpoint_retries; + /* * This field is used when we build new container metadata objects */ diff --git a/tests/runtime/filter_ecs.c b/tests/runtime/filter_ecs.c index e0ba33a30b2..15c95d3b44a 100644 --- a/tests/runtime/filter_ecs.c +++ b/tests/runtime/filter_ecs.c @@ -15,6 +15,7 @@ struct filter_test { struct filter_test_result { char *expected_pattern; /* string that must occur in output */ + char *fail_pattern; /* if this pattern is found in the result, fail test */ int expected_pattern_index; /* which record to check for the pattern */ int expected_records; /* expected number of outputted records */ int actual_records; /* actual number of outputted records */ @@ -47,6 +48,15 @@ static int cb_check_result(void *record, size_t size, void *data) expected->actual_records++; + if (expected->fail_pattern != NULL) { + p = strstr(result, expected->fail_pattern); + TEST_CHECK(p == NULL); + if (p) { + flb_error("Should not find: '%s' in result '%s'", + expected->fail_pattern, result); + } + } + flb_free(record); return 0; } @@ -158,6 +168,85 @@ static void flb_test_ecs_filter() sleep(2); TEST_CHECK(expected.actual_records == expected.expected_records); filter_test_destroy(ctx); + + /* unset env */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "", 1); +} + +/* + * First release of ECS filter could crash + * when saving that it failed to get metadata for a tag + */ +static void flb_test_ecs_filter_mark_tag_failed() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + setenv("TEST_TASK_ERROR", ERROR_RESPONSE, 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "testprefix-79c796ed2a7f"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure 
filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "ecs_tag_prefix", "testprefix-", + "ADD", "resource $ClusterName.$TaskID.$ECSContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* Prepare output callback with expected result */ + expected.expected_records = 4; /* 4 records with no metadata */ + expected.expected_pattern = ""; + expected.expected_pattern_index = 0; + cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + sleep(1); + + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + sleep(1); + + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + sleep(1); + + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + sleep(2); + + /* check number of outputted records */ + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); + + /* unset env */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "", 1); + setenv("TEST_TASK_ERROR", "", 1); } static void flb_test_ecs_filter_no_prefix() @@ -207,6 +296,9 @@ static void flb_test_ecs_filter_no_prefix() sleep(2); TEST_CHECK(expected.actual_records == expected.expected_records); filter_test_destroy(ctx); + + /* unset env */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "", 1); } static void flb_test_ecs_filter_cluster_metadata_only() @@ -258,6 +350,9 @@ static void flb_test_ecs_filter_cluster_metadata_only() sleep(2); TEST_CHECK(expected.actual_records == expected.expected_records); filter_test_destroy(ctx); + + /* unset env */ + 
setenv("FLB_ECS_PLUGIN_UNDER_TEST", "", 1); } static void flb_test_ecs_filter_cluster_error() @@ -308,6 +403,10 @@ static void flb_test_ecs_filter_cluster_error() sleep(2); TEST_CHECK(expected.actual_records == expected.expected_records); filter_test_destroy(ctx); + + /* unset env */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "", 1); + setenv("TEST_CLUSTER_ERROR", "", 1); } static void flb_test_ecs_filter_task_error() @@ -358,10 +457,77 @@ static void flb_test_ecs_filter_task_error() sleep(2); TEST_CHECK(expected.actual_records == expected.expected_records); filter_test_destroy(ctx); + + /* unset env */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "", 1); + setenv("TEST_TASK_ERROR", "", 1); } -TEST_LIST = { +/* + * First release of ECS filter would attach empty values for metadata which could not be fetched + * This checks the case where task metadata fails to fetch and that the filter thus + * does not attach the configured `task` key. + */ +static void flb_test_ecs_filter_cluster_meta_nonempty() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + setenv("TEST_TASK_ERROR", ERROR_RESPONSE, 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "79c796ed2a7f"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "ecs_tag_prefix", "", + "ADD", "cluster $ClusterName", + "ADD", "task $TaskID", + "ADD", "container $ECSContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* this test is mainly for leak checking on error, not for checking result record */ + expected.expected_records = 1; /* 1 record */ + expected.expected_pattern = "cluster"; /* cluster key should be added */ + expected.expected_pattern_index = 0; + expected.fail_pattern = "task"; /* task key should not be attached */ + 
cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + /* check number of outputted records */ + sleep(2); + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); + + /* unset env */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "", 1); + setenv("TEST_TASK_ERROR", "", 1); +} + +TEST_LIST = { + {"flb_test_ecs_filter_cluster_meta_nonempty" , flb_test_ecs_filter_cluster_meta_nonempty }, + {"flb_test_ecs_filter_mark_tag_failed" , flb_test_ecs_filter_mark_tag_failed }, {"flb_test_ecs_filter" , flb_test_ecs_filter }, {"flb_test_ecs_filter_no_prefix" , flb_test_ecs_filter_no_prefix }, {"flb_test_ecs_filter_cluster_metadata_only" , flb_test_ecs_filter_cluster_metadata_only },