Skip to content

Commit

Permalink
Merge branch 'aws-fluent-bit-cherry-pick' of github.com:zhihonl/priva…
Browse files Browse the repository at this point in the history
…te-fluent-bit into add-entity
  • Loading branch information
nathalapooja committed Oct 17, 2024
2 parents fac95cb + dc025f8 commit 740a57a
Show file tree
Hide file tree
Showing 17 changed files with 186 additions and 94 deletions.
15 changes: 14 additions & 1 deletion plugins/filter_aws/aws.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,14 +558,22 @@ static int cb_aws_filter(const void *data, size_t bytes,
ctx->availability_zone_len);
}

if (ctx->instance_id_include) {
if (ctx->instance_id_include && !ctx->enable_entity) {
msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN);
msgpack_pack_str_body(&tmp_pck,
FLB_FILTER_AWS_INSTANCE_ID_KEY,
FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN);
msgpack_pack_str(&tmp_pck, ctx->instance_id_len);
msgpack_pack_str_body(&tmp_pck,
ctx->instance_id, ctx->instance_id_len);
} else if (ctx->instance_id_include && ctx->enable_entity) {
msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN);
msgpack_pack_str_body(&tmp_pck,
FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY,
FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN);
msgpack_pack_str(&tmp_pck, ctx->instance_id_len);
msgpack_pack_str_body(&tmp_pck,
ctx->instance_id, ctx->instance_id_len);
}

if (ctx->instance_type_include) {
Expand Down Expand Up @@ -740,6 +748,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_filter_aws, hostname_include),
"Enable EC2 instance hostname"
},
{
FLB_CONFIG_MAP_BOOL, "enable_entity", "false",
0, FLB_TRUE, offsetof(struct flb_filter_aws, enable_entity),
"Enable entity prefix for necessary fields"
},
{0}
};

Expand Down
8 changes: 8 additions & 0 deletions plugins/filter_aws/aws.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#define FLB_FILTER_AWS_AVAILABILITY_ZONE_KEY_LEN 2
#define FLB_FILTER_AWS_INSTANCE_ID_KEY "ec2_instance_id"
#define FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN 15
#define FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY "aws_entity_ec2_instance_id"
#define FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN 26
#define FLB_FILTER_AWS_INSTANCE_TYPE_KEY "ec2_instance_type"
#define FLB_FILTER_AWS_INSTANCE_TYPE_KEY_LEN 17
#define FLB_FILTER_AWS_PRIVATE_IP_KEY "private_ip"
Expand Down Expand Up @@ -111,6 +113,12 @@ struct flb_filter_aws {
size_t hostname_len;
int hostname_include;

/*
* Enable entity prefix appending. This appends
* 'aws_entity' to relevant keys
*/
int enable_entity;

/* number of new keys added by this plugin */
int new_keys;

Expand Down
71 changes: 12 additions & 59 deletions plugins/filter_kubernetes/kube_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -1028,27 +1028,6 @@ static int search_item_in_items(struct flb_kube_meta *meta,
return ret;
}

static char* find_fallback_environment(struct flb_kube *ctx, struct flb_kube_meta *meta) {
char *fallback_env = NULL;

/*
* Possible fallback environments:
* 1. eks:cluster-name/namespace
* 2. k8s:cluster-name/namespace
*/
if(ctx->platform == NULL && ctx->set_platform != NULL) {
ctx->platform = flb_strdup(ctx->set_platform);
}
if (ctx->platform != NULL && meta->cluster != NULL && meta->namespace != NULL) {
int ret = asprintf(&fallback_env, "%s:%s/%s", ctx->platform, meta->cluster, meta->namespace);
if (ret == -1) {
return NULL;
}
return fallback_env;
}
return NULL;
}

static int merge_meta_from_tag(struct flb_kube *ctx, struct flb_kube_meta *meta,
char **out_buf, size_t *out_size)
{
Expand Down Expand Up @@ -1260,25 +1239,13 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
}
}
}
int fallback_environment_len = 0;
char *fallback_environment = NULL;
if(ctx->use_pod_association) {
fallback_environment = find_fallback_environment(ctx,meta);
if(fallback_environment) {
fallback_environment_len = strlen(fallback_environment);
}
pod_service_found = flb_hash_get(ctx->pod_hash_table,
meta->podname, meta->podname_len,
&tmp_service_attributes, &tmp_service_attr_size);
if (pod_service_found != -1 && tmp_service_attributes != NULL) {
map_size += tmp_service_attributes->fields;
}
if(pod_service_found != -1 && tmp_service_attributes != NULL && tmp_service_attributes->environment[0] == '\0' && fallback_environment) {
map_size++;
}
if(pod_service_found == -1 && meta->workload != NULL && fallback_environment) {
map_size++;
}
if(ctx->platform) {
map_size++;
}
Expand All @@ -1304,51 +1271,41 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
if(ctx->use_pod_association) {
if (pod_service_found != -1 && tmp_service_attributes != NULL) {
if (tmp_service_attributes->name[0] != '\0') {
msgpack_pack_str(&mp_pck, 12);
msgpack_pack_str_body(&mp_pck, "service_name", 12);
msgpack_pack_str(&mp_pck, 23);
msgpack_pack_str_body(&mp_pck, "aws_entity_service_name", 23);
msgpack_pack_str(&mp_pck, tmp_service_attributes->name_len);
msgpack_pack_str_body(&mp_pck, tmp_service_attributes->name, tmp_service_attributes->name_len);
}
if (tmp_service_attributes->environment[0] != '\0') {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "environment", 11);
msgpack_pack_str(&mp_pck, 22);
msgpack_pack_str_body(&mp_pck, "aws_entity_environment", 22);
msgpack_pack_str(&mp_pck, tmp_service_attributes->environment_len);
msgpack_pack_str_body(&mp_pck, tmp_service_attributes->environment, tmp_service_attributes->environment_len);
} else if(tmp_service_attributes->environment[0] == '\0' && fallback_environment) {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "environment", 11);
msgpack_pack_str(&mp_pck, fallback_environment_len);
msgpack_pack_str_body(&mp_pck, fallback_environment, fallback_environment_len);
}
if (tmp_service_attributes->name_source[0] != '\0') {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "name_source", 11);
msgpack_pack_str(&mp_pck, 22);
msgpack_pack_str_body(&mp_pck, "aws_entity_name_source", 22);
msgpack_pack_str(&mp_pck, tmp_service_attributes->name_source_len);
msgpack_pack_str_body(&mp_pck, tmp_service_attributes->name_source, tmp_service_attributes->name_source_len);
}
} else if ( pod_service_found == -1 && meta->workload != NULL && fallback_environment) {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "environment", 11);
msgpack_pack_str(&mp_pck, fallback_environment_len);
msgpack_pack_str_body(&mp_pck, fallback_environment, fallback_environment_len);
}

if(ctx->platform != NULL) {
int platform_len = strlen(ctx->platform);
msgpack_pack_str(&mp_pck, 8);
msgpack_pack_str_body(&mp_pck, "platform", 8);
msgpack_pack_str(&mp_pck, 19);
msgpack_pack_str_body(&mp_pck, "aws_entity_platform", 19);
msgpack_pack_str(&mp_pck, platform_len);
msgpack_pack_str_body(&mp_pck, ctx->platform, platform_len);
}
if (meta->cluster != NULL) {
msgpack_pack_str(&mp_pck, 7);
msgpack_pack_str_body(&mp_pck, "cluster", 7);
msgpack_pack_str(&mp_pck, 18);
msgpack_pack_str_body(&mp_pck, "aws_entity_cluster", 18);
msgpack_pack_str(&mp_pck, meta->cluster_len);
msgpack_pack_str_body(&mp_pck, meta->cluster, meta->cluster_len);
}
if (meta->workload != NULL) {
msgpack_pack_str(&mp_pck, 8);
msgpack_pack_str_body(&mp_pck, "workload", 8);
msgpack_pack_str(&mp_pck, 19);
msgpack_pack_str_body(&mp_pck, "aws_entity_workload", 19);
msgpack_pack_str(&mp_pck, meta->workload_len);
msgpack_pack_str_body(&mp_pck, meta->workload, meta->workload_len);
}
Expand Down Expand Up @@ -1460,10 +1417,6 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
*out_buf = mp_sbuf.data;
*out_size = mp_sbuf.size;

if(fallback_environment) {
flb_free(fallback_environment);
}

return 0;
}

Expand Down
Loading

0 comments on commit 740a57a

Please sign in to comment.