Skip to content

Commit

Permalink
mp: mp_accessor API extended to active/deactivate record accessors
Browse files Browse the repository at this point in the history
The mp_accessor API allows to manipulate a msgpack object buffer
by adding or removing keys through a definition of a list of
record accessor patterns.

By default, when running mp_accessor it iterates through all the
list of record accessors, but there are cases where would be
ideal to just enable/disable some of them.

This patch extends the mp_accessor API allowing to activate/deactivate
certain record accessors on-demand.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Feb 19, 2024
1 parent 7bcefbb commit 57ea449
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 15 deletions.
11 changes: 10 additions & 1 deletion include/fluent-bit/flb_mp.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ struct flb_mp_map_header {
void *data;
};


/* */
struct flb_mp_accessor_match {
int matched;
Expand All @@ -54,6 +53,13 @@ struct flb_mp_accessor_match {
struct flb_record_accessor *ra;
};

/* wrapper to hold a list of record_accessor contexts */
struct flb_mp_accessor_ra {
int is_active;
struct flb_record_accessor *ra;
struct mk_list _head;
};

/* A context to abstract usage of record accessor when multiple patterns exists */
struct flb_mp_accessor {
int matches_size;
Expand All @@ -75,5 +81,8 @@ void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa);
int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
msgpack_object *map,
void **out_buf, size_t *out_size);
void flb_mp_accessor_set_active(struct flb_mp_accessor *mpa, int status);
int flb_mp_accessor_set_active_by_pattern(struct flb_mp_accessor *mpa,
const char *pattern, int status);

#endif
82 changes: 68 additions & 14 deletions src/flb_mp.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,17 @@ static int insert_by_subkey_count(struct flb_record_accessor *ra, struct flb_mp_
{
int subkey_count;
int count;
struct mk_list *h;
struct flb_record_accessor *val_ra;
struct mk_list *head;
struct flb_mp_accessor_ra *val_ra;
struct flb_mp_accessor_ra *mp_ra;

mp_ra = flb_calloc(1, sizeof(struct flb_mp_accessor_ra));
if (!mp_ra) {
flb_errno();
return -1;
}
mp_ra->is_active = FLB_TRUE;
mp_ra->ra = ra;

/*
* sort flb_record_accessor by number of subkey
Expand All @@ -384,20 +393,57 @@ static int insert_by_subkey_count(struct flb_record_accessor *ra, struct flb_mp_
* $kubernetes[2]['annotations']['fluentbit.io/tag']
*/
subkey_count = flb_ra_subkey_count(ra);
mk_list_foreach(h, &mpa->ra_list) {
val_ra = mk_list_entry(h, struct flb_record_accessor, _head);
count = flb_ra_subkey_count(val_ra);
mk_list_foreach(head, &mpa->ra_list) {
val_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);
count = flb_ra_subkey_count(val_ra->ra);
if (count >= subkey_count) {
mk_list_add_before(&ra->_head, &val_ra->_head, &mpa->ra_list);
mk_list_add_before(&mp_ra->_head, &val_ra->_head, &mpa->ra_list);
return 0;
}
}

/* add to tail of list */
mk_list_add(&ra->_head, &mpa->ra_list);
mk_list_add(&mp_ra->_head, &mpa->ra_list);
return 0;
}

/* Set the active status for all record accessor patterns */
void flb_mp_accessor_set_active(struct flb_mp_accessor *mpa, int status)
{
struct mk_list *head;
struct flb_mp_accessor_ra *mp_ra;

mk_list_foreach(head, &mpa->ra_list) {
mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);
mp_ra->is_active = status;
}
}

/* Set the active status for a specific record accessor pattern */
int flb_mp_accessor_set_active_by_pattern(struct flb_mp_accessor *mpa,
const char *pattern, int status)
{
int len;
struct mk_list *head;
struct flb_mp_accessor_ra *mp_ra;

len = strlen(pattern);

mk_list_foreach(head, &mpa->ra_list) {
mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);

if (len != flb_sds_len(mp_ra->ra->pattern)) {
continue;
}

if (strcmp(mp_ra->ra->pattern, pattern) == 0) {
mp_ra->is_active = status;
return 0;
}
}

return -1;
}

/*
* Create an 'mp accessor' context: this context allows to create a list of
Expand Down Expand Up @@ -541,8 +587,8 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
msgpack_object *o_key;
msgpack_object *o_val;
struct mk_list *head;
struct flb_record_accessor *ra;
struct flb_mp_accessor_match *match;
struct flb_mp_accessor_ra *mp_ra;
struct flb_mp_map_header mh;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
Expand All @@ -555,18 +601,23 @@ int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
memset(mpa->matches, '\0', mpa->matches_size);

mk_list_foreach(head, &mpa->ra_list) {
ra = mk_list_entry(head, struct flb_record_accessor, _head);
mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);

if (mp_ra->is_active == FLB_FALSE) {
rule_id++;
continue;
}

/* Apply the record accessor rule against the map */
ret = flb_ra_get_kv_pair(ra, *map, &s_key, &o_key, &o_val);
ret = flb_ra_get_kv_pair(mp_ra->ra, *map, &s_key, &o_key, &o_val);
if (ret == 0) {
/* There is a match, register in the matches table */
match = &mpa->matches[rule_id];
match->matched = FLB_TRUE;
match->start_key = s_key; /* Initial key path that matched */
match->key = o_key; /* Final key that matched */
match->val = o_val; /* Final value */
match->ra = ra; /* Record accessor context */
match->ra = mp_ra->ra; /* Record accessor context */
matches++;
}
rule_id++;
Expand Down Expand Up @@ -627,19 +678,22 @@ void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa)
struct mk_list *tmp;
struct mk_list *head;
struct flb_record_accessor *ra;
struct flb_mp_accessor_ra *mp_ra;

if (!mpa) {
return;
}

mk_list_foreach_safe(head, tmp, &mpa->ra_list) {
ra = mk_list_entry(head, struct flb_record_accessor, _head);
mk_list_del(&ra->_head);
flb_ra_destroy(ra);
mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);
mk_list_del(&mp_ra->_head);
flb_ra_destroy(mp_ra->ra);
flb_free(mp_ra);
}

if (mpa->matches) {
flb_free(mpa->matches);
}

flb_free(mpa);
}

0 comments on commit 57ea449

Please sign in to comment.