diff --git a/src/flb_filter.c b/src/flb_filter.c index 853ad99a300..794bf4f719c 100644 --- a/src/flb_filter.c +++ b/src/flb_filter.c @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include static inline int instance_id(struct flb_filter_plugin *p, @@ -62,20 +64,20 @@ void flb_filter_do(struct flb_input_chunk *ic, int in_records = 0; int out_records = 0; int diff = 0; + char *work_data; + size_t work_size; void *out_buf; size_t cur_size; size_t out_size; ssize_t content_size; + ssize_t write_at; struct mk_list *head; struct flb_filter_instance *f_ins; msgpack_zone *mp_zone = NULL; - content_size = cio_chunk_get_content_size(ic->chunk); - if (content_size <= 0) { - flb_error("[filter] cannot retrieve original content size"); - return; - } - content_size -= bytes; + + work_data = (char *) data; + work_size = bytes; /* Count number of incoming records */ mp_zone = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE); @@ -92,11 +94,17 @@ void flb_filter_do(struct flb_input_chunk *ic, out_buf = NULL; out_size = 0; + content_size = cio_chunk_get_content_size(ic->chunk); + + /* where to position the new content if modified ? */ + write_at = (content_size - work_size); + /* Count number of incoming records */ in_records = flb_mp_count_zone(data, bytes, mp_zone); /* Invoke the filter callback */ - ret = f_ins->p->cb_filter(data, bytes, /* msgpack raw data */ + ret = f_ins->p->cb_filter(work_data, /* msgpack buffer */ + work_size, /* msgpack size */ tag, tag_len, /* input tag */ &out_buf, /* new data */ &out_size, /* new data size */ @@ -122,9 +130,7 @@ void flb_filter_do(struct flb_input_chunk *ic, } else { #ifdef FLB_HAVE_METRICS - out_records = flb_mp_count_zone(out_buf, out_size, - mp_zone); - + out_records = flb_mp_count_zone(out_buf, out_size, mp_zone); if (out_records > in_records) { diff = (out_records - in_records); /* Summarize new records */ @@ -137,16 +143,18 @@ void flb_filter_do(struct flb_input_chunk *ic, flb_metrics_sum(FLB_METRIC_N_DROPPED, diff, f_ins->metrics); } - msgpack_zone_clear(mp_zone); #endif } - ret = flb_input_chunk_write_at(ic, content_size, + msgpack_zone_clear(mp_zone); + + ret = flb_input_chunk_write_at(ic, write_at, out_buf, out_size); /* Point back the 'data' pointer to the new address */ - bytes = out_size; ret = cio_chunk_get_content(ic->chunk, - data, &cur_size); - data += content_size; + &work_data, &cur_size); + + work_data += (cur_size - out_size); + work_size = out_size; flb_free(out_buf); } }