Skip to content

Commit

Permalink
Merge branch 'aws-fluent-bit-cherry-pick' of github.com:zhihonl/priva…
Browse files Browse the repository at this point in the history
…te-fluent-bit into add-entity
  • Loading branch information
nathalapooja committed Oct 22, 2024
2 parents 5dc2e63 + a24118a commit 9de7ed7
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 35 deletions.
3 changes: 2 additions & 1 deletion plugins/filter_aws/aws.c
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,8 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_BOOL, "enable_entity", "false",
0, FLB_TRUE, offsetof(struct flb_filter_aws, enable_entity),
"Enable entity prefix for necessary fields"
"Enable entity prefix for fields used for constructing entity."
"This currently only affects instance ID"
},
{0}
};
Expand Down
23 changes: 14 additions & 9 deletions plugins/filter_kubernetes/kube_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ static int get_meta_file_info(struct flb_kube *ctx, const char *namespace,
static int get_meta_info_from_request(struct flb_kube *ctx,
struct flb_upstream *upstream,
const char *namespace,
const char *resource,
const char *resource_type,
const char *resource_name,
char **buffer, size_t *size,
int *root_type,
char* uri)
Expand Down Expand Up @@ -389,9 +390,9 @@ static int get_meta_info_from_request(struct flb_kube *ctx,
}

ret = flb_http_do(c, &b_sent);
flb_plg_debug(ctx->ins, "Request (ns=%s, resource=%s) http_do=%i, "
flb_plg_debug(ctx->ins, "Request (ns=%s, %s=%s) http_do=%i, "
"HTTP Status: %i",
namespace, resource, ret, c->resp.status);
namespace, resource_type, resource_name, ret, c->resp.status);

if (ret != 0 || c->resp.status != 200) {
if (c->resp.payload_size > 0) {
Expand Down Expand Up @@ -441,7 +442,7 @@ static int get_pods_from_kubelet(struct flb_kube *ctx,
}
flb_plg_debug(ctx->ins,
"Send out request to Kubelet for pods information.");
packed = get_meta_info_from_request(ctx, ctx->upstream, namespace, podname,
packed = get_meta_info_from_request(ctx, ctx->upstream, namespace, FLB_KUBE_POD, podname,
&buf, &size, &root_type, uri);
}

Expand Down Expand Up @@ -482,10 +483,10 @@ static int get_api_server_configmap(struct flb_kube *ctx,
flb_plg_debug(ctx->ins,
"Send out request to API Server for configmap information");
if(ctx->use_kubelet) {
packed = get_meta_info_from_request(ctx,ctx->kubernetes_upstream, namespace, configmap,
packed = get_meta_info_from_request(ctx,ctx->kubernetes_upstream, namespace,FLB_KUBE_CONFIGMAP, configmap,
&buf, &size, &root_type, uri);
} else {
packed = get_meta_info_from_request(ctx,ctx->upstream, namespace, configmap,
packed = get_meta_info_from_request(ctx,ctx->upstream, namespace, FLB_KUBE_CONFIGMAP, configmap,
&buf, &size, &root_type, uri);
}
}
Expand Down Expand Up @@ -530,7 +531,7 @@ static int get_api_server_info(struct flb_kube *ctx,
}
flb_plg_debug(ctx->ins,
"Send out request to API Server for pods information");
packed = get_meta_info_from_request(ctx, ctx->upstream, namespace, podname,
packed = get_meta_info_from_request(ctx, ctx->upstream, namespace,FLB_KUBE_POD, podname,
&buf, &size, &root_type, uri);
}

Expand Down Expand Up @@ -785,7 +786,9 @@ static void cb_results_workload(const char *name, const char *value,

/*
* Search workload based on the following priority
* where the top is highest priority
* where the top is highest priority. This is done
* to find the owner of the pod which helps with
* determining the upper-level management of the pod
* 1. Deployment name
* 2. StatefulSet name
* 3. DaemonSet name
Expand Down Expand Up @@ -1116,6 +1119,7 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
msgpack_object ann_map;
struct flb_kube_props props = {0};
struct service_attributes *tmp_service_attributes = {0};

/*
* - reg_buf: is a msgpack Map containing meta captured using Regex
*
Expand Down Expand Up @@ -1937,7 +1941,8 @@ int flb_kube_meta_get(struct flb_kube *ctx,
return 0;
}

int flb_kube_meta_release(struct flb_kube_meta *meta) {
int flb_kube_meta_release(struct flb_kube_meta *meta)
{
int r = 0;

if (meta->namespace) {
Expand Down
4 changes: 4 additions & 0 deletions plugins/filter_kubernetes/kube_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ struct flb_kube_meta {
#define FLB_KUBE_API_CONFIGMAP_FMT "/api/v1/namespaces/%s/configmaps/%s"
#define FLB_KUBELET_PODS "/pods"

/* Constants for possible kubernetes resources */
#define FLB_KUBE_POD "pod"
#define FLB_KUBE_CONFIGMAP "configmap"

int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config);
int flb_kube_meta_fetch(struct flb_kube *ctx);
int flb_kube_dummy_meta_get(char **out_buf, size_t *out_size);
Expand Down
9 changes: 5 additions & 4 deletions plugins/filter_kubernetes/kubernetes.c
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ static int cb_kube_init(struct flb_filter_instance *f_ins,
// Start the background thread
if (pthread_create(&background_thread, NULL, update_pod_service_map, NULL) != 0) {
flb_error("Failed to create background thread");
background_thread = NULL;
free(task_args);
}
}
Expand Down Expand Up @@ -1167,7 +1168,7 @@ static struct flb_config_map config_map[] = {
FLB_CONFIG_MAP_BOOL, "use_pod_association", "false",
0, FLB_TRUE, offsetof(struct flb_kube, use_pod_association),
"use custom endpoint to get pod to service name mapping"
},
},
/*
* The host used for pod to service name association , default is 127.0.0.1
* Will only check when "use_pod_association" config is set to true
Expand All @@ -1185,7 +1186,7 @@ static struct flb_config_map config_map[] = {
FLB_CONFIG_MAP_STR, "pod_association_endpoint", "/kubernetes/pod-to-service-env-map",
0, FLB_TRUE, offsetof(struct flb_kube, pod_association_endpoint),
"endpoint to connect with when performing pod to service name association"
},
},
/*
* The port for pod to service name association endpoint, default is 4311
* Will only check when "use_pod_association" config is set to true
Expand Down Expand Up @@ -1241,13 +1242,13 @@ static struct flb_config_map config_map[] = {
FLB_CONFIG_MAP_BOOL, "pod_association_host_tls_verify", "true",
0, FLB_TRUE, offsetof(struct flb_kube, pod_association_host_tls_verify),
"enable or disable verification of TLS peer certificate"
},
},
{
FLB_CONFIG_MAP_STR, "set_platform", NULL,
0, FLB_TRUE, offsetof(struct flb_kube, set_platform),
"Set the platform that kubernetes is in. Possible values are k8s and eks"
"This should only be used for testing purpose"
},
},
/* EOF */
{0}
};
Expand Down
66 changes: 45 additions & 21 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
}
// If we are missing the service name, the entity will get rejected by the frontend anyway
// so do not emit entity unless service name is filled
if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes->name != NULL) {
if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes != NULL && stream->entity->key_attributes->name != NULL) {
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"entity\":{", 10)) {
goto error;
Expand Down Expand Up @@ -523,7 +523,10 @@ static int truncate_log(const struct flb_cloudwatch *ctx, const char *log_buffer
return FLB_FALSE;
}

// Helper function to remove keys from a nested map
/*
* Helper function to remove keys prefixed with aws_entity
* from a message pack map
*/
void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer *pk, int filtered_fields) {
const int remaining_kv_pairs = nested_map->size - filtered_fields;

Expand All @@ -545,7 +548,10 @@ void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer *
}
}

// Main function to remove a key from a nested map inside the root map
/*
* Main function to remove keys prefixed with aws_entity
* from the root and nested message pack map
*/
void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key, msgpack_packer *pk,int root_filtered_fields, int filtered_fields) {
if (root_map->type == MSGPACK_OBJECT_MAP) {
msgpack_object_map root = root_map->via.map;
Expand Down Expand Up @@ -1034,16 +1040,20 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map
kube_key = val.via.map.ptr[j].key;
kube_val = val.via.map.ptr[j].val;
if(strncmp(kube_key.via.str.ptr, "aws_entity_service_name", kube_key.via.str.size) == 0) {
if(entity->key_attributes->name == NULL) {
if(!entity->service_name_found) {
entity->filter_count++;
} else {
entity->service_name_found++;
}
if(entity->key_attributes->name != NULL) {
flb_free(entity->key_attributes->name);
}
entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
} else if(strncmp(kube_key.via.str.ptr, "aws_entity_environment", kube_key.via.str.size) == 0) {
if(entity->key_attributes->environment == NULL) {
if(!entity->environment_found) {
entity->filter_count++;
} else {
entity->environment_found++;
}
if(entity->key_attributes->environment != NULL) {
flb_free(entity->key_attributes->environment);
}
entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
Expand Down Expand Up @@ -1072,9 +1082,11 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map
}
entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
} else if(strncmp(kube_key.via.str.ptr, "aws_entity_name_source", kube_key.via.str.size) == 0) {
if(entity->attributes->name_source == NULL) {
if(!entity->name_source_found) {
entity->filter_count++;
} else {
entity->name_source_found++;
}
if(entity->attributes->name_source != NULL) {
flb_free(entity->attributes->name_source);
}
entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
Expand Down Expand Up @@ -1128,11 +1140,11 @@ void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stre
memset(stream->entity->attributes, 0, sizeof(entity_attributes));
stream->entity->filter_count = 0;
stream->entity->root_filter_count = 0;

parse_entity(ctx,stream->entity,map, map.via.map.size);
} else {
parse_entity(ctx,stream->entity,map, map.via.map.size);
stream->entity->service_name_found = 0;
stream->entity->environment_found = 0;
stream->entity->name_source_found = 0;
}
parse_entity(ctx,stream->entity,map, map.via.map.size);
if (!stream->entity) {
flb_plg_warn(ctx->ins, "Failed to generate entity");
}
Expand Down Expand Up @@ -1160,6 +1172,12 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
msgpack_object emf_payload;
/* msgpack::sbuffer is a simple buffer implementation. */
msgpack_sbuffer mp_sbuf;
/*
* Msgpack objects used to store msgpack after filtering out fields
* with aws entity prefix
*/
msgpack_sbuffer filtered_sbuf;
msgpack_unpacked modified_unpacked;

struct log_stream *stream;

Expand Down Expand Up @@ -1207,6 +1225,10 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
map = root.via.array.ptr[1];
map_size = map.via.map.size;

if(ctx->kubernete_metadata_enabled && ctx->add_entity) {
msgpack_sbuffer_init(&filtered_sbuf);
msgpack_unpacked_init(&modified_unpacked);
}
stream = get_log_stream(ctx, tag, map);
if (!stream) {
flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag);
Expand All @@ -1216,21 +1238,15 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
update_or_create_entity(ctx,stream,map);
// Prepare a buffer to pack the modified map
if(stream->entity != NULL && (stream->entity->root_filter_count > 0 || stream->entity->filter_count > 0)) {
msgpack_sbuffer sbuf;
msgpack_sbuffer_init(&sbuf);
msgpack_packer pk;
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
msgpack_packer_init(&pk, &filtered_sbuf, msgpack_sbuffer_write);
remove_unneeded_field(&map, "kubernetes",&pk,stream->entity->root_filter_count, stream->entity->filter_count);

// Now, unpack the modified data into a new msgpack_object
msgpack_unpacked modified_unpacked;
msgpack_unpacked_init(&modified_unpacked);
size_t modified_offset = 0;
if (msgpack_unpack_next(&modified_unpacked, sbuf.data, sbuf.size, &modified_offset)) {
if (msgpack_unpack_next(&modified_unpacked, filtered_sbuf.data, filtered_sbuf.size, &modified_offset)) {
map = modified_unpacked.data;
}
msgpack_sbuffer_destroy(&sbuf);
msgpack_unpacked_destroy(&modified_unpacked);
}
}

Expand Down Expand Up @@ -1351,6 +1367,10 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
if (ret == 0) {
i++;
}
if(ctx->kubernete_metadata_enabled && ctx->add_entity) {
msgpack_sbuffer_destroy(&filtered_sbuf);
msgpack_unpacked_destroy(&modified_unpacked);
}
}
msgpack_unpacked_destroy(&result);

Expand All @@ -1366,6 +1386,10 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,

error:
msgpack_unpacked_destroy(&result);
if(ctx->kubernete_metadata_enabled && ctx->add_entity) {
msgpack_sbuffer_destroy(&filtered_sbuf);
msgpack_unpacked_destroy(&modified_unpacked);
}
return -1;
}

Expand Down
3 changes: 3 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ typedef struct entity {
struct entity_key_attributes *key_attributes;
struct entity_attributes *attributes;
int filter_count;
int service_name_found;
int environment_found;
int name_source_found;
int root_filter_count;
}entity;

Expand Down

0 comments on commit 9de7ed7

Please sign in to comment.