Skip to content

Commit

Permalink
filter_kubernetes: add workload scraping logics for kubernetes filter (
Browse files Browse the repository at this point in the history
  • Loading branch information
zhihonl authored Sep 26, 2024
1 parent 2b400a7 commit c11c024
Show file tree
Hide file tree
Showing 28 changed files with 926 additions and 4 deletions.
3 changes: 3 additions & 0 deletions plugins/filter_kubernetes/kube_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
if (ctx->parser == NULL && ctx->regex) {
flb_regex_destroy(ctx->regex);
}
if (ctx->deploymentRegex) {
flb_regex_destroy(ctx->deploymentRegex);
}

flb_free(ctx->api_host);
flb_free(ctx->token);
Expand Down
1 change: 1 addition & 0 deletions plugins/filter_kubernetes/kube_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ struct flb_kube {

/* Regex context to parse records */
struct flb_regex *regex;
struct flb_regex *deploymentRegex;
struct flb_parser *parser;

/* TLS CA certificate file */
Expand Down
111 changes: 109 additions & 2 deletions plugins/filter_kubernetes/kube_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,102 @@ static void extract_container_hash(struct flb_kube_meta *meta,
}
}

static void cb_results_workload(const char *name, const char *value,
size_t vlen, void *data)
{
if (name == NULL || value == NULL || vlen == 0 || data == NULL) {
return;
}

struct flb_kube_meta *meta = data;

if (meta->workload == NULL && strcmp(name, "deployment") == 0) {
meta->workload = flb_strndup(value, vlen);
meta->workload_len = vlen;
meta->fields++;
}
}

static void search_workload(struct flb_kube_meta *meta,struct flb_kube *ctx,msgpack_object map)
{
int i,j,ownerIndex;
int regex_found;
int replicaset_match;
int podname_match = FLB_FALSE;
msgpack_object k, v;
msgpack_object_map ownerMap;
struct flb_regex_search result;
/* Temporary variable to store the workload value */
msgpack_object workload_val;

for (i = 0; i < map.via.map.size; i++) {

k = map.via.map.ptr[i].key;
v = map.via.map.ptr[i].val;
if (strncmp(k.via.str.ptr, "name", k.via.str.size) == 0) {

if (!strncmp(v.via.str.ptr, meta->podname, v.via.str.size)) {
podname_match = FLB_TRUE;
}

}
/* Example JSON for the below parsing:
* "ownerReferences": [
{
"apiVersion": "apps/v1",
"kind": "ReplicaSet",
"name": "my-replicaset",
"uid": "abcd1234-5678-efgh-ijkl-9876mnopqrst",
"controller": true,
"blockOwnerDeletion": true
}
]*/
if (podname_match && strncmp(k.via.str.ptr, "ownerReferences", k.via.str.size) == 0 && v.type == MSGPACK_OBJECT_ARRAY) {
for (j = 0; j < v.via.array.size; j++) {
if (v.via.array.ptr[j].type == MSGPACK_OBJECT_MAP) {
ownerMap = v.via.array.ptr[j].via.map;
for (ownerIndex = 0; ownerIndex < ownerMap.size; ownerIndex++) {
msgpack_object key = ownerMap.ptr[ownerIndex].key;
msgpack_object val = ownerMap.ptr[ownerIndex].val;

/* Ensure both key and value are strings */
if (key.type == MSGPACK_OBJECT_STR && val.type == MSGPACK_OBJECT_STR) {
if (strncmp(key.via.str.ptr, "kind", key.via.str.size) == 0 && strncmp(val.via.str.ptr, "ReplicaSet", val.via.str.size) == 0) {
replicaset_match = FLB_TRUE;
}

if (strncmp(key.via.str.ptr, "name", key.via.str.size) == 0) {
/* Store the value of 'name' in workload_val so it can be reused by set_workload */
workload_val = val;
if (replicaset_match) {
regex_found = flb_regex_do(ctx->deploymentRegex, val.via.str.ptr, val.via.str.size, &result);
if (regex_found > 0) {
/* Parse regex results */
flb_regex_parse(ctx->deploymentRegex, &result, cb_results_workload, meta);
} else {
/* Set workload if regex does not match */
goto set_workload;
}
} else {
/* Set workload if not a replicaset match */
goto set_workload;
}
}
}
}
}
}
}
}

return;

set_workload:
meta->workload = flb_strndup(workload_val.via.str.ptr, workload_val.via.str.size);
meta->workload_len = workload_val.via.str.size;
meta->fields++;
}

static int search_podname_and_namespace(struct flb_kube_meta *meta,
struct flb_kube *ctx,
msgpack_object map)
Expand Down Expand Up @@ -1006,6 +1102,7 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
k = api_map.via.map.ptr[i].key;
if (k.via.str.size == 8 && !strncmp(k.via.str.ptr, "metadata", 8)) {
meta_val = api_map.via.map.ptr[i].val;
search_workload(meta,ctx,meta_val);
if (meta_val.type == MSGPACK_OBJECT_MAP) {
meta_found = FLB_TRUE;
}
Expand Down Expand Up @@ -1123,6 +1220,13 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
}
}

if (meta->workload != NULL) {
msgpack_pack_str(&mp_pck, 8);
msgpack_pack_str_body(&mp_pck, "workload", 8);
msgpack_pack_str(&mp_pck, meta->workload_len);
msgpack_pack_str_body(&mp_pck, meta->workload, meta->workload_len);
}

/* Append API Server content */
if (have_uid >= 0) {
v = meta_val.via.map.ptr[have_uid].val;
Expand Down Expand Up @@ -1693,8 +1797,7 @@ int flb_kube_meta_get(struct flb_kube *ctx,
return 0;
}

int flb_kube_meta_release(struct flb_kube_meta *meta)
{
int flb_kube_meta_release(struct flb_kube_meta *meta) {
int r = 0;

if (meta->namespace) {
Expand Down Expand Up @@ -1731,6 +1834,10 @@ int flb_kube_meta_release(struct flb_kube_meta *meta)
flb_free(meta->cache_key);
}

if (meta->workload) {
flb_free(meta->workload);
}

if (meta->cluster) {
flb_free(meta->cluster);
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/filter_kubernetes/kube_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ struct flb_kube_meta {
int docker_id_len;
int container_hash_len;
int container_image_len;
int workload_len;

char *cluster;
char *namespace;
char *podname;
char *container_name;
char *container_image;
char *docker_id;
char *workload;

char *container_hash; /* set only on Systemd mode */

Expand Down
1 change: 1 addition & 0 deletions plugins/filter_kubernetes/kube_regex.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ int flb_kube_regex_init(struct flb_kube *ctx)
ctx->regex = flb_regex_create(KUBE_TAG_TO_REGEX);
}
}
ctx->deploymentRegex = flb_regex_create(DEPLOYMENT_REGEX);

if (!ctx->regex) {
return -1;
Expand Down
2 changes: 2 additions & 0 deletions plugins/filter_kubernetes/kube_regex.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#define KUBE_JOURNAL_TO_REGEX "^(?<name_prefix>[^_]+)_(?<container_name>[^\\._]+)(\\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_(?<namespace_name>[^_]+)_[^_]+_[^_]+$"

#define DEPLOYMENT_REGEX "^(?<deployment>.+)-(?<id>[bcdfghjklmnpqrstvwxz2456789]{6,10})$"

int flb_kube_regex_init(struct flb_kube *ctx);

#endif
12 changes: 12 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,14 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu
goto error;
}
}
if(stream->entity->attributes->workload != NULL && strlen(stream->entity->attributes->workload) != 0) {
if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Workload\":\"",buf->current_stream->entity->attributes->workload,"\"")) {
goto error;
}
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) {
goto error;
}
}
if(stream->entity->attributes->instance_id != NULL && strlen(stream->entity->attributes->instance_id) != 0) {
if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"EC2.InstanceId\":\"",buf->current_stream->entity->attributes->instance_id,"\"")) {
goto error;
Expand Down Expand Up @@ -943,6 +951,10 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map
if(entity->attributes->cluster_name == NULL) {
entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
}
} else if(strncmp(kube_key.via.str.ptr, "workload", kube_key.via.str.size) == 0) {
if(entity->attributes->workload == NULL) {
entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
}
} else if(strncmp(kube_key.via.str.ptr, "name_source", kube_key.via.str.size) == 0) {
if(entity->attributes->name_source == NULL) {
entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"annotations": {
"prometheus.io/path": "/api/v1/metrics/prometheus",
"prometheus.io/port": "2020",
"prometheus.io/scrape": "true"
},
"creationTimestamp": "2019-04-03T09:29:00Z",
"labels": {
"app.kubernetes.io/name": "fluent-bit"
},
"name": "use-kubelet-disabled-daemonset",
"namespace": "options",
"resourceVersion": "74466568",
"selfLink": "/api/v1/namespaces/core/pods/base",
"uid": "e9f2963f-55f2-11e9-84c5-02e422b8a84a",
"ownerReferences": [
{
"apiVersion": "apps/v1",
"kind": "DaemonSet",
"name": "my-daemonset",
"uid": "abcd1234-5678-efgh-ijkl-9876mnopqrst",
"controller": true,
"blockOwnerDeletion": true
}
]
},
"spec": {
"containers": [
{
"image": "fluent/fluent-bit",
"imagePullPolicy": "Always",
"name": "fluent-bit",
"resources": {},
"stdin": true,
"stdinOnce": true,
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File",
"tty": true,
"volumeMounts": [
{
"mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
"name": "default-token-9ffht",
"readOnly": true
}
]
}
],
"dnsPolicy": "ClusterFirst",
"nodeName": "ip-10-49-18-80.eu-west-1.compute.internal",
"restartPolicy": "Never",
"schedulerName": "default-scheduler",
"securityContext": {},
"serviceAccount": "default",
"serviceAccountName": "default",
"terminationGracePeriodSeconds": 30,
"tolerations": [
{
"effect": "NoExecute",
"key": "node.kubernetes.io/not-ready",
"operator": "Exists",
"tolerationSeconds": 300
},
{
"effect": "NoExecute",
"key": "node.kubernetes.io/unreachable",
"operator": "Exists",
"tolerationSeconds": 300
}
],
"volumes": [
{
"name": "default-token-9ffht",
"secret": {
"defaultMode": 420,
"secretName": "default-token-9ffht"
}
}
]
},
"status": {
"conditions": [
{
"lastProbeTime": null,
"lastTransitionTime": "2019-04-03T09:29:00Z",
"status": "True",
"type": "Initialized"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2019-04-03T09:29:06Z",
"status": "True",
"type": "Ready"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2019-04-03T09:29:00Z",
"status": "True",
"type": "PodScheduled"
}
],
"containerStatuses": [
{
"containerID": "docker://c9898099f6d235126d564ed38a020007ea7a6fac6e25e718de683c9dd0076c16",
"image": "fluent/fluent-bit:latest",
"imageID": "docker-pullable://fluent/fluent-bit@sha256:7ac0fd3569af866e9a6a22eb592744200d2dbe098cf066162453f8d0b06c531f",
"lastState": {},
"name": "fluent-bit",
"ready": true,
"restartCount": 0,
"state": {
"running": {
"startedAt": "2019-04-03T09:29:05Z"
}
}
}
],
"hostIP": "10.49.18.80",
"phase": "Running",
"podIP": "100.116.192.42",
"qosClass": "BestEffort",
"startTime": "2019-04-03T09:29:00Z"
}
}
Loading

0 comments on commit c11c024

Please sign in to comment.