From 57ea4498a5c603a9a81e6dfad98a38ba06f00d7a Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 15 Feb 2024 20:11:41 -0600 Subject: [PATCH] mp: mp_accessor API extended to active/deactivate record accessors The mp_accessor API allows to manipulate a msgpack object buffer by adding or removing keys through a definition of a list of record accessor patterns. By default, when running mp_accessor it iterates through all the list of record accessors, but there are cases where would be ideal to just enable/disable some of them. This patch extends the mp_accessor API allowing to activate/deactivate certain record accessors on-demand. Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_mp.h | 11 ++++- src/flb_mp.c | 82 ++++++++++++++++++++++++++++++------- 2 files changed, 78 insertions(+), 15 deletions(-) diff --git a/include/fluent-bit/flb_mp.h b/include/fluent-bit/flb_mp.h index af736e026e2..28a216d82c9 100644 --- a/include/fluent-bit/flb_mp.h +++ b/include/fluent-bit/flb_mp.h @@ -44,7 +44,6 @@ struct flb_mp_map_header { void *data; }; - /* */ struct flb_mp_accessor_match { int matched; @@ -54,6 +53,13 @@ struct flb_mp_accessor_match { struct flb_record_accessor *ra; }; +/* wrapper to hold a list of record_accessor contexts */ +struct flb_mp_accessor_ra { + int is_active; + struct flb_record_accessor *ra; + struct mk_list _head; +}; + /* A context to abstract usage of record accessor when multiple patterns exists */ struct flb_mp_accessor { int matches_size; @@ -75,5 +81,8 @@ void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa); int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa, msgpack_object *map, void **out_buf, size_t *out_size); +void flb_mp_accessor_set_active(struct flb_mp_accessor *mpa, int status); +int flb_mp_accessor_set_active_by_pattern(struct flb_mp_accessor *mpa, + const char *pattern, int status); #endif diff --git a/src/flb_mp.c b/src/flb_mp.c index 3a6bd9f904e..77f53fefe4a 100644 --- a/src/flb_mp.c +++ b/src/flb_mp.c @@ -372,8 +372,17 @@ static int insert_by_subkey_count(struct flb_record_accessor *ra, struct flb_mp_ { int subkey_count; int count; - struct mk_list *h; - struct flb_record_accessor *val_ra; + struct mk_list *head; + struct flb_mp_accessor_ra *val_ra; + struct flb_mp_accessor_ra *mp_ra; + + mp_ra = flb_calloc(1, sizeof(struct flb_mp_accessor_ra)); + if (!mp_ra) { + flb_errno(); + return -1; + } + mp_ra->is_active = FLB_TRUE; + mp_ra->ra = ra; /* * sort flb_record_accessor by number of subkey @@ -384,20 +393,57 @@ static int insert_by_subkey_count(struct flb_record_accessor *ra, struct flb_mp_ * $kubernetes[2]['annotations']['fluentbit.io/tag'] */ subkey_count = flb_ra_subkey_count(ra); - mk_list_foreach(h, &mpa->ra_list) { - val_ra = mk_list_entry(h, struct flb_record_accessor, _head); - count = flb_ra_subkey_count(val_ra); + mk_list_foreach(head, &mpa->ra_list) { + val_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head); + count = flb_ra_subkey_count(val_ra->ra); if (count >= subkey_count) { - mk_list_add_before(&ra->_head, &val_ra->_head, &mpa->ra_list); + mk_list_add_before(&mp_ra->_head, &val_ra->_head, &mpa->ra_list); return 0; } } /* add to tail of list */ - mk_list_add(&ra->_head, &mpa->ra_list); + mk_list_add(&mp_ra->_head, &mpa->ra_list); return 0; } +/* Set the active status for all record accessor patterns */ +void flb_mp_accessor_set_active(struct flb_mp_accessor *mpa, int status) +{ + struct mk_list *head; + struct flb_mp_accessor_ra *mp_ra; + + mk_list_foreach(head, &mpa->ra_list) { + mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head); + mp_ra->is_active = status; + } +} + +/* Set the active status for a specific record accessor pattern */ +int flb_mp_accessor_set_active_by_pattern(struct flb_mp_accessor *mpa, + const char *pattern, int status) +{ + int len; + struct mk_list *head; + struct flb_mp_accessor_ra *mp_ra; + + len = strlen(pattern); + + mk_list_foreach(head, &mpa->ra_list) { + mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head); + + if (len != flb_sds_len(mp_ra->ra->pattern)) { + continue; + } + + if (strcmp(mp_ra->ra->pattern, pattern) == 0) { + mp_ra->is_active = status; + return 0; + } + } + + return -1; +} /* * Create an 'mp accessor' context: this context allows to create a list of @@ -541,8 +587,8 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa, msgpack_object *o_key; msgpack_object *o_val; struct mk_list *head; - struct flb_record_accessor *ra; struct flb_mp_accessor_match *match; + struct flb_mp_accessor_ra *mp_ra; struct flb_mp_map_header mh; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; @@ -555,10 +601,15 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa, memset(mpa->matches, '\0', mpa->matches_size); mk_list_foreach(head, &mpa->ra_list) { - ra = mk_list_entry(head, struct flb_record_accessor, _head); + mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head); + + if (mp_ra->is_active == FLB_FALSE) { + rule_id++; + continue; + } /* Apply the record accessor rule against the map */ - ret = flb_ra_get_kv_pair(ra, *map, &s_key, &o_key, &o_val); + ret = flb_ra_get_kv_pair(mp_ra->ra, *map, &s_key, &o_key, &o_val); if (ret == 0) { /* There is a match, register in the matches table */ match = &mpa->matches[rule_id]; @@ -566,7 +617,7 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa, match->start_key = s_key; /* Initial key path that matched */ match->key = o_key; /* Final key that matched */ match->val = o_val; /* Final value */ - match->ra = ra; /* Record accessor context */ + match->ra = mp_ra->ra; /* Record accessor context */ matches++; } rule_id++; @@ -627,19 +678,22 @@ void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa) struct mk_list *tmp; struct mk_list *head; struct flb_record_accessor *ra; + struct flb_mp_accessor_ra *mp_ra; if (!mpa) { return; } mk_list_foreach_safe(head, tmp, &mpa->ra_list) { - ra = mk_list_entry(head, struct flb_record_accessor, _head); - mk_list_del(&ra->_head); - flb_ra_destroy(ra); + mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head); + mk_list_del(&mp_ra->_head); + flb_ra_destroy(mp_ra->ra); + flb_free(mp_ra); } if (mpa->matches) { flb_free(mpa->matches); } + flb_free(mpa); }