Skip to content

Commit

Permalink
filter: compose safety Tag and fix write offset (#993)
Browse files Browse the repository at this point in the history
Filter was trusting that input plugins was ingesting a NULL terminated
value, but if the data comes from Forward this will not be the case and
when calling the Routing will not match any rule.

This patch add a safety tag composer plus fix a problem when setting up
the write position when data has been removed.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Dec 20, 2018
1 parent da8f718 commit abf83a4
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions src/flb_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ void flb_filter_do(struct flb_input_chunk *ic,
int in_records = 0;
int out_records = 0;
int diff = 0;
char *ntag;
char *work_data;
size_t work_size;
void *out_buf;
Expand All @@ -76,6 +77,17 @@ void flb_filter_do(struct flb_input_chunk *ic,
msgpack_zone *mp_zone = NULL;


/* For the incoming Tag make sure to create a NULL terminated reference */
ntag = flb_malloc(tag_len + 1);
if (!ntag) {
flb_errno();
flb_error("[filter] could not filter record due to memory problems");
return;
}
memcpy(ntag, tag, tag_len);
ntag[tag_len] = '\0';


work_data = (char *) data;
work_size = bytes;

Expand All @@ -85,7 +97,7 @@ void flb_filter_do(struct flb_input_chunk *ic,
/* Iterate filters */
mk_list_foreach(head, &config->filters) {
f_ins = mk_list_entry(head, struct flb_filter_instance, _head);
if (flb_router_match(tag, tag_len, f_ins->match
if (flb_router_match(ntag, tag_len, f_ins->match
#ifdef FLB_HAVE_REGEX
, f_ins->match_regex
#endif
Expand All @@ -100,7 +112,7 @@ void flb_filter_do(struct flb_input_chunk *ic,
write_at = (content_size - work_size);

/* Count number of incoming records */
in_records = flb_mp_count_zone(data, bytes, mp_zone);
in_records = flb_mp_count_zone(work_data, work_size, mp_zone);

/* Invoke the filter callback */
ret = f_ins->p->cb_filter(work_data, /* msgpack buffer */
Expand All @@ -117,7 +129,7 @@ void flb_filter_do(struct flb_input_chunk *ic,
/* all records removed, no data to continue processing */
if (out_size == 0) {
/* reset data content length */
flb_input_chunk_write_at(ic, content_size, "", 0);
flb_input_chunk_write_at(ic, write_at, "", 0);

#ifdef FLB_HAVE_METRICS
/* Summarize all records removed */
Expand Down Expand Up @@ -161,6 +173,7 @@ void flb_filter_do(struct flb_input_chunk *ic,
}

msgpack_zone_free(mp_zone);
flb_free(ntag);
}

int flb_filter_set_property(struct flb_filter_instance *filter, char *k, char *v)
Expand Down

0 comments on commit abf83a4

Please sign in to comment.