Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_ecs: only attach metadata keys if their values are non-empty #6614

Merged
merged 8 commits into from
Feb 13, 2023
191 changes: 138 additions & 53 deletions plugins/filter_ecs/ecs.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ static flb_sds_t parse_id_from_arn(const char *arn, int len)

/*
* This deserializes the msgpack metadata buf to msgpack_object
* which can be used with flb_ra_translate in the main filter callback
* and then uses that msgpack_object to construct a list of key value pairs
* ready to attach to log records
*/
static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx,
struct flb_ecs_metadata_buffer *meta)
Expand All @@ -324,6 +325,11 @@ static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx,
msgpack_object root;
size_t off = 0;
int ret;
struct flb_ecs_metadata_key *metadata_key = NULL;
flb_sds_t val = NULL;
struct mk_list *tmp;
struct mk_list *head;
struct flb_ecs_metadata_keypair *keypair = NULL;

msgpack_unpacked_init(&result);
ret = msgpack_unpack_next(&result, meta->buf, meta->size, &off);
Expand All @@ -341,24 +347,73 @@ static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx,
return -1;
}

meta->unpacked = result;
meta->obj = root;
/* create list of metadata key value pairs */
mk_list_init(&meta->metadata_keypairs);
meta->keypairs_len = 0;

meta->last_used_time = time(NULL);
meta->free_packer = FLB_TRUE;

mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
val = flb_ra_translate(metadata_key->ra, NULL, 0,
root, NULL);
if (!val) {
flb_plg_info(ctx->ins, "Translation failed for %s : %s for container ID: %s",
metadata_key->key, metadata_key->template, meta->id);
/* keep trying other keys*/
continue;
}

if (flb_sds_len(val) == 0) {
flb_plg_info(ctx->ins, "Translation failed for %s : %s for container ID: %s",
metadata_key->key, metadata_key->template, meta->id);
flb_sds_destroy(val);
/* keep trying other keys*/
continue;
}
Comment on lines +367 to +373
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose value will never be null or length 0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(this was to address the other comment asking if we need to check some val==null condition as we do in other places)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uhhh its null if the record accessor translation fails

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following you here.


keypair = flb_calloc(1, sizeof(struct flb_ecs_metadata_keypair));
if (!keypair) {
flb_errno();
msgpack_unpacked_destroy(&result);
flb_sds_destroy(val);
return -1;
}

keypair->key = metadata_key->key;
keypair->val = val;

mk_list_add(&keypair->_head, &meta->metadata_keypairs);
meta->keypairs_len += 1;
}

msgpack_unpacked_destroy(&result);

return 0;
}

static void flb_ecs_metadata_buffer_destroy(struct flb_ecs_metadata_buffer *meta)
{
struct flb_ecs_metadata_keypair *keypair = NULL;
struct mk_list *tmp;
struct mk_list *head;

if (meta) {
flb_free(meta->buf);
if (meta->free_packer == FLB_TRUE) {
msgpack_unpacked_destroy(&meta->unpacked);
}
if (meta->id) {
flb_sds_destroy(meta->id);
}
if (mk_list_is_set( &meta->metadata_keypairs) == 0) {
mk_list_foreach_safe(head, tmp, &meta->metadata_keypairs) {
keypair = mk_list_entry(head, struct flb_ecs_metadata_keypair, _head);
/* only need to free val. key is ref to flb_ecs_metadata_key.key*/
if (keypair->val) {
flb_sds_destroy(keypair->val);
}
mk_list_del(&keypair->_head);
flb_free(keypair);
}
}
flb_free(meta);
}
}
Expand Down Expand Up @@ -1325,7 +1380,7 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx,
/* get metadata for this container */
ret = flb_hash_get(ctx->container_hash_table,
container_short_id, flb_sds_len(container_short_id),
(void *) metadata_buffer, &size);
(void **) metadata_buffer, &size);

if (ret == -1) {
/* try fetch metadata */
Expand All @@ -1339,7 +1394,7 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx,
/* get from hash table */
ret = flb_hash_get(ctx->container_hash_table,
container_short_id, flb_sds_len(container_short_id),
(void *) metadata_buffer, &size);
(void **) metadata_buffer, &size);
}

flb_sds_destroy(container_short_id);
Expand Down Expand Up @@ -1369,14 +1424,14 @@ static int is_tag_marked_failed(struct flb_filter_ecs *ctx,
const char *tag, int tag_len)
{
int ret;
int val = 0;
int *val = NULL;
size_t val_size;

ret = flb_hash_get(ctx->failed_metadata_request_tags,
tag, tag_len,
(void *) &val, &val_size);
(void **) &val, &val_size);
if (ret != -1) {
if (val >= FLB_ECS_FILTER_METADATA_RETRIES) {
if (*val > ctx->agent_endpoint_retries) {
return FLB_TRUE;
}
}
Expand All @@ -1389,11 +1444,12 @@ static void mark_tag_failed(struct flb_filter_ecs *ctx,
{
int ret;
int *val = NULL;
int *new_val = NULL;
size_t val_size;

ret = flb_hash_get(ctx->failed_metadata_request_tags,
tag, tag_len,
(void *) val, &val_size);
(void **) &val, &val_size);

if (ret == -1) {
/* hash table copies memory to new heap block */
Expand All @@ -1409,16 +1465,29 @@ static void mark_tag_failed(struct flb_filter_ecs *ctx,
/* hash table will contain a copy */
flb_free(val);
} else {
/*
* val is memory returned from hash table
* if we simply update the value here and call flb_hash_add
* it first frees the old memory (which is what we passed it)
* then tries to copy over the memory we passed in to a new location
* flb_hash stores all entries as if they were strings, so we also
* can't simply increment the value returned by flb_hash_get
*/
new_val = flb_malloc(sizeof(int));
if (!new_val) {
flb_errno();
return;
}
/* increment number of failed metadata requests for this tag */
*val = *val + 1;
*new_val = *val + 1;
flb_hash_add(ctx->failed_metadata_request_tags,
tag, tag_len,
val, sizeof(int));
new_val, sizeof(int));
flb_plg_info(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
"This might be because the logs for this tag do not come from an ECS Task Container. "
"This plugin will retry metadata requests at most %d times total for this tag.",
tag, *val, FLB_ECS_FILTER_METADATA_RETRIES);

tag, *new_val, ctx->agent_endpoint_retries);
flb_free(new_val);
Comment on lines +1489 to +1490
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section looks good now

}
}

Expand Down Expand Up @@ -1448,9 +1517,8 @@ static int cb_ecs_filter(const void *data, size_t bytes,
msgpack_object_kv *kv;
struct mk_list *tmp;
struct mk_list *head;
struct flb_ecs_metadata_key *metadata_key;
struct flb_ecs_metadata_buffer *metadata_buffer;
flb_sds_t val;
struct flb_ecs_metadata_keypair *keypair = NULL;
struct flb_ecs_metadata_buffer *metadata_buffer = NULL;

/* First check that the static cluster metadata has been retrieved */
if (ctx->has_cluster_metadata == FLB_FALSE) {
Expand All @@ -1467,7 +1535,7 @@ static int cb_ecs_filter(const void *data, size_t bytes,
if (check == FLB_TRUE) {
flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
"Will not attempt to retry the metadata request. Will attach cluster metadata only.",
tag, FLB_ECS_FILTER_METADATA_RETRIES);
tag, ctx->agent_endpoint_retries);
}

if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If tag metadata retrieval does not fail, it would be nice for the retries counter to reset back to 0 or get deleted.

Currently it may be that total retries are being tracked where we only want to count consecutive retries.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then again, maybe on success you never will try to get the metadata ever again

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But there is ttl on the hash table so i think that there will be more retries after successful retrieval

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea on success we will add the container ID and its metadata to the hash table. Container IDs are static for the lifetime of a container.

Expand Down Expand Up @@ -1521,7 +1589,7 @@ static int cb_ecs_filter(const void *data, size_t bytes,
flb_time_append_to_msgpack(&tm, &tmp_pck, 0);

/* new record map size is old size + the new keys we will add */
total_records = obj->via.map.size + ctx->metadata_keys_len;
total_records = obj->via.map.size + metadata_buffer->keypairs_len;
msgpack_pack_map(&tmp_pck, total_records);

/* iterate through the old record map and add it to the new buffer */
Expand All @@ -1532,28 +1600,21 @@ static int cb_ecs_filter(const void *data, size_t bytes,
}

/* append new keys */
mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
val = flb_ra_translate(metadata_key->ra, NULL, 0,
metadata_buffer->obj, NULL);
if (!val) {
flb_plg_info(ctx->ins, "Translation failed for %s : %s",
metadata_key->key, metadata_key->template);
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&tmp_sbuf);
return FLB_FILTER_NOTOUCH;
if (metadata_buffer->keypairs_len > 0) {
mk_list_foreach_safe(head, tmp, &metadata_buffer->metadata_keypairs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great optimization around using record accessor unnecessarily

keypair = mk_list_entry(head, struct flb_ecs_metadata_keypair, _head);

len = flb_sds_len(keypair->key);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck,
keypair->key,
len);
len = flb_sds_len(keypair->val);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will value always exist? I noticed that in the deletion code you check if value exists before deleting it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see other comment, I suppose it does always exist. the code is fine, though I'm not sure why in some places val == null is checked before use and other places it is not

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case, these are pre-processed metadata keypairs, so they are guaranteed to have non-null values.

msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck,
keypair->val,
len);
}
len = flb_sds_len(metadata_key->key);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck,
metadata_key->key,
len);
len = flb_sds_len(val);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck,
val,
len);
flb_sds_destroy(val);
}
}
msgpack_unpacked_destroy(&result);
Expand Down Expand Up @@ -1590,6 +1651,7 @@ static void flb_filter_ecs_destroy(struct flb_filter_ecs *ctx)
struct mk_list *head;
struct flb_ecs_metadata_key *metadata_key;
struct flb_ecs_metadata_buffer *buf;
struct flb_ecs_metadata_keypair *keypair = NULL;

if (ctx) {
if (ctx->ecs_upstream) {
Expand All @@ -1609,18 +1671,32 @@ static void flb_filter_ecs_destroy(struct flb_filter_ecs *ctx)
}
if (ctx->cluster_meta_buf.buf) {
flb_free(ctx->cluster_meta_buf.buf);
msgpack_unpacked_destroy(&ctx->cluster_meta_buf.unpacked);
if (mk_list_is_set(&ctx->cluster_meta_buf.metadata_keypairs) == 0) {
mk_list_foreach_safe(head, tmp, &ctx->cluster_meta_buf.metadata_keypairs) {
keypair = mk_list_entry(head, struct flb_ecs_metadata_keypair, _head);
/* only need to free val. key is ref to flb_ecs_metadata_key.key*/
if (keypair->val) {
flb_sds_destroy(keypair->val);
}
mk_list_del(&keypair->_head);
flb_free(keypair);
}
}
}
mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
mk_list_del(&metadata_key->_head);
flb_ecs_metadata_key_destroy(metadata_key);
if (mk_list_is_set(&ctx->metadata_keys) == 0) {
mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
mk_list_del(&metadata_key->_head);
flb_ecs_metadata_key_destroy(metadata_key);
}
}
mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) {
buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head);
mk_list_del(&buf->_head);
flb_hash_del(ctx->container_hash_table, buf->id);
flb_ecs_metadata_buffer_destroy(buf);
if (mk_list_is_set(&ctx->metadata_buffers) == 0) {
mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) {
buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head);
mk_list_del(&buf->_head);
flb_hash_del(ctx->container_hash_table, buf->id);
flb_ecs_metadata_buffer_destroy(buf);
}
}
if (ctx->container_hash_table) {
flb_hash_destroy(ctx->container_hash_table);
Expand Down Expand Up @@ -1694,6 +1770,15 @@ static struct flb_config_map config_map[] = {
"Defaults to 51678"
},

{
FLB_CONFIG_MAP_INT, "agent_endpoint_retries", FLB_ECS_FILTER_METADATA_RETRIES,
0, FLB_TRUE, offsetof(struct flb_filter_ecs, agent_endpoint_retries),
"Number of retries for failed metadata requests to ECS Agent Introspection "
"endpoint. The most common cause of failed metadata requests is that the "
"container the metadata request was made for is not part of an ECS Task. "
"Check if you have non-task containers and docker dual logging enabled."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note. No action needed for this

},

{0}
};

Expand Down
22 changes: 16 additions & 6 deletions plugins/filter_ecs/ecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#define FLB_ECS_FILTER_PORT "51678"
#define FLB_ECS_FILTER_CLUSTER_PATH "/v1/metadata"
#define FLB_ECS_FILTER_TASK_PATH_FORMAT "/v1/tasks?dockerid=%s"
#define FLB_ECS_FILTER_METADATA_RETRIES 2
#define FLB_ECS_FILTER_METADATA_RETRIES "3"

/*
* Kubernetes recommends not running more than 110 pods per node
Expand All @@ -49,21 +49,29 @@ struct flb_ecs_metadata_key {
struct mk_list _head;
};

/* metadata processed into KV pair ready to add to log record */
struct flb_ecs_metadata_keypair {
/* key is just a reference to the flb_ecs_metadata_key.key */
flb_sds_t key;
flb_sds_t val;

struct mk_list _head;
};

struct flb_ecs_metadata_buffer {
/* msgpack_sbuffer */
char *buf;
size_t size;

/* unpacked object to use with flb_ra_translate */
msgpack_unpacked unpacked;
msgpack_object obj;
int free_packer;

/* the hash table only stores a pointer- we need the list to track and free these */
struct mk_list _head;
/* we clean up the memory for these once ecs_meta_cache_ttl has expired */
time_t last_used_time;

/* List of processed metadata keys and values */
struct mk_list metadata_keypairs;
int keypairs_len;

/*
* To remove from the hash table on TTL expiration, we need the ID
* While we use a TTL hash, it won't clean up the memory, so we have a separate routine for that
Expand Down Expand Up @@ -109,6 +117,8 @@ struct flb_filter_ecs {
flb_sds_t ecs_host;
int ecs_port;

int agent_endpoint_retries;

/*
* This field is used when we build new container metadata objects
*/
Expand Down
Loading