From e2812a456bf26cfff64075ace577e919785b0ba3 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 16 Aug 2022 07:53:52 -0700 Subject: [PATCH] filter_ecs: new filter for AWS ECS Metadata Signed-off-by: Wesley Pettit --- CMakeLists.txt | 69 ++ plugins/CMakeLists.txt | 1 + plugins/filter_ecs/CMakeLists.txt | 5 + plugins/filter_ecs/ecs.c | 1604 +++++++++++++++++++++++++++++ plugins/filter_ecs/ecs.h | 138 +++ tests/runtime/CMakeLists.txt | 1 + tests/runtime/filter_ecs.c | 372 +++++++ 7 files changed, 2190 insertions(+) create mode 100644 plugins/filter_ecs/CMakeLists.txt create mode 100644 plugins/filter_ecs/ecs.c create mode 100644 plugins/filter_ecs/ecs.h create mode 100644 tests/runtime/filter_ecs.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 133801047e9..ce0f2883566 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -176,6 +176,7 @@ option(FLB_IN_STORAGE_BACKLOG "Enable storage backlog input plugin" option(FLB_IN_EMITTER "Enable emitter input plugin" Yes) option(FLB_IN_NODE_EXPORTER_METRICS "Enable node exporter metrics input plugin" Yes) option(FLB_IN_WINDOWS_EXPORTER_METRICS "Enable windows exporter metrics input plugin" Yes) +<<<<<<< HEAD option(FLB_IN_OPENTELEMETRY "Enable OpenTelemetry input plugin" Yes) option(FLB_OUT_AZURE "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_BLOB "Enable Azure output plugin" Yes) @@ -242,6 +243,74 @@ option(FLB_FILTER_TENSORFLOW "Enable tensorflow filter" option(FLB_FILTER_GEOIP2 "Enable geoip2 filter" Yes) option(FLB_FILTER_NIGHTFALL "Enable Nightfall filter" Yes) option(FLB_FILTER_WASM "Enable WASM filter" Yes) +======= +option(FLB_IN_OPENTELEMETRY "Enable OpenTelemetry input plugin" Yes) +option(FLB_OUT_AZURE "Enable Azure output plugin" Yes) +option(FLB_OUT_AZURE_BLOB "Enable Azure output plugin" Yes) +option(FLB_OUT_AZURE_KUSTO "Enable Azure Kusto output plugin" Yes) +option(FLB_OUT_BIGQUERY "Enable BigQuery output plugin" Yes) +option(FLB_OUT_CALYPTIA "Enable Calyptia monitoring plugin" Yes) +option(FLB_OUT_COUNTER "Enable Counter output plugin" Yes) +option(FLB_OUT_DATADOG "Enable DataDog output plugin" Yes) +option(FLB_OUT_ES "Enable Elasticsearch output plugin" Yes) +option(FLB_OUT_EXIT "Enable Exit output plugin" Yes) +option(FLB_OUT_FORWARD "Enable Forward output plugin" Yes) +option(FLB_OUT_GELF "Enable GELF output plugin" Yes) +option(FLB_OUT_HTTP "Enable HTTP output plugin" Yes) +option(FLB_OUT_INFLUXDB "Enable InfluxDB output plugin" Yes) +option(FLB_OUT_NATS "Enable NATS output plugin" Yes) +option(FLB_OUT_NRLOGS "Enable New Relic output plugin" Yes) +option(FLB_OUT_OPENSEARCH "Enable OpenSearch output plugin" Yes) +option(FLB_OUT_TCP "Enable TCP output plugin" Yes) +option(FLB_OUT_PLOT "Enable Plot output plugin" Yes) +option(FLB_OUT_FILE "Enable file output plugin" Yes) +option(FLB_OUT_TD "Enable Treasure Data output plugin" Yes) +option(FLB_OUT_RETRY "Enable Retry test output plugin" No) +option(FLB_OUT_PGSQL "Enable PostgreSQL output plugin" No) +option(FLB_OUT_SKYWALKING "Enable Apache SkyWalking output plugin" Yes) +option(FLB_OUT_SLACK "Enable Slack output plugin" Yes) +option(FLB_OUT_SPLUNK "Enable Splunk output plugin" Yes) +option(FLB_OUT_STACKDRIVER "Enable Stackdriver output plugin" Yes) +option(FLB_OUT_STDOUT "Enable STDOUT output plugin" Yes) +option(FLB_OUT_SYSLOG "Enable Syslog output plugin" Yes) +option(FLB_OUT_LIB "Enable library mode output plugin" Yes) +option(FLB_OUT_NULL "Enable dev null output plugin" Yes) +option(FLB_OUT_FLOWCOUNTER "Enable flowcount output plugin" Yes) +option(FLB_OUT_LOGDNA "Enable LogDNA output plugin" Yes) +option(FLB_OUT_LOKI "Enable Loki output plugin" Yes) +option(FLB_OUT_KAFKA "Enable Kafka output plugin" No) +option(FLB_OUT_KAFKA_REST "Enable Kafka Rest output plugin" Yes) +option(FLB_OUT_CLOUDWATCH_LOGS "Enable AWS CloudWatch output plugin" Yes) +option(FLB_OUT_KINESIS_FIREHOSE "Enable AWS Firehose output plugin" Yes) +option(FLB_OUT_KINESIS_STREAMS "Enable AWS Kinesis output plugin" Yes) +option(FLB_OUT_OPENTELEMETRY "Enable OpenTelemetry plugin" Yes) +option(FLB_OUT_PROMETHEUS_EXPORTER "Enable Prometheus exporter plugin" Yes) +option(FLB_OUT_PROMETHEUS_REMOTE_WRITE "Enable Prometheus remote write plugin" Yes) +option(FLB_OUT_S3 "Enable AWS S3 output plugin" Yes) +option(FLB_OUT_WEBSOCKET "Enable Websocket output plugin" Yes) +option(FLB_FILTER_ALTER_SIZE "Enable alter_size filter" Yes) +option(FLB_FILTER_AWS "Enable aws filter" Yes) +option(FLB_FILTER_CHECKLIST "Enable checklist filter" Yes) +option(FLB_FILTER_ECS "Enable AWS ECS filter" Yes) +option(FLB_FILTER_EXPECT "Enable expect filter" Yes) +option(FLB_FILTER_GREP "Enable grep filter" Yes) +option(FLB_FILTER_MODIFY "Enable modify filter" Yes) +option(FLB_FILTER_STDOUT "Enable stdout filter" Yes) +option(FLB_FILTER_PARSER "Enable parser filter" Yes) +option(FLB_FILTER_KUBERNETES "Enable kubernetes filter" Yes) +option(FLB_FILTER_REWRITE_TAG "Enable tag rewrite filter" Yes) +option(FLB_FILTER_THROTTLE "Enable throttle filter" Yes) +option(FLB_FILTER_THROTTLE_SIZE "Enable throttle size filter" No) +option(FLB_FILTER_TYPE_CONVERTER "Enable type converter filter" Yes) +option(FLB_FILTER_MULTILINE "Enable multiline filter" Yes) +option(FLB_FILTER_NEST "Enable nest filter" Yes) +option(FLB_FILTER_LUA "Enable Lua scripting filter" Yes) +option(FLB_FILTER_LUA_USE_MPACK "Enable mpack on the lua filter" Yes) +option(FLB_FILTER_RECORD_MODIFIER "Enable record_modifier filter" Yes) +option(FLB_FILTER_TENSORFLOW "Enable tensorflow filter" No) +option(FLB_FILTER_GEOIP2 "Enable geoip2 filter" Yes) +option(FLB_FILTER_NIGHTFALL "Enable Nightfall filter" Yes) +>>>>>>> filter_ecs: new filter for AWS ECS Metadata if(DEFINED FLB_NIGHTLY_BUILD AND NOT "${FLB_NIGHTLY_BUILD}" STREQUAL "") FLB_DEFINITION_VAL(FLB_NIGHTLY_BUILD ${FLB_NIGHTLY_BUILD}) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 36412a1429b..8f45ee78ecb 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -281,6 +281,7 @@ REGISTER_OUT_PLUGIN("out_s3") REGISTER_FILTER_PLUGIN("filter_alter_size") REGISTER_FILTER_PLUGIN("filter_aws") REGISTER_FILTER_PLUGIN("filter_checklist") +REGISTER_FILTER_PLUGIN("filter_ecs") REGISTER_FILTER_PLUGIN("filter_record_modifier") REGISTER_FILTER_PLUGIN("filter_throttle") REGISTER_FILTER_PLUGIN("filter_throttle_size") diff --git a/plugins/filter_ecs/CMakeLists.txt b/plugins/filter_ecs/CMakeLists.txt new file mode 100644 index 00000000000..335a870f7fb --- /dev/null +++ b/plugins/filter_ecs/CMakeLists.txt @@ -0,0 +1,5 @@ +set(src + ecs.c + ) + +FLB_PLUGIN(filter_ecs "${src}" "") diff --git a/plugins/filter_ecs/ecs.c b/plugins/filter_ecs/ecs.c new file mode 100644 index 00000000000..7482aeeb7f8 --- /dev/null +++ b/plugins/filter_ecs/ecs.c @@ -0,0 +1,1604 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "ecs.h" + +static int get_ecs_cluster_metadata(struct flb_filter_ecs *ctx); +static void flb_filter_ecs_destroy(struct flb_filter_ecs *ctx); + +/* cluster meta is static so we can expose it on global ctx for other plugins to use */ +static void expose_ecs_cluster_meta(struct flb_filter_ecs *ctx) +{ + struct flb_env *env; + struct flb_config *config = ctx->ins->config; + + env = config->env; + + flb_env_set(env, "ecs", "enabled"); + + if (ctx->cluster_metadata.cluster_name) { + flb_env_set(env, + "aws.ecs.cluster_name", + ctx->cluster_metadata.cluster_name); + } + + if (ctx->cluster_metadata.container_instance_arn) { + flb_env_set(env, + "aws.ecs.container_instance_arn", + ctx->cluster_metadata.container_instance_arn); + } + + if (ctx->cluster_metadata.container_instance_id) { + flb_env_set(env, + "aws.ecs.container_instance_id", + ctx->cluster_metadata.container_instance_id); + } + + if (ctx->cluster_metadata.ecs_agent_version) { + flb_env_set(env, + "aws.ecs.ecs_agent_version", + ctx->cluster_metadata.container_instance_id); + } +} + +static int cb_ecs_init(struct flb_filter_instance *f_ins, + struct flb_config *config, + void *data) +{ + int ret; + struct flb_filter_ecs *ctx = NULL; + struct mk_list *head; + struct mk_list *split; + struct flb_kv *kv; + struct flb_split_entry *sentry; + int list_size; + struct flb_ecs_metadata_key *ecs_meta = NULL; + (void) data; + + /* Create context */ + ctx = flb_calloc(1, sizeof(struct flb_filter_ecs)); + if (!ctx) { + flb_errno(); + return -1; + } + + ctx->ins = f_ins; + + /* Populate context with config map defaults and incoming properties */ + ret = flb_filter_config_map_set(f_ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(f_ins, "configuration error"); + flb_free(ctx); + return -1; + } + + mk_list_init(&ctx->metadata_keys); + ctx->metadata_keys_len = 0; + mk_list_init(&ctx->metadata_buffers); + + mk_list_foreach(head, &f_ins->properties) { + kv = mk_list_entry(head, struct flb_kv, _head); + + if (strcasecmp(kv->key, "add") == 0) { + split = flb_utils_split(kv->val, ' ', 2); + list_size = mk_list_size(split); + + if (list_size == 0 || list_size > 2) { + flb_plg_error(ctx->ins, "Invalid config for %s", kv->key); + flb_utils_split_free(split); + goto error; + } + + sentry = mk_list_entry_first(split, struct flb_split_entry, _head); + ecs_meta = flb_calloc(1, sizeof(struct flb_ecs_metadata_key)); + if (!ecs_meta) { + flb_errno(); + flb_utils_split_free(split); + goto error; + } + + ecs_meta->key = flb_sds_create_len(sentry->value, sentry->len); + if (!ecs_meta->key) { + flb_errno(); + flb_utils_split_free(split); + goto error; + } + + sentry = mk_list_entry_last(split, struct flb_split_entry, _head); + + ecs_meta->template = flb_sds_create_len(sentry->value, sentry->len); + if (!ecs_meta->template) { + flb_errno(); + flb_utils_split_free(split); + goto error; + } + + ecs_meta->ra = flb_ra_create(ecs_meta->template, FLB_FALSE); + if (ecs_meta->ra == NULL) { + flb_plg_error(ctx->ins, "Could not parse template for `%s`", ecs_meta->key); + flb_utils_split_free(split); + goto error; + } + + mk_list_add(&ecs_meta->_head, &ctx->metadata_keys); + ctx->metadata_keys_len++; + flb_utils_split_free(split); + } + } + + ctx->ecs_upstream = flb_upstream_create(config, + FLB_ECS_FILTER_HOST, + FLB_ECS_FILTER_PORT, + FLB_IO_TCP, + NULL); + + if (!ctx->ecs_upstream) { + flb_errno(); + flb_plg_error(ctx->ins, "Could not create upstream connection to ECS Agent"); + goto error; + } + + /* + * Remove async flag from upstream + * Filters can not coroutine-yield. + */ + ctx->ecs_upstream->flags &= ~(FLB_IO_ASYNC); + ctx->has_cluster_metadata = FLB_FALSE; + + /* entries are only evicted when TTL is reached and a get is issued */ + ctx->container_hash_table = flb_hash_create_with_ttl(ctx->ecs_meta_cache_ttl, + FLB_HASH_EVICT_OLDER, + FLB_ECS_FILTER_HASH_TABLE_SIZE, + FLB_ECS_FILTER_HASH_TABLE_SIZE); + if (!ctx->container_hash_table) { + flb_plg_error(f_ins, "failed to create container_hash_table"); + goto error; + } + + ctx->ecs_tag_prefix_len = strlen(ctx->ecs_tag_prefix); + + /* attempt to get metadata in init, can retry in cb_filter */ + ret = get_ecs_cluster_metadata(ctx); + + flb_filter_set_context(f_ins, ctx); + return 0; + +error: + flb_plg_error(ctx->ins, "Initialization failed."); + flb_filter_ecs_destroy(ctx); + return -1; +} + +static int plugin_under_test() +{ + if (getenv("FLB_ECS_PLUGIN_UNDER_TEST") != NULL) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static char *mock_error_response(char *error_env_var) +{ + char *err_val = NULL; + char *error = NULL; + int len = 0; + + err_val = getenv(error_env_var); + if (err_val != NULL && strlen(err_val) > 0) { + error = flb_malloc(strlen(err_val) + sizeof(char)); + if (error == NULL) { + flb_errno(); + return NULL; + } + + len = strlen(err_val); + memcpy(error, err_val, len); + error[len] = '\0'; + return error; + } + + return NULL; +} + +static struct flb_http_client *mock_http_call(char *error_env_var, char *api) +{ + /* create an http client so that we can set the response */ + struct flb_http_client *c = NULL; + char *error = mock_error_response(error_env_var); + + c = flb_calloc(1, sizeof(struct flb_http_client)); + if (!c) { + flb_errno(); + flb_free(error); + return NULL; + } + mk_list_init(&c->headers); + + if (error != NULL) { + c->resp.status = 400; + /* resp.data is freed on destroy, payload is supposed to reference it */ + c->resp.data = error; + c->resp.payload = c->resp.data; + c->resp.payload_size = strlen(error); + } + else { + c->resp.status = 200; + if (strcmp(api, "Cluster") == 0) { + /* mocked success response */ + c->resp.payload = "{\"Cluster\": \"cluster_name\",\"ContainerInstanceArn\": \"arn:aws:ecs:region:aws_account_id:container-instance/cluster_name/container_instance_id\",\"Version\": \"Amazon ECS Agent - v1.30.0 (02ff320c)\"}"; + c->resp.payload_size = strlen(c->resp.payload); + } + else { + c->resp.payload = "{\"Arn\": \"arn:aws:ecs:us-west-2:012345678910:task/default/e01d58a8-151b-40e8-bc01-22647b9ecfec\",\"Containers\": [{\"DockerId\": \"79c796ed2a7f864f485c76f83f3165488097279d296a7c05bd5201a1c69b2920\",\"DockerName\": \"ecs-nginx-efs-2-nginx-9ac0808dd0afa495f001\",\"Name\": \"nginx\"}],\"DesiredStatus\": \"RUNNING\",\"Family\": \"nginx-efs\",\"KnownStatus\": \"RUNNING\",\"Version\": \"2\"}"; + c->resp.payload_size = strlen(c->resp.payload); + } + } + + return c; +} + +/* + * Both container instance and task ARNs have the ID at the end after last '/' + */ +static flb_sds_t parse_id_from_arn(const char *arn, int len) +{ + int i; + flb_sds_t ID = NULL; + int last_slash = 0; + int id_start = 0; + + for (i = 0; i < len; i++) { + if (arn[i] == '/') { + last_slash = i; + } + } + + if (last_slash == 0 || last_slash >= len - 2) { + return NULL; + } + id_start = last_slash + 1; + + ID = flb_sds_create_len(arn + id_start, len - id_start); + if (ID == NULL) { + flb_errno(); + return NULL; + } + + return ID; +} + +/* + * This deserializes the msgpack metadata buf to msgpack_object + * which can be used with flb_ra_translate in the main filter callback + */ +static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx, + struct flb_ecs_metadata_buffer *meta) +{ + msgpack_unpacked result; + msgpack_object root; + size_t off = 0; + int ret; + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, meta->buf, meta->size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, "Cannot unpack flb_ecs_metadata_buffer"); + msgpack_unpacked_destroy(&result); + return -1; + } + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "Cannot unpack flb_ecs_metadata_buffer, msgpack_type=%i", + root.type); + msgpack_unpacked_destroy(&result); + return -1; + } + + meta->unpacked = result; + meta->obj = root; + meta->last_used_time = time(NULL); + meta->free_packer = FLB_TRUE; + + return 0; +} + +static void flb_ecs_metadata_buffer_destroy(struct flb_ecs_metadata_buffer *meta) +{ + if (meta) { + flb_free(meta->buf); + if (meta->free_packer == FLB_TRUE) { + msgpack_unpacked_destroy(&meta->unpacked); + } + if (meta->id) { + flb_sds_destroy(meta->id); + } + flb_free(meta); + } +} + +/* + * Get cluster and container instance info, which are static and never change + */ +static int get_ecs_cluster_metadata(struct flb_filter_ecs *ctx) +{ + struct flb_http_client *c; + struct flb_upstream_conn *u_conn; + int ret; + int root_type; + int found_cluster = FLB_FALSE; + int found_version = FLB_FALSE; + int found_instance = FLB_FALSE; + int i; + int len; + char *buffer; + size_t size; + size_t b_sent; + size_t off = 0; + msgpack_unpacked result; + msgpack_object root; + msgpack_object key; + msgpack_object val; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + flb_sds_t container_instance_id = NULL; + flb_sds_t tmp = NULL; + + u_conn = flb_upstream_conn_get(ctx->ecs_upstream); + + if (!u_conn) { + flb_plg_error(ctx->ins, "ECS agent introspection endpoint connection error"); + return -1; + } + + /* Compose HTTP Client request*/ + if (plugin_under_test() == FLB_TRUE) { + c = mock_http_call("TEST_CLUSTER_ERROR", "Cluster"); + ret = 0; + } + else { + c = flb_http_client(u_conn, FLB_HTTP_GET, + FLB_ECS_FILTER_CLUSTER_PATH, + NULL, 0, + FLB_ECS_FILTER_HOST, FLB_ECS_FILTER_PORT, + NULL, 0); + flb_http_buffer_size(c, 0); /* 0 means unlimited */ + + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + + ret = flb_http_do(c, &b_sent); + flb_plg_debug(ctx->ins, "http_do=%i, " + "HTTP Status: %i", + ret, c->resp.status); + } + + if (ret != 0 || c->resp.status != 200) { + if (c->resp.payload_size > 0) { + flb_plg_warn(ctx->ins, "Failed to get metadata from %s, will retry", + FLB_ECS_FILTER_CLUSTER_PATH); + flb_plg_debug(ctx->ins, "HTTP response\n%s", + c->resp.payload); + } else { + flb_plg_warn(ctx->ins, "%s response status was %d with no payload, will retry", + FLB_ECS_FILTER_CLUSTER_PATH, + c->resp.status); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return -1; + } + + ret = flb_pack_json(c->resp.payload, c->resp.payload_size, + &buffer, &size, &root_type); + + if (ret < 0) { + flb_plg_warn(ctx->ins, "Could not parse response from %s; response=\n%s", + FLB_ECS_FILTER_CLUSTER_PATH, c->resp.payload); + return -1; + } + + /* parse metadata response */ + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buffer, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, "Cannot unpack %s response to find metadata\n%s", + FLB_ECS_FILTER_CLUSTER_PATH, c->resp.payload); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return -1; + } + + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "%s response parsing failed, msgpack_type=%i", + FLB_ECS_FILTER_CLUSTER_PATH, + root.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + + /* +Metadata Response: +{ + "Cluster": "cluster_name", + "ContainerInstanceArn": "arn:aws:ecs:region:aws_account_id:container-instance/cluster_name/container_instance_id", + "Version": "Amazon ECS Agent - v1.30.0 (02ff320c)" +} +But our metadata keys names are: +{ + "ClusterName": "cluster_name", + "ContainerInstanceArn": "arn:aws:ecs:region:aws_account_id:container-instance/cluster_name/container_instance_id", + "ContainerInstanceID": "container_instance_id" + "ECSAgentVersion": "Amazon ECS Agent - v1.30.0 (02ff320c)" +} + */ + + for (i = 0; i < root.via.map.size; i++) { + key = root.via.map.ptr[i].key; + if (key.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "%s response parsing failed, msgpack key type=%i", + FLB_ECS_FILTER_CLUSTER_PATH, + key.type); + continue; + } + + if (key.via.str.size == 7 && strncmp(key.via.str.ptr, "Cluster", 7) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Cluster' value type=%i", + val.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + + found_cluster = FLB_TRUE; + if (ctx->cluster_metadata.cluster_name == NULL) { + tmp = flb_sds_create_len(val.via.str.ptr, (int) val.via.str.size); + if (!tmp) { + flb_errno(); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + ctx->cluster_metadata.cluster_name = tmp; + } + + } + else if (key.via.str.size == 20 && strncmp(key.via.str.ptr, "ContainerInstanceArn", 20) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'ContainerInstanceArn' value type=%i", + val.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + + /* first the ARN */ + found_instance = FLB_TRUE; + if (ctx->cluster_metadata.container_instance_arn == NULL) { + tmp = flb_sds_create_len(val.via.str.ptr, (int) val.via.str.size); + if (!tmp) { + flb_errno(); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + ctx->cluster_metadata.container_instance_arn = tmp; + } + + /* then the ID */ + if (ctx->cluster_metadata.container_instance_id == NULL) { + container_instance_id = parse_id_from_arn(val.via.str.ptr, (int) val.via.str.size); + if (container_instance_id == NULL) { + flb_plg_error(ctx->ins, "metadata parsing: failed to get ID from %.*s", + (int) val.via.str.size, val.via.str.ptr); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + ctx->cluster_metadata.container_instance_id = container_instance_id; + } + + } else if (key.via.str.size == 7 && strncmp(key.via.str.ptr, "Version", 7) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Version' value type=%i", + val.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + + found_version = FLB_TRUE; + if (ctx->cluster_metadata.ecs_agent_version == NULL) { + tmp = flb_sds_create_len(val.via.str.ptr, (int) val.via.str.size); + if (!tmp) { + flb_errno(); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + ctx->cluster_metadata.ecs_agent_version = tmp; + } + } + + } + + flb_free(buffer); + msgpack_unpacked_destroy(&result); + + if (found_cluster == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse 'Cluster' from %s response", + FLB_ECS_FILTER_CLUSTER_PATH); + return -1; + } + if (found_instance == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse 'ContainerInstanceArn' from %s response", + FLB_ECS_FILTER_CLUSTER_PATH); + return -1; + } + if (found_version == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse 'Version' from %s response", + FLB_ECS_FILTER_CLUSTER_PATH); + return -1; + } + + /* + * We also create a standalone cluster metadata msgpack object + * This is used as a fallback for logs when we can't find the + * task metadata for a log. It is valid to attach cluster meta + * to eg. Docker daemon logs which are not an AWS ECS Task via + * the `cluster_metadata_only` setting. + */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + msgpack_pack_map(&tmp_pck, 4); + + msgpack_pack_str(&tmp_pck, 11); + msgpack_pack_str_body(&tmp_pck, + "ClusterName", + 11); + len = flb_sds_len(ctx->cluster_metadata.cluster_name); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + ctx->cluster_metadata.cluster_name, + len); + + msgpack_pack_str(&tmp_pck, 20); + msgpack_pack_str_body(&tmp_pck, + "ContainerInstanceArn", + 20); + len = flb_sds_len(ctx->cluster_metadata.container_instance_arn); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + ctx->cluster_metadata.container_instance_arn, + len); + + msgpack_pack_str(&tmp_pck, 19); + msgpack_pack_str_body(&tmp_pck, + "ContainerInstanceID", + 19); + len = flb_sds_len(ctx->cluster_metadata.container_instance_id); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + ctx->cluster_metadata.container_instance_id, + len); + + msgpack_pack_str(&tmp_pck, 15); + msgpack_pack_str_body(&tmp_pck, + "ECSAgentVersion", + 15); + len = flb_sds_len(ctx->cluster_metadata.ecs_agent_version); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + ctx->cluster_metadata.ecs_agent_version, + len); + + ctx->cluster_meta_buf.buf = tmp_sbuf.data; + ctx->cluster_meta_buf.size = tmp_sbuf.size; + + ret = flb_ecs_metadata_buffer_init(ctx, &ctx->cluster_meta_buf); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not init metadata buffer from %s response", + FLB_ECS_FILTER_CLUSTER_PATH); + msgpack_sbuffer_destroy(&tmp_sbuf); + ctx->cluster_meta_buf.buf = NULL; + ctx->cluster_meta_buf.size = 0; + return -1; + } + + ctx->has_cluster_metadata = FLB_TRUE; + expose_ecs_cluster_meta(ctx); + return 0; +} + +/* + * This is the helper function used by get_task_metadata() + * that actually creates the final metadata msgpack buffer + * with our final key names. + * It collects cluster, task, and container metadata into one +The new metadata msgpack is flat and looks like: +{ + "ContainerID": "79c796ed2a7f864f485c76f83f3165488097279d296a7c05bd5201a1c69b2920", + "DockerContainerName": "ecs-nginx-efs-2-nginx-9ac0808dd0afa495f001", + "ContainerName": "nginx", + + "ClusterName": "cluster_name", + "ContainerInstanceArn": "arn:aws:ecs:region:aws_account_id:container-instance/cluster_name/container_instance_id", + "ContainerInstanceID": "container_instance_id" + "ECSAgentVersion": "Amazon ECS Agent - v1.30.0 (02ff320c)" + + "TaskARN": "arn:aws:ecs:us-west-2:012345678910:task/default/example5-58ff-46c9-ae05-543f8example", + "TaskID: "example5-58ff-46c9-ae05-543f8example", + "TaskDefFamily": "hello_world", + "TaskDefVersion": "8", +} + */ +static int process_container_response(struct flb_filter_ecs *ctx, + msgpack_object container, + struct flb_ecs_task_metadata task_meta) +{ + int ret; + int found_id = FLB_FALSE; + int found_ecs_name = FLB_FALSE; + int found_docker_name = FLB_FALSE; + int i; + int len; + struct flb_ecs_metadata_buffer *cont_meta_buf; + msgpack_object key; + msgpack_object val; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + flb_sds_t short_id = NULL; + + /* + * We copy the metadata response to a new buffer + * So we can define the metadata key names + */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + + /* 3 container metadata keys, 4 for instance/cluster, 4 for the task */ + msgpack_pack_map(&tmp_pck, 11); + + /* 1st- process/pack the raw container metadata response */ + for (i = 0; i < container.via.map.size; i++) { + key = container.via.map.ptr[i].key; + if (key.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "Container metadata parsing failed, msgpack key type=%i", + key.type); + continue; + } + + if (key.via.str.size == 8 && strncmp(key.via.str.ptr, "DockerId", 8) == 0) { + val = container.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'DockerId' value type=%i", + val.type); + msgpack_sbuffer_destroy(&tmp_sbuf); + if (short_id != NULL) { + flb_sds_destroy(short_id); + } + return -1; + } + + /* save the short ID for hash table key */ + short_id = flb_sds_create_len(val.via.str.ptr, 12); + if (!short_id) { + flb_errno(); + msgpack_sbuffer_destroy(&tmp_sbuf); + return -1; + } + + found_id = FLB_TRUE; + msgpack_pack_str(&tmp_pck, 11); + msgpack_pack_str_body(&tmp_pck, + "ContainerID", + 11); + msgpack_pack_str(&tmp_pck, (int) val.via.str.size); + msgpack_pack_str_body(&tmp_pck, + val.via.str.ptr, + (int) val.via.str.size); + } + else if (key.via.str.size == 10 && strncmp(key.via.str.ptr, "DockerName", 10) == 0) { + val = container.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'DockerName' value type=%i", + val.type); + msgpack_sbuffer_destroy(&tmp_sbuf); + if (short_id != NULL) { + flb_sds_destroy(short_id); + } + return -1; + } + + /* first pack the ARN */ + found_docker_name = FLB_TRUE; + msgpack_pack_str(&tmp_pck, 19); + msgpack_pack_str_body(&tmp_pck, + "DockerContainerName", + 19); + msgpack_pack_str(&tmp_pck, (int) val.via.str.size); + msgpack_pack_str_body(&tmp_pck, + val.via.str.ptr, + (int) val.via.str.size); + } else if (key.via.str.size == 4 && strncmp(key.via.str.ptr, "Name", 4) == 0) { + val = container.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Name' value type=%i", + val.type); + msgpack_sbuffer_destroy(&tmp_sbuf); + if (short_id != NULL) { + flb_sds_destroy(short_id); + } + return -1; + } + + found_ecs_name = FLB_TRUE; + msgpack_pack_str(&tmp_pck, 13); + msgpack_pack_str_body(&tmp_pck, + "ContainerName", + 13); + msgpack_pack_str(&tmp_pck, (int) val.via.str.size); + msgpack_pack_str_body(&tmp_pck, + val.via.str.ptr, + (int) val.via.str.size); + } + } + + if (found_id == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse Task 'DockerId' from container response"); + msgpack_sbuffer_destroy(&tmp_sbuf); + return -1; + } + if (found_docker_name == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse 'DockerName' from container response"); + msgpack_sbuffer_destroy(&tmp_sbuf); + if (short_id != NULL) { + flb_sds_destroy(short_id); + } + return -1; + } + if (found_ecs_name == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse 'Name' from container response"); + msgpack_sbuffer_destroy(&tmp_sbuf); + if (short_id != NULL) { + flb_sds_destroy(short_id); + } + return -1; + } + + /* 2nd - Add the task fields from the task_meta temp buf we were given */ + msgpack_pack_str(&tmp_pck, 13); + msgpack_pack_str_body(&tmp_pck, + "TaskDefFamily", + 13); + msgpack_pack_str(&tmp_pck, task_meta.task_def_family_len); + msgpack_pack_str_body(&tmp_pck, + task_meta.task_def_family, + task_meta.task_def_family_len); + + msgpack_pack_str(&tmp_pck, 7); + msgpack_pack_str_body(&tmp_pck, + "TaskARN", + 7); + msgpack_pack_str(&tmp_pck, task_meta.task_arn_len); + msgpack_pack_str_body(&tmp_pck, + task_meta.task_arn, + task_meta.task_arn_len); + msgpack_pack_str(&tmp_pck, 6); + msgpack_pack_str_body(&tmp_pck, + "TaskID", + 6); + msgpack_pack_str(&tmp_pck, task_meta.task_id_len); + msgpack_pack_str_body(&tmp_pck, + task_meta.task_id, + task_meta.task_id_len); + + msgpack_pack_str(&tmp_pck, 14); + msgpack_pack_str_body(&tmp_pck, + "TaskDefVersion", + 14); + msgpack_pack_str(&tmp_pck, task_meta.task_def_version_len); + msgpack_pack_str_body(&tmp_pck, + task_meta.task_def_version, + task_meta.task_def_version_len); + + /* 3rd - Add the static cluster fields from the plugin context */ + msgpack_pack_str(&tmp_pck, 11); + msgpack_pack_str_body(&tmp_pck, + "ClusterName", + 11); + len = flb_sds_len(ctx->cluster_metadata.cluster_name); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + ctx->cluster_metadata.cluster_name, + len); + + msgpack_pack_str(&tmp_pck, 20); + msgpack_pack_str_body(&tmp_pck, + "ContainerInstanceArn", + 20); + len = flb_sds_len(ctx->cluster_metadata.container_instance_arn); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + ctx->cluster_metadata.container_instance_arn, + len); + + msgpack_pack_str(&tmp_pck, 19); + msgpack_pack_str_body(&tmp_pck, + "ContainerInstanceID", + 19); + len = flb_sds_len(ctx->cluster_metadata.container_instance_id); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + ctx->cluster_metadata.container_instance_id, + len); + + msgpack_pack_str(&tmp_pck, 15); + msgpack_pack_str_body(&tmp_pck, + "ECSAgentVersion", + 15); + len = flb_sds_len(ctx->cluster_metadata.ecs_agent_version); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + ctx->cluster_metadata.ecs_agent_version, + len); + + cont_meta_buf = flb_calloc(1, sizeof(struct flb_ecs_metadata_buffer)); + if (!cont_meta_buf) { + flb_errno(); + msgpack_sbuffer_destroy(&tmp_sbuf); + flb_sds_destroy(short_id); + return -1; + } + + cont_meta_buf->buf = tmp_sbuf.data; + cont_meta_buf->size = tmp_sbuf.size; + + ret = flb_ecs_metadata_buffer_init(ctx, cont_meta_buf); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not init metadata buffer from container response"); + msgpack_sbuffer_destroy(&tmp_sbuf); + flb_free(cont_meta_buf); + flb_sds_destroy(short_id); + return -1; + } + cont_meta_buf->id = short_id; + mk_list_add(&cont_meta_buf->_head, &ctx->metadata_buffers); + + /* + * Size is set to 0 so the table just stores our pointer + * Otherwise it will try to copy the memory to a new buffer + */ + ret = flb_hash_add(ctx->container_hash_table, + short_id, strlen(short_id), + cont_meta_buf, 0); + + if (ret == -1) { + flb_plg_error(ctx->ins, "Could not add container ID %s to metadata hash table", + short_id); + flb_ecs_metadata_buffer_destroy(cont_meta_buf); + } else { + ret = 0; + flb_plg_debug(ctx->ins, "Added `%s` to container metadata hash table", + short_id); + } + return ret; +} + +/* + * Gets the container and task metadata for a task via a container's + * 12 char short ID. This can be used with the ECS Agent + * Introspection API: http://localhost:51678/v1/tasks?dockerid={short_id} + * Entries in the hash table will be added for all containers in the task + */ +static int get_task_metadata(struct flb_filter_ecs *ctx, char* short_id) +{ + struct flb_http_client *c; + struct flb_upstream_conn *u_conn; + int ret; + int root_type; + int found_task = FLB_FALSE; + int found_version = FLB_FALSE; + int found_family = FLB_FALSE; + int found_containers = FLB_FALSE; + int i; + int k; + char *buffer; + size_t size; + size_t b_sent; + size_t off = 0; + msgpack_unpacked result; + msgpack_object root; + msgpack_object key; + msgpack_object val; + msgpack_object container; + flb_sds_t tmp; + flb_sds_t http_path; + flb_sds_t task_id = NULL; + struct flb_ecs_task_metadata task_meta; + + tmp = flb_sds_create_size(64); + if (!tmp) { + return -1; + } + http_path = flb_sds_printf(&tmp, FLB_ECS_FILTER_TASK_PATH_FORMAT, short_id); + if (!http_path) { + flb_sds_destroy(tmp); + return -1; + } + + u_conn = flb_upstream_conn_get(ctx->ecs_upstream); + + if (!u_conn) { + flb_plg_error(ctx->ins, "ECS agent introspection endpoint connection error"); + flb_sds_destroy(http_path); + return -1; + } + + /* Compose HTTP Client request*/ + if (plugin_under_test() == FLB_TRUE) { + c = mock_http_call("TEST_TASK_ERROR", "Task"); + ret = 0; + } + else { + c = flb_http_client(u_conn, FLB_HTTP_GET, + http_path, + NULL, 0, + FLB_ECS_FILTER_HOST, FLB_ECS_FILTER_PORT, + NULL, 0); + flb_http_buffer_size(c, 0); /* 0 means unlimited */ + + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + + ret = flb_http_do(c, &b_sent); + flb_plg_debug(ctx->ins, "http_do=%i, " + "HTTP Status: %i", + ret, c->resp.status); + } + + if (ret != 0 || c->resp.status != 200) { + if (c->resp.payload_size > 0) { + flb_plg_warn(ctx->ins, "Failed to get metadata from %s, will retry", + http_path); + flb_plg_debug(ctx->ins, "HTTP response\n%s", + c->resp.payload); + } else { + flb_plg_warn(ctx->ins, "%s response status was %d with no payload, will retry", + http_path, + c->resp.status); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(http_path); + return -1; + } + + ret = flb_pack_json(c->resp.payload, c->resp.payload_size, + &buffer, &size, &root_type); + + if (ret < 0) { + flb_plg_warn(ctx->ins, "Could not parse response from %s; response=\n%s", + http_path, c->resp.payload); + flb_sds_destroy(http_path); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return -1; + } + + /* parse metadata response */ + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buffer, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, "Cannot unpack %s response to find metadata\n%s", + http_path, c->resp.payload); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return -1; + } + + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "%s response parsing failed, msgpack_type=%i", + http_path, + root.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + return -1; + } + + /* +Metadata Response: +{ + "Arn": "arn:aws:ecs:us-west-2:012345678910:task/default/e01d58a8-151b-40e8-bc01-22647b9ecfec", + "Containers": [ + { + "DockerId": "79c796ed2a7f864f485c76f83f3165488097279d296a7c05bd5201a1c69b2920", + "DockerName": "ecs-nginx-efs-2-nginx-9ac0808dd0afa495f001", + "Name": "nginx" + } + ], + "DesiredStatus": "RUNNING", + "Family": "nginx-efs", + "KnownStatus": "RUNNING", + "Version": "2" +} + */ + + for (i = 0; i < root.via.map.size; i++) { + key = root.via.map.ptr[i].key; + if (key.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "%s response parsing failed, msgpack key type=%i", + http_path, + key.type); + continue; + } + + if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "Family", 6) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Family' value type=%i", + val.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + if (task_id) { + flb_sds_destroy(task_id); + } + return -1; + } + found_family = FLB_TRUE; + task_meta.task_def_family = val.via.str.ptr; + task_meta.task_def_family_len = (int) val.via.str.size; + } + else if (key.via.str.size == 3 && strncmp(key.via.str.ptr, "Arn", 3) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Arn' value type=%i", + val.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + if (task_id) { + flb_sds_destroy(task_id); + } + return -1; + } + /* first get the ARN */ + found_task = FLB_TRUE; + task_meta.task_arn = val.via.str.ptr; + task_meta.task_arn_len = (int) val.via.str.size; + + /* then get the ID */ + task_id = parse_id_from_arn(val.via.str.ptr, (int) val.via.str.size); + if (task_id == NULL) { + flb_plg_error(ctx->ins, "metadata parsing: failed to get ID from %.*s", + (int) val.via.str.size, val.via.str.ptr); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + if (task_id) { + flb_sds_destroy(task_id); + } + return -1; + } + + task_meta.task_id = task_id; + task_meta.task_id_len = flb_sds_len(task_id); + } else if (key.via.str.size == 7 && strncmp(key.via.str.ptr, "Version", 7) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Version' value type=%i", + val.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + if (task_id) { + flb_sds_destroy(task_id); + } + return -1; + } + found_version = FLB_TRUE; + task_meta.task_def_version = val.via.str.ptr; + task_meta.task_def_version_len = (int) val.via.str.size; + } else if (key.via.str.size == 10 && strncmp(key.via.str.ptr, "Containers", 10) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_ARRAY ) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Containers' value type=%i", + val.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + if (task_id) { + flb_sds_destroy(task_id); + } + return -1; + } + found_containers = FLB_TRUE; + } + } + + if (found_task == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse Task 'Arn' from %s response", + http_path); + flb_sds_destroy(http_path); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + if (found_family == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse 'Family' from %s response", + http_path); + flb_sds_destroy(http_path); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + if (task_id) { + flb_sds_destroy(task_id); + } + return -1; + } + if (found_version == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse 'Version' from %s response", + http_path); + flb_sds_destroy(http_path); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + if (task_id) { + flb_sds_destroy(task_id); + } + return -1; + } + if (found_containers == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not parse 'Containers' from %s response", + http_path); + flb_sds_destroy(http_path); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + if (task_id) { + flb_sds_destroy(task_id); + } + return -1; + } + + /* + * Process metadata response a 2nd time to get the Containers list + * This is because we need one complete metadata buf per container + * with all task metadata. So we collect task before we process containers. + */ + for (i = 0; i < root.via.map.size; i++) { + key = root.via.map.ptr[i].key; + if (key.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "%s response parsing failed, msgpack key type=%i", + http_path, + key.type); + continue; + } + + if (key.via.str.size == 10 && strncmp(key.via.str.ptr, "Containers", 10) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_ARRAY ) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Containers' value type=%i", + val.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + flb_sds_destroy(task_id); + return -1; + } + + /* iterate through list of containers and process them*/ + for (k = 0; k < val.via.array.size; k++) { + container = val.via.array.ptr[k]; + if (container.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Containers[%d]' inner value type=%i", + k, + container.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + flb_sds_destroy(task_id); + return -1; + } + ret = process_container_response(ctx, container, task_meta); + if (ret < 0) { + flb_plg_error(ctx->ins, "metadata parsing: failed to parse 'Containers[%d]'", + k); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(http_path); + flb_sds_destroy(task_id); + return -1; + } + } + } + } + + flb_free(buffer); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(task_id); + flb_sds_destroy(http_path); + return 0; +} + +static int get_metadata_by_id(struct flb_filter_ecs *ctx, + const char *tag, int tag_len, + struct flb_ecs_metadata_buffer **metadata_buffer) +{ + flb_sds_t container_short_id = NULL; + const char *tmp; + int ret; + size_t size; + + if (ctx->ecs_tag_prefix_len + 12 > tag_len) { + flb_plg_error(ctx->ins, "Tag '%s' length check failed: tag is expected " + "to be or be prefixed with '{ecs_tag_prefix}{12 character container short ID}'", + tag); + return -1; + } + + ret = strncmp(ctx->ecs_tag_prefix, tag, ctx->ecs_tag_prefix_len); + if (ret != 0) { + flb_plg_error(ctx->ins, "Tag '%s' is not prefixed with ecs_tag_prefix '%s'", + tag, ctx->ecs_tag_prefix); + return -1; + } + + tmp = tag + ctx->ecs_tag_prefix_len; + container_short_id = flb_sds_create_len(tmp, 12); + if (!container_short_id) { + flb_errno(); + return -1; + } + + /* get metadata for this container */ + ret = flb_hash_get(ctx->container_hash_table, + container_short_id, flb_sds_len(container_short_id), + (void *) metadata_buffer, &size); + + if (ret == -1) { + /* try fetch metadata */ + ret = get_task_metadata(ctx, container_short_id); + if (ret < 0) { + flb_plg_error(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed"); + flb_sds_destroy(container_short_id); + return -1; + } + /* get from hash table */ + ret = flb_hash_get(ctx->container_hash_table, + container_short_id, flb_sds_len(container_short_id), + (void *) metadata_buffer, &size); + } + + flb_sds_destroy(container_short_id); + return ret; +} + +static void clean_old_metadata_buffers(struct flb_filter_ecs *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_ecs_metadata_buffer *buf; + time_t now = time(NULL); + + mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) { + buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head); + if (now > (buf->last_used_time + ctx->ecs_meta_cache_ttl)) { + flb_plg_debug(ctx->ins, "cleaning buffer: now=%ld, ttl=%ld, last_used_time=%ld", + now, ctx->ecs_meta_cache_ttl, buf->last_used_time); + mk_list_del(&buf->_head); + flb_hash_del(ctx->container_hash_table, buf->id); + flb_ecs_metadata_buffer_destroy(buf); + } + } +} + +static int cb_ecs_filter(const void *data, size_t bytes, + const char *tag, int tag_len, + void **out_buf, size_t *out_size, + struct flb_filter_instance *f_ins, + struct flb_input_instance *i_ins, + void *context, + struct flb_config *config) +{ + struct flb_filter_ecs *ctx = context; + (void) f_ins; + (void) i_ins; + (void) config; + size_t off = 0; + int i = 0; + int ret; + int len; + struct flb_time tm; + int total_records; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + msgpack_unpacked result; + msgpack_object *obj; + msgpack_object_kv *kv; + struct mk_list *tmp; + struct mk_list *head; + struct flb_ecs_metadata_key *metadata_key; + struct flb_ecs_metadata_buffer *metadata_buffer; + flb_sds_t val; + + /* First check that the static cluster metadata has been retrieved */ + if (ctx->has_cluster_metadata == FLB_FALSE) { + ret = get_ecs_cluster_metadata(ctx); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not retrieve cluster metadata " + "from ECS Agent"); + return FLB_FILTER_NOTOUCH; + } + } + + if (ctx->cluster_metadata_only == FLB_FALSE) { + ret = get_metadata_by_id(ctx, tag, tag_len, &metadata_buffer); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to get ECS Task metadata for %s, " + "falling back to process cluster metadata only. If " + "this is intentional, set `Cluster_Metadata_Only On`", + tag); + return FLB_FILTER_NOTOUCH; + } + } else { + metadata_buffer = &ctx->cluster_meta_buf; + } + + metadata_buffer->last_used_time = time(NULL); + + /* Create temporary msgpack buffer */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + + /* Iterate over each item */ + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, data, bytes, &off) + == MSGPACK_UNPACK_SUCCESS) { + /* + * Each record is a msgpack array [timestamp, map] of the + * timestamp and record map. We 'unpack' each record, and then re-pack + * it with the new fields added. + */ + + if (result.data.type != MSGPACK_OBJECT_ARRAY) { + flb_plg_error(ctx->ins, "cb_filter buffer wrong type, msgpack_type=%i", + result.data.type); + continue; + } + + /* unpack the array of [timestamp, map] */ + flb_time_pop_from_msgpack(&tm, &result, &obj); + + /* obj should now be the record map */ + if (obj->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "Record wrong type, msgpack_type=%i", + obj->type); + continue; + } + + /* re-pack the array into a new buffer */ + msgpack_pack_array(&tmp_pck, 2); + flb_time_append_to_msgpack(&tm, &tmp_pck, 0); + + /* new record map size is old size + the new keys we will add */ + total_records = obj->via.map.size + ctx->metadata_keys_len; + msgpack_pack_map(&tmp_pck, total_records); + + /* iterate through the old record map and add it to the new buffer */ + kv = obj->via.map.ptr; + for(i=0; i < obj->via.map.size; i++) { + msgpack_pack_object(&tmp_pck, (kv+i)->key); + msgpack_pack_object(&tmp_pck, (kv+i)->val); + } + + /* append new keys */ + mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) { + metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head); + val = flb_ra_translate(metadata_key->ra, NULL, 0, + metadata_buffer->obj, NULL); + if (!val) { + flb_plg_error(ctx->ins, "Translation failed for %s : %s", + metadata_key->key, metadata_key->template); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&tmp_sbuf); + return FLB_FILTER_NOTOUCH; + } + len = flb_sds_len(metadata_key->key); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + metadata_key->key, + len); + len = flb_sds_len(val); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, + val, + len); + flb_sds_destroy(val); + } + } + msgpack_unpacked_destroy(&result); + + if (ctx->cluster_metadata_only == FLB_FALSE) { + clean_old_metadata_buffers(ctx); + } + + /* link new buffers */ + *out_buf = tmp_sbuf.data; + *out_size = tmp_sbuf.size; + return FLB_FILTER_MODIFIED; +} + +static void flb_ecs_metadata_key_destroy(struct flb_ecs_metadata_key *metadata_key) +{ + if (metadata_key) { + if (metadata_key->key) { + flb_sds_destroy(metadata_key->key); + } + if (metadata_key->template) { + flb_sds_destroy(metadata_key->template); + } + if (metadata_key->ra) { + flb_ra_destroy(metadata_key->ra); + } + flb_free(metadata_key); + } +} + +static void flb_filter_ecs_destroy(struct flb_filter_ecs *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_ecs_metadata_key *metadata_key; + struct flb_ecs_metadata_buffer *buf; + + if (ctx) { + if (ctx->ecs_upstream) { + flb_upstream_destroy(ctx->ecs_upstream); + } + if (ctx->cluster_metadata.cluster_name) { + flb_sds_destroy(ctx->cluster_metadata.cluster_name); + } + if (ctx->cluster_metadata.container_instance_arn) { + flb_sds_destroy(ctx->cluster_metadata.container_instance_arn); + } + if (ctx->cluster_metadata.container_instance_id) { + flb_sds_destroy(ctx->cluster_metadata.container_instance_id); + } + if (ctx->cluster_metadata.ecs_agent_version) { + flb_sds_destroy(ctx->cluster_metadata.ecs_agent_version); + } + if (ctx->cluster_meta_buf.buf) { + flb_free(ctx->cluster_meta_buf.buf); + msgpack_unpacked_destroy(&ctx->cluster_meta_buf.unpacked); + } + mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) { + metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head); + mk_list_del(&metadata_key->_head); + flb_ecs_metadata_key_destroy(metadata_key); + } + mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) { + buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head); + mk_list_del(&buf->_head); + flb_hash_del(ctx->container_hash_table, buf->id); + flb_ecs_metadata_buffer_destroy(buf); + } + if (ctx->container_hash_table) { + flb_hash_destroy(ctx->container_hash_table); + } + flb_free(ctx); + } +} + +static int cb_ecs_exit(void *data, struct flb_config *config) +{ + struct flb_filter_ecs *ctx = data; + + flb_filter_ecs_destroy(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + + { + FLB_CONFIG_MAP_STR, "add", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Add a metadata key/value pair with the given key and given value from the given template. " + "Format is `Add KEY TEMPLATE`." + }, + + { + FLB_CONFIG_MAP_STR, "ecs_tag_prefix", "", + 0, FLB_TRUE, offsetof(struct flb_filter_ecs, ecs_tag_prefix), + "This filter must obtain the 12 character container short ID to query " + "for ECS Task metadata. The filter removes the prefx from the tag and then assumes " + "the next 12 characters are the short container ID. If the container short ID, " + "is not found in the tag, the filter can/must fallback to only attaching cluster metadata " + "(cluster name, container instance ID/ARN, and ECS Agent version)." + }, + + { + FLB_CONFIG_MAP_BOOL, "cluster_metadata_only", "false", + 0, FLB_TRUE, offsetof(struct flb_filter_ecs, cluster_metadata_only), + "Only attempt to attach the cluster related metadata to logs " + "(cluster name, container instance ID/ARN, and ECS Agent version). " + "With this option off, if this filter can not obtain the task metadata for a log, it will " + "output errors. Use this option if you have logs that are not part of an " + "ECS task (ex: Docker Daemon logs)." + }, + + { + FLB_CONFIG_MAP_TIME, "ecs_meta_cache_ttl", "3600", + 0, FLB_TRUE, offsetof(struct flb_filter_ecs, ecs_meta_cache_ttl), + "Configurable TTL for cached ECS Task Metadata. Default 3600s (1 hour)" + "For example, set this value to 600 or 600s or 10m and cache entries " + "which have been created more than 10 minutes will be evicted." + "Cache eviction is needed to purge task metadata for tasks that " + "have been stopped." + }, + + {0} +}; + +struct flb_filter_plugin filter_ecs_plugin = { + .name = "ecs", + .description = "Add AWS ECS Metadata", + .cb_init = cb_ecs_init, + .cb_filter = cb_ecs_filter, + .cb_exit = cb_ecs_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/plugins/filter_ecs/ecs.h b/plugins/filter_ecs/ecs.h new file mode 100644 index 00000000000..9ca4a9bd6f7 --- /dev/null +++ b/plugins/filter_ecs/ecs.h @@ -0,0 +1,138 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_FILTER_ECS_H +#define FLB_FILTER_ECS_H + +#include +#include +#include +#include +#include + +#define FLB_ECS_FILTER_HOST "127.0.0.1" +#define FLB_ECS_FILTER_PORT 51678 +#define FLB_ECS_FILTER_CLUSTER_PATH "/v1/metadata" +#define FLB_ECS_FILTER_TASK_PATH_FORMAT "/v1/tasks?dockerid=%s" + +/* + * Kubernetes recommends not running more than 110 pods per node + * In ECS, the number of tasks per instance will vary considerably + * But this should be a very safe starting size for the table + * Since we use the TTL hash table there is no max size. + */ +#define FLB_ECS_FILTER_HASH_TABLE_SIZE 100 + + +struct flb_ecs_metadata_key { + flb_sds_t key; + flb_sds_t template; + struct flb_record_accessor *ra; + + struct mk_list _head; +}; + +struct flb_ecs_metadata_buffer { + /* msgpack_sbuffer */ + char *buf; + size_t size; + + /* unpacked object to use with flb_ra_translate */ + msgpack_unpacked unpacked; + msgpack_object obj; + int free_packer; + + /* the hash table only stores a pointer- we need the list to track and free these */ + struct mk_list _head; + /* we clean up the memory for these once ecs_meta_cache_ttl has expired */ + time_t last_used_time; + + /* + * To remove from the hash table on TTL expiration, we need the ID + * While we use a TTL hash, it won't clean up the memory, so we have a separate routine for that + * and it needs to ensure that the list and hash table has the same contents + */ + flb_sds_t id; +}; + +struct flb_ecs_cluster_metadata { + flb_sds_t cluster_name; + flb_sds_t container_instance_arn; + flb_sds_t container_instance_id; + flb_sds_t ecs_agent_version; +}; + +/* + * The ECS Agent task response gives us both task & container at the same time + * We need a temporary structure to organize the task metadata + * Before we create the final flb_ecs_metadata_buffer objects with all metadata + * So this struct just stores tmp pointers to the deserialized msgpack + */ +struct flb_ecs_task_metadata { + const char* task_arn; + int task_arn_len; + const char *task_id; + int task_id_len; + const char *task_def_family; + int task_def_family_len; + const char *task_def_version; + int task_def_version_len; +}; + +struct flb_filter_ecs { + /* upstream connection to ECS Agent */ + struct flb_upstream *ecs_upstream; + + /* Filter plugin instance reference */ + struct flb_filter_instance *ins; + + struct mk_list metadata_keys; + int metadata_keys_len; + + /* + * This field is used when we build new container metadata objects + */ + struct flb_ecs_cluster_metadata cluster_metadata; + int has_cluster_metadata; + /* + * If looking up the container fails, we should still always be able to + * attach cluster metadata. So we have a fallback metadata buffer for that. + * For example, users may want to attach cluster name to Docker Daemon logs, + * even though Docker is not an AWS ECS Task/container. + */ + struct flb_ecs_metadata_buffer cluster_meta_buf; + + /* + * Maps 12 char container short ID to metadata buffer + */ + struct flb_hash *container_hash_table; + + /* + * The hash table only stores pointers, so we keep a list of meta objects + * that need to be freed + */ + struct mk_list metadata_buffers; + + int ecs_meta_cache_ttl; + char *ecs_tag_prefix; + int ecs_tag_prefix_len; + int cluster_metadata_only; +}; + +#endif diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 44259d8b2a0..14dfdeee717 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -73,6 +73,7 @@ if(FLB_IN_LIB AND FLB_OUT_LIB) if (FLB_FILTER_WASM) FLB_RT_TEST(FLB_FILTER_WASM "filter_wasm.c") endif () + FLB_RT_TEST(FLB_FILTER_ECS "filter_ecs.c") endif() diff --git a/tests/runtime/filter_ecs.c b/tests/runtime/filter_ecs.c new file mode 100644 index 00000000000..19b46379388 --- /dev/null +++ b/tests/runtime/filter_ecs.c @@ -0,0 +1,372 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include "flb_tests_runtime.h" + +#define ERROR_RESPONSE "NOT FOUND" + + +struct filter_test { + flb_ctx_t *flb; /* Fluent Bit library context */ + int i_ffd; /* Input fd */ + int f_ffd; /* Filter fd */ +}; + +struct filter_test_result { + char *expected_pattern; /* string that must occur in output */ + int expected_pattern_index; /* which record to check for the pattern */ + int expected_records; /* expected number of outputted records */ + int actual_records; /* actual number of outputted records */ +}; + +/* Callback to check expected results */ +static int cb_check_result(void *record, size_t size, void *data) +{ + char *p; + struct filter_test_result *expected; + char *result; + + expected = (struct filter_test_result *) data; + result = (char *) record; + + if (expected->expected_pattern_index == expected->actual_records) { + p = strstr(result, expected->expected_pattern); + TEST_CHECK(p != NULL); + + if (!p) { + flb_error("Expected to find: '%s' in result '%s'", + expected->expected_pattern, result); + } + /* + * If you want to debug your test + * + * printf("Expect: '%s' in result '%s'\n", expected->expected_pattern, result); + */ + } + + expected->actual_records++; + + flb_free(record); + return 0; +} + + +struct str_list { + size_t size; /* size of lists */ + int ignore_min_line_num; /* ignore line if the length is less than this value */ + char **lists; /* string lists */ +}; + + +static struct filter_test *filter_test_create(struct flb_lib_out_cb *data, + char *tag) +{ + int i_ffd; + int f_ffd; + int o_ffd; + struct filter_test *ctx; + + ctx = flb_malloc(sizeof(struct filter_test)); + if (!ctx) { + flb_errno(); + return NULL; + } + + /* Service config */ + ctx->flb = flb_create(); + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "1", + NULL); + + /* Input */ + i_ffd = flb_input(ctx->flb, (char *) "lib", NULL); + TEST_CHECK(i_ffd >= 0); + /* filter relies on the tag container 12 char short container ID */ + flb_input_set(ctx->flb, i_ffd, "tag", tag, NULL); + ctx->i_ffd = i_ffd; + + /* Filter configuration */ + f_ffd = flb_filter(ctx->flb, (char *) "ecs", NULL); + TEST_CHECK(f_ffd >= 0); + flb_filter_set(ctx->flb, f_ffd, "match", "*", NULL); + ctx->f_ffd = f_ffd; + + /* Output */ + o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data); + TEST_CHECK(o_ffd >= 0); + flb_output_set(ctx->flb, o_ffd, + "match", "*", + "format", "json", + NULL); + + return ctx; +} + +static void filter_test_destroy(struct filter_test *ctx) +{ + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); +} + +static void flb_test_ecs_filter() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "testprefix-79c796ed2a7f"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "ecs_tag_prefix", "testprefix-", + "ADD", "resource $ClusterName.$TaskID.$ContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* Prepare output callback with expected result */ + expected.expected_records = 1; /* 1 record with metadata added */ + expected.expected_pattern = "cluster_name.e01d58a8-151b-40e8-bc01-22647b9ecfec.nginx"; + expected.expected_pattern_index = 0; + cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + + /* check number of outputted records */ + sleep(2); + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); +} + +static void flb_test_ecs_filter_no_prefix() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "79c796ed2a7f"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "ecs_tag_prefix", "", + "ADD", "resource $ClusterName.$TaskID.$ContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* Prepare output callback with expected result */ + expected.expected_records = 1; /* 1 record with metadata added */ + expected.expected_pattern = "cluster_name.e01d58a8-151b-40e8-bc01-22647b9ecfec.nginx"; + expected.expected_pattern_index = 0; + cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + + /* check number of outputted records */ + sleep(2); + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); +} + +static void flb_test_ecs_filter_cluster_metadata_only() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "var.lib.ecs.79c796ed2a7f"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "ecs_tag_prefix", "", + "cluster_metadata_only", "on", + /* only cluster value will be populated */ + "ADD", "resource $ClusterName.$TaskID.$ContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* Prepare output callback with expected result */ + expected.expected_records = 1; /* 1 record with only cluster metadata values added */ + expected.expected_pattern = "cluster_name.."; + expected.expected_pattern_index = 0; + cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + + /* check number of outputted records */ + sleep(2); + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); +} + +static void flb_test_ecs_filter_cluster_error() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + setenv("TEST_CLUSTER_ERROR", ERROR_RESPONSE, 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "79c796ed2a7f"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "ecs_tag_prefix", "", + "ADD", "resource $ClusterName.$TaskID.$ContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* this test is mainly for leak checking on error, not for checking result record */ + expected.expected_records = 1; /* 1 record with no metadata */ + expected.expected_pattern = ""; + expected.expected_pattern_index = 0; + cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + + /* check number of outputted records */ + sleep(2); + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); +} + +static void flb_test_ecs_filter_task_error() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + setenv("TEST_TASK_ERROR", ERROR_RESPONSE, 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "79c796ed2a7f"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "ecs_tag_prefix", "", + "ADD", "resource $ClusterName.$TaskID.$ContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* this test is mainly for leak checking on error, not for checking result record */ + expected.expected_records = 1; /* 1 record with no metadata */ + expected.expected_pattern = ""; + expected.expected_pattern_index = 0; + cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + + /* check number of outputted records */ + sleep(2); + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); +} + +TEST_LIST = { + + {"flb_test_ecs_filter" , flb_test_ecs_filter }, + {"flb_test_ecs_filter_no_prefix" , flb_test_ecs_filter_no_prefix }, + {"flb_test_ecs_filter_cluster_metadata_only" , flb_test_ecs_filter_cluster_metadata_only }, + {"flb_test_ecs_filter_cluster_error" , flb_test_ecs_filter_cluster_error }, + {"flb_test_ecs_filter_task_error" , flb_test_ecs_filter_task_error }, + + {NULL, NULL} +};