Skip to content

Commit

Permalink
filter: fix buffer management on chain filters (#975)
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Dec 20, 2018
1 parent c5ce9e9 commit da8f718
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions src/flb_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_env.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_pack.h>
#include <chunkio/chunkio.h>

static inline int instance_id(struct flb_filter_plugin *p,
Expand Down Expand Up @@ -62,20 +64,20 @@ void flb_filter_do(struct flb_input_chunk *ic,
int in_records = 0;
int out_records = 0;
int diff = 0;
char *work_data;
size_t work_size;
void *out_buf;
size_t cur_size;
size_t out_size;
ssize_t content_size;
ssize_t write_at;
struct mk_list *head;
struct flb_filter_instance *f_ins;
msgpack_zone *mp_zone = NULL;

content_size = cio_chunk_get_content_size(ic->chunk);
if (content_size <= 0) {
flb_error("[filter] cannot retrieve original content size");
return;
}
content_size -= bytes;

work_data = (char *) data;
work_size = bytes;

/* Count number of incoming records */
mp_zone = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE);
Expand All @@ -92,11 +94,17 @@ void flb_filter_do(struct flb_input_chunk *ic,
out_buf = NULL;
out_size = 0;

content_size = cio_chunk_get_content_size(ic->chunk);

/* where to position the new content if modified ? */
write_at = (content_size - work_size);

/* Count number of incoming records */
in_records = flb_mp_count_zone(data, bytes, mp_zone);

/* Invoke the filter callback */
ret = f_ins->p->cb_filter(data, bytes, /* msgpack raw data */
ret = f_ins->p->cb_filter(work_data, /* msgpack buffer */
work_size, /* msgpack size */
tag, tag_len, /* input tag */
&out_buf, /* new data */
&out_size, /* new data size */
Expand All @@ -122,9 +130,7 @@ void flb_filter_do(struct flb_input_chunk *ic,
}
else {
#ifdef FLB_HAVE_METRICS
out_records = flb_mp_count_zone(out_buf, out_size,
mp_zone);

out_records = flb_mp_count_zone(out_buf, out_size, mp_zone);
if (out_records > in_records) {
diff = (out_records - in_records);
/* Summarize new records */
Expand All @@ -137,16 +143,18 @@ void flb_filter_do(struct flb_input_chunk *ic,
flb_metrics_sum(FLB_METRIC_N_DROPPED,
diff, f_ins->metrics);
}
msgpack_zone_clear(mp_zone);
#endif
}
ret = flb_input_chunk_write_at(ic, content_size,
msgpack_zone_clear(mp_zone);

ret = flb_input_chunk_write_at(ic, write_at,
out_buf, out_size);
/* Point back the 'data' pointer to the new address */
bytes = out_size;
ret = cio_chunk_get_content(ic->chunk,
data, &cur_size);
data += content_size;
&work_data, &cur_size);

work_data += (cur_size - out_size);
work_size = out_size;
flb_free(out_buf);
}
}
Expand Down

0 comments on commit da8f718

Please sign in to comment.