Skip to content

Commit

Permalink
filter_ecs: all fixes from December 2022 in these PRs:
Browse files Browse the repository at this point in the history
fluent#6614

1.9 based branch

Signed-off-by: Wesley Pettit <[email protected]>
  • Loading branch information
PettitWesley authored and Swapneil Singh committed Oct 3, 2024
1 parent dac799f commit f5525cb
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 60 deletions.
191 changes: 138 additions & 53 deletions plugins/filter_ecs/ecs.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ static flb_sds_t parse_id_from_arn(const char *arn, int len)

/*
* This deserializes the msgpack metadata buf to msgpack_object
* which can be used with flb_ra_translate in the main filter callback
* and then uses that msgpack_object to construct a list of key value pairs
* ready to attach to log records
*/
static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx,
struct flb_ecs_metadata_buffer *meta)
Expand All @@ -324,6 +325,11 @@ static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx,
msgpack_object root;
size_t off = 0;
int ret;
struct flb_ecs_metadata_key *metadata_key = NULL;
flb_sds_t val = NULL;
struct mk_list *tmp;
struct mk_list *head;
struct flb_ecs_metadata_keypair *keypair = NULL;

msgpack_unpacked_init(&result);
ret = msgpack_unpack_next(&result, meta->buf, meta->size, &off);
Expand All @@ -341,24 +347,73 @@ static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx,
return -1;
}

meta->unpacked = result;
meta->obj = root;
/* create list of metadata key value pairs */
mk_list_init(&meta->metadata_keypairs);
meta->keypairs_len = 0;

meta->last_used_time = time(NULL);
meta->free_packer = FLB_TRUE;

mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
val = flb_ra_translate(metadata_key->ra, NULL, 0,
root, NULL);
if (!val) {
flb_plg_info(ctx->ins, "Translation failed for %s : %s for container ID: %s",
metadata_key->key, metadata_key->template, meta->id);
/* keep trying other keys*/
continue;
}

if (flb_sds_len(val) == 0) {
flb_plg_info(ctx->ins, "Translation failed for %s : %s for container ID: %s",
metadata_key->key, metadata_key->template, meta->id);
flb_sds_destroy(val);
/* keep trying other keys*/
continue;
}

keypair = flb_calloc(1, sizeof(struct flb_ecs_metadata_keypair));
if (!keypair) {
flb_errno();
msgpack_unpacked_destroy(&result);
flb_sds_destroy(val);
return -1;
}

keypair->key = metadata_key->key;
keypair->val = val;

mk_list_add(&keypair->_head, &meta->metadata_keypairs);
meta->keypairs_len += 1;
}

msgpack_unpacked_destroy(&result);

return 0;
}

static void flb_ecs_metadata_buffer_destroy(struct flb_ecs_metadata_buffer *meta)
{
struct flb_ecs_metadata_keypair *keypair = NULL;
struct mk_list *tmp;
struct mk_list *head;

if (meta) {
flb_free(meta->buf);
if (meta->free_packer == FLB_TRUE) {
msgpack_unpacked_destroy(&meta->unpacked);
}
if (meta->id) {
flb_sds_destroy(meta->id);
}
if (mk_list_is_set( &meta->metadata_keypairs) == 0) {
mk_list_foreach_safe(head, tmp, &meta->metadata_keypairs) {
keypair = mk_list_entry(head, struct flb_ecs_metadata_keypair, _head);
/* only need to free val. key is ref to flb_ecs_metadata_key.key*/
if (keypair->val) {
flb_sds_destroy(keypair->val);
}
mk_list_del(&keypair->_head);
flb_free(keypair);
}
}
flb_free(meta);
}
}
Expand Down Expand Up @@ -1325,7 +1380,7 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx,
/* get metadata for this container */
ret = flb_hash_get(ctx->container_hash_table,
container_short_id, flb_sds_len(container_short_id),
(void *) metadata_buffer, &size);
(void **) metadata_buffer, &size);

if (ret == -1) {
/* try fetch metadata */
Expand All @@ -1339,7 +1394,7 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx,
/* get from hash table */
ret = flb_hash_get(ctx->container_hash_table,
container_short_id, flb_sds_len(container_short_id),
(void *) metadata_buffer, &size);
(void **) metadata_buffer, &size);
}

flb_sds_destroy(container_short_id);
Expand Down Expand Up @@ -1369,14 +1424,14 @@ static int is_tag_marked_failed(struct flb_filter_ecs *ctx,
const char *tag, int tag_len)
{
int ret;
int val = 0;
int *val = NULL;
size_t val_size;

ret = flb_hash_get(ctx->failed_metadata_request_tags,
tag, tag_len,
(void *) &val, &val_size);
(void **) &val, &val_size);
if (ret != -1) {
if (val >= FLB_ECS_FILTER_METADATA_RETRIES) {
if (*val > ctx->agent_endpoint_retries) {
return FLB_TRUE;
}
}
Expand All @@ -1389,11 +1444,12 @@ static void mark_tag_failed(struct flb_filter_ecs *ctx,
{
int ret;
int *val = NULL;
int *new_val = NULL;
size_t val_size;

ret = flb_hash_get(ctx->failed_metadata_request_tags,
tag, tag_len,
(void *) val, &val_size);
(void **) &val, &val_size);

if (ret == -1) {
/* hash table copies memory to new heap block */
Expand All @@ -1409,16 +1465,29 @@ static void mark_tag_failed(struct flb_filter_ecs *ctx,
/* hash table will contain a copy */
flb_free(val);
} else {
/*
* val is memory returned from hash table
* if we simply update the value here and call flb_hash_add
* it first frees the old memory (which is what we passed it)
* then tries to copy over the memory we passed in to a new location
* flb_hash stores all entries as if they were strings, so we also
* can't simply increment the value returned by flb_hash_get
*/
new_val = flb_malloc(sizeof(int));
if (!new_val) {
flb_errno();
return;
}
/* increment number of failed metadata requests for this tag */
*val = *val + 1;
*new_val = *val + 1;
flb_hash_add(ctx->failed_metadata_request_tags,
tag, tag_len,
val, sizeof(int));
new_val, sizeof(int));
flb_plg_info(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
"This might be because the logs for this tag do not come from an ECS Task Container. "
"This plugin will retry metadata requests at most %d times total for this tag.",
tag, *val, FLB_ECS_FILTER_METADATA_RETRIES);

tag, *new_val, ctx->agent_endpoint_retries);
flb_free(new_val);
}
}

Expand Down Expand Up @@ -1448,9 +1517,8 @@ static int cb_ecs_filter(const void *data, size_t bytes,
msgpack_object_kv *kv;
struct mk_list *tmp;
struct mk_list *head;
struct flb_ecs_metadata_key *metadata_key;
struct flb_ecs_metadata_buffer *metadata_buffer;
flb_sds_t val;
struct flb_ecs_metadata_keypair *keypair = NULL;
struct flb_ecs_metadata_buffer *metadata_buffer = NULL;

/* First check that the static cluster metadata has been retrieved */
if (ctx->has_cluster_metadata == FLB_FALSE) {
Expand All @@ -1467,7 +1535,7 @@ static int cb_ecs_filter(const void *data, size_t bytes,
if (check == FLB_TRUE) {
flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
"Will not attempt to retry the metadata request. Will attach cluster metadata only.",
tag, FLB_ECS_FILTER_METADATA_RETRIES);
tag, ctx->agent_endpoint_retries);
}

if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE) {
Expand Down Expand Up @@ -1521,7 +1589,7 @@ static int cb_ecs_filter(const void *data, size_t bytes,
flb_time_append_to_msgpack(&tm, &tmp_pck, 0);

/* new record map size is old size + the new keys we will add */
total_records = obj->via.map.size + ctx->metadata_keys_len;
total_records = obj->via.map.size + metadata_buffer->keypairs_len;
msgpack_pack_map(&tmp_pck, total_records);

/* iterate through the old record map and add it to the new buffer */
Expand All @@ -1532,28 +1600,21 @@ static int cb_ecs_filter(const void *data, size_t bytes,
}

/* append new keys */
mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
val = flb_ra_translate(metadata_key->ra, NULL, 0,
metadata_buffer->obj, NULL);
if (!val) {
flb_plg_info(ctx->ins, "Translation failed for %s : %s",
metadata_key->key, metadata_key->template);
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&tmp_sbuf);
return FLB_FILTER_NOTOUCH;
if (metadata_buffer->keypairs_len > 0) {
mk_list_foreach_safe(head, tmp, &metadata_buffer->metadata_keypairs) {
keypair = mk_list_entry(head, struct flb_ecs_metadata_keypair, _head);

len = flb_sds_len(keypair->key);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck,
keypair->key,
len);
len = flb_sds_len(keypair->val);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck,
keypair->val,
len);
}
len = flb_sds_len(metadata_key->key);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck,
metadata_key->key,
len);
len = flb_sds_len(val);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck,
val,
len);
flb_sds_destroy(val);
}
}
msgpack_unpacked_destroy(&result);
Expand Down Expand Up @@ -1590,6 +1651,7 @@ static void flb_filter_ecs_destroy(struct flb_filter_ecs *ctx)
struct mk_list *head;
struct flb_ecs_metadata_key *metadata_key;
struct flb_ecs_metadata_buffer *buf;
struct flb_ecs_metadata_keypair *keypair = NULL;

if (ctx) {
if (ctx->ecs_upstream) {
Expand All @@ -1609,18 +1671,32 @@ static void flb_filter_ecs_destroy(struct flb_filter_ecs *ctx)
}
if (ctx->cluster_meta_buf.buf) {
flb_free(ctx->cluster_meta_buf.buf);
msgpack_unpacked_destroy(&ctx->cluster_meta_buf.unpacked);
if (mk_list_is_set(&ctx->cluster_meta_buf.metadata_keypairs) == 0) {
mk_list_foreach_safe(head, tmp, &ctx->cluster_meta_buf.metadata_keypairs) {
keypair = mk_list_entry(head, struct flb_ecs_metadata_keypair, _head);
/* only need to free val. key is ref to flb_ecs_metadata_key.key*/
if (keypair->val) {
flb_sds_destroy(keypair->val);
}
mk_list_del(&keypair->_head);
flb_free(keypair);
}
}
}
mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
mk_list_del(&metadata_key->_head);
flb_ecs_metadata_key_destroy(metadata_key);
if (mk_list_is_set(&ctx->metadata_keys) == 0) {
mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
mk_list_del(&metadata_key->_head);
flb_ecs_metadata_key_destroy(metadata_key);
}
}
mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) {
buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head);
mk_list_del(&buf->_head);
flb_hash_del(ctx->container_hash_table, buf->id);
flb_ecs_metadata_buffer_destroy(buf);
if (mk_list_is_set(&ctx->metadata_buffers) == 0) {
mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) {
buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head);
mk_list_del(&buf->_head);
flb_hash_del(ctx->container_hash_table, buf->id);
flb_ecs_metadata_buffer_destroy(buf);
}
}
if (ctx->container_hash_table) {
flb_hash_destroy(ctx->container_hash_table);
Expand Down Expand Up @@ -1694,6 +1770,15 @@ static struct flb_config_map config_map[] = {
"Defaults to 51678"
},

{
FLB_CONFIG_MAP_INT, "agent_endpoint_retries", FLB_ECS_FILTER_METADATA_RETRIES,
0, FLB_TRUE, offsetof(struct flb_filter_ecs, agent_endpoint_retries),
"Number of retries for failed metadata requests to ECS Agent Introspection "
"endpoint. The most common cause of failed metadata requests is that the "
"container the metadata request was made for is not part of an ECS Task. "
"Check if you have non-task containers and docker dual logging enabled."
},

{0}
};

Expand Down
22 changes: 16 additions & 6 deletions plugins/filter_ecs/ecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#define FLB_ECS_FILTER_PORT "51678"
#define FLB_ECS_FILTER_CLUSTER_PATH "/v1/metadata"
#define FLB_ECS_FILTER_TASK_PATH_FORMAT "/v1/tasks?dockerid=%s"
#define FLB_ECS_FILTER_METADATA_RETRIES 2
#define FLB_ECS_FILTER_METADATA_RETRIES "3"

/*
* Kubernetes recommends not running more than 110 pods per node
Expand All @@ -49,21 +49,29 @@ struct flb_ecs_metadata_key {
struct mk_list _head;
};

/* metadata processed into KV pair ready to add to log record */
struct flb_ecs_metadata_keypair {
/* key is just a reference to the flb_ecs_metadata_key.key */
flb_sds_t key;
flb_sds_t val;

struct mk_list _head;
};

struct flb_ecs_metadata_buffer {
/* msgpack_sbuffer */
char *buf;
size_t size;

/* unpacked object to use with flb_ra_translate */
msgpack_unpacked unpacked;
msgpack_object obj;
int free_packer;

/* the hash table only stores a pointer- we need the list to track and free these */
struct mk_list _head;
/* we clean up the memory for these once ecs_meta_cache_ttl has expired */
time_t last_used_time;

/* List of processed metadata keys and values */
struct mk_list metadata_keypairs;
int keypairs_len;

/*
* To remove from the hash table on TTL expiration, we need the ID
* While we use a TTL hash, it won't clean up the memory, so we have a separate routine for that
Expand Down Expand Up @@ -109,6 +117,8 @@ struct flb_filter_ecs {
flb_sds_t ecs_host;
int ecs_port;

int agent_endpoint_retries;

/*
* This field is used when we build new container metadata objects
*/
Expand Down
Loading

0 comments on commit f5525cb

Please sign in to comment.