Skip to content

Commit

Permalink
out_cloudwatch_logs: add entity scraping logic for cluster and instan…
Browse files Browse the repository at this point in the history
…ce ID (#7)
  • Loading branch information
zhihonl authored Sep 5, 2024
1 parent e2f3f18 commit 8b949f5
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
25 changes: 24 additions & 1 deletion plugins/filter_kubernetes/kube_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,23 @@ static int get_api_server_info(struct flb_kube *ctx,
return 0;
}

/* Gather pods list information from Kubelet */
static void get_cluster_from_environment(struct flb_kube *ctx,struct flb_kube_meta *meta)
{
if(meta->cluster == NULL) {
char* cluster_name = getenv("CLUSTER_NAME");
int cluster_name_len = strlen(cluster_name);
if(cluster_name) {
meta->cluster = strdup(cluster_name);
meta->cluster_len = cluster_name_len;
meta->fields++;
} else {
free(cluster_name);
}
flb_plg_debug(ctx->ins, "Cluster name is %s.", meta->cluster);
}
}

static void cb_results(const char *name, const char *value,
size_t vlen, void *data)
{
Expand Down Expand Up @@ -1059,6 +1076,12 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,

/* Append Regex fields */
msgpack_pack_map(&mp_pck, map_size);
if (meta->cluster != NULL) {
msgpack_pack_str(&mp_pck, 7);
msgpack_pack_str_body(&mp_pck, "cluster", 7);
msgpack_pack_str(&mp_pck, meta->cluster_len);
msgpack_pack_str_body(&mp_pck, meta->cluster, meta->cluster_len);
}
if (meta->podname != NULL) {
msgpack_pack_str(&mp_pck, 8);
msgpack_pack_str_body(&mp_pck, "pod_name", 8);
Expand Down Expand Up @@ -1334,7 +1357,7 @@ static int get_and_merge_meta(struct flb_kube *ctx, struct flb_kube_meta *meta,
int ret;
char *api_buf;
size_t api_size;

get_cluster_from_environment(ctx, meta);
if (ctx->use_tag_for_meta) {
ret = merge_meta_from_tag(ctx, meta, out_buf, out_size);
return ret;
Expand Down
2 changes: 2 additions & 0 deletions plugins/filter_kubernetes/kube_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct flb_kube;
struct flb_kube_meta {
int fields;

int cluster_len;
int namespace_len;
int podname_len;
int cache_key_len;
Expand All @@ -35,6 +36,7 @@ struct flb_kube_meta {
int container_hash_len;
int container_image_len;

char *cluster;
char *namespace;
char *podname;
char *container_name;
Expand Down
27 changes: 27 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu
,0)) {
goto error;
}
if(stream->entity->attributes->cluster_name != NULL && strlen(stream->entity->attributes->cluster_name) != 0) {
if (!snprintf(ts,256, ",%s%s%s","\"EKS.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) {
goto error;
}
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) {
goto error;
}
}
if(stream->entity->attributes->namespace != NULL && strlen(stream->entity->attributes->namespace) != 0) {
if (!snprintf(ts,256, ",%s%s%s","\"K8s.Namespace\":\"",stream->entity->attributes->namespace,"\"")) {
goto error;
Expand All @@ -233,6 +241,14 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu
goto error;
}
}
if(stream->entity->attributes->instance_id != NULL && strlen(stream->entity->attributes->instance_id) != 0) {
if (!snprintf(ts,256, ",%s%s%s","\"EC2.InstanceId\":\"",buf->current_stream->entity->attributes->instance_id,"\"")) {
goto error;
}
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) {
goto error;
}
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"}", 1)) {
Expand Down Expand Up @@ -880,10 +896,19 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map
if(entity->attributes->node == NULL) {
entity->attributes->node = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
}
} else if(strncmp(kube_key.via.str.ptr, "cluster", 7) == 0) {
if(entity->attributes->cluster_name == NULL) {
entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
}
}
}
}
}
if(strncmp(key.via.str.ptr, "ec2_instance_id",15 ) == 0 ) {
if(entity->attributes->instance_id == NULL) {
entity->attributes->instance_id = flb_strndup(val.via.str.ptr, val.via.str.size);
}
}
}
}

Expand All @@ -895,6 +920,8 @@ void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stre
stream->entity->attributes = flb_malloc(sizeof(entity_attributes));
stream->entity->attributes->namespace = NULL;
stream->entity->attributes->node = NULL;
stream->entity->attributes->cluster_name = NULL;
stream->entity->attributes->instance_id = NULL;
parse_entity(ctx,stream->entity,map, map.via.map.size);
} else {
parse_entity(ctx,stream->entity,map, map.via.map.size);
Expand Down

0 comments on commit 8b949f5

Please sign in to comment.