From a3a42b59579a040f0e9ecb7e5e22205e53dc2f38 Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Mon, 23 Oct 2023 19:54:13 +0200 Subject: [PATCH 1/2] filter: modified flb_filter_do to operate in memory instead of a chunk Signed-off-by: Leonardo Alminana --- include/fluent-bit/flb_filter.h | 1 + src/flb_filter.c | 58 ++++++++++++++------------------- 2 files changed, 25 insertions(+), 34 deletions(-) diff --git a/include/fluent-bit/flb_filter.h b/include/fluent-bit/flb_filter.h index 12649073bd5..c176b474325 100644 --- a/include/fluent-bit/flb_filter.h +++ b/include/fluent-bit/flb_filter.h @@ -133,6 +133,7 @@ void flb_filter_instance_exit(struct flb_filter_instance *ins, void flb_filter_exit(struct flb_config *config); void flb_filter_do(struct flb_input_chunk *ic, const void *data, size_t bytes, + void **out_data, size_t *out_bytes, const char *tag, int tag_len, struct flb_config *config); const char *flb_filter_name(struct flb_filter_instance *ins); diff --git a/src/flb_filter.c b/src/flb_filter.c index 389709a9a4d..dc6370c8439 100644 --- a/src/flb_filter.c +++ b/src/flb_filter.c @@ -77,6 +77,7 @@ static inline int prop_key_check(const char *key, const char *kv, int k_len) void flb_filter_do(struct flb_input_chunk *ic, const void *data, size_t bytes, + void **out_data, size_t *out_bytes, const char *tag, int tag_len, struct flb_config *config) { @@ -90,13 +91,10 @@ void flb_filter_do(struct flb_input_chunk *ic, char *name; #endif char *ntag; - const char *work_data; + char *work_data; size_t work_size; void *out_buf; - size_t cur_size; size_t out_size; - ssize_t content_size; - ssize_t write_at; struct mk_list *head; struct flb_filter_instance *f_ins; struct flb_input_instance *i_ins = ic->in; @@ -106,6 +104,9 @@ void flb_filter_do(struct flb_input_chunk *ic, struct flb_time tm_finish; #endif /* FLB_HAVE_CHUNK_TRACE */ + *out_data = NULL; + *out_bytes = 0; + /* For the incoming Tag make sure to create a NULL terminated reference */ ntag = flb_malloc(tag_len + 1); if (!ntag) { @@ -116,7 +117,7 @@ void flb_filter_do(struct flb_input_chunk *ic, memcpy(ntag, tag, tag_len); ntag[tag_len] = '\0'; - work_data = (const char *) data; + work_data = (char *) data; work_size = bytes; #ifdef FLB_HAVE_METRICS @@ -131,9 +132,11 @@ void flb_filter_do(struct flb_input_chunk *ic, /* Iterate filters */ mk_list_foreach(head, &config->filters) { f_ins = mk_list_entry(head, struct flb_filter_instance, _head); + if (is_active(&f_ins->properties) == FLB_FALSE) { continue; } + if (flb_router_match(ntag, tag_len, f_ins->match #ifdef FLB_HAVE_REGEX , f_ins->match_regex @@ -145,16 +148,12 @@ void flb_filter_do(struct flb_input_chunk *ic, out_buf = NULL; out_size = 0; - content_size = cio_chunk_get_content_size(ic->chunk); - - /* where to position the new content if modified ? */ - write_at = (content_size - work_size); - #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_time_get(&tm_start); } #endif /* FLB_HAVE_CHUNK_TRACE */ + /* Invoke the filter callback */ ret = f_ins->p->cb_filter(work_data, /* msgpack buffer */ work_size, /* msgpack size */ @@ -165,6 +164,7 @@ void flb_filter_do(struct flb_input_chunk *ic, i_ins, /* input instance */ f_ins->context, /* filter priv data */ config); + #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_time_get(&tm_finish); @@ -176,19 +176,25 @@ void flb_filter_do(struct flb_input_chunk *ic, cmt_counter_add(f_ins->cmt_records, ts, in_records, 1, (char *[]) {name}); - cmt_counter_add(f_ins->cmt_bytes, ts, content_size, + cmt_counter_add(f_ins->cmt_bytes, ts, out_size, 1, (char *[]) {name}); flb_metrics_sum(FLB_METRIC_N_RECORDS, in_records, f_ins->metrics); - flb_metrics_sum(FLB_METRIC_N_BYTES, content_size, f_ins->metrics); + flb_metrics_sum(FLB_METRIC_N_BYTES, out_size, f_ins->metrics); #endif /* Override buffer just if it was modified */ if (ret == FLB_FILTER_MODIFIED) { + /* release intermediate buffer */ + if (work_data != data) { + flb_free(work_data); + } + + work_data = (char *) out_buf; + work_size = out_size; + /* all records removed, no data to continue processing */ if (out_size == 0) { - /* reset data content length */ - flb_input_chunk_write_at(ic, write_at, "", 0); #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_chunk_trace_filter(ic->trace, (void *)f_ins, &tm_start, &tm_finish, "", 0); @@ -240,36 +246,19 @@ void flb_filter_do(struct flb_input_chunk *ic, ic->total_records = pre_records + in_records; #endif } - ret = flb_input_chunk_write_at(ic, write_at, - out_buf, out_size); - if (ret == -1) { - flb_error("[filter] could not write data to storage. " - "Skipping filtering."); - flb_free(out_buf); - continue; - } #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_chunk_trace_filter(ic->trace, (void *)f_ins, &tm_start, &tm_finish, out_buf, out_size); } #endif /* FLB_HAVE_CHUNK_TRACE */ - - /* Point back the 'data' pointer to the new address */ - ret = cio_chunk_get_content(ic->chunk, - (char **) &work_data, &cur_size); - if (ret != CIO_OK) { - flb_error("[filter] error retrieving data chunk"); - } - else { - work_data += (cur_size - out_size); - work_size = out_size; - } - flb_free(out_buf); } } } + *out_data = work_data; + *out_bytes = work_size; + flb_free(ntag); } @@ -660,6 +649,7 @@ void flb_filter_instance_destroy(struct flb_filter_instance *ins) } mk_list_del(&ins->_head); + flb_free(ins); } From e603c33767d91726c719d09dd63cd5eafeb8e77f Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Mon, 23 Oct 2023 19:55:49 +0200 Subject: [PATCH 2/2] input_chunk: adapted chunk filtering code to operate on a local buffer Signed-off-by: Leonardo Alminana --- src/flb_input_chunk.c | 49 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index c71ae3ef089..c5b894aa45d 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1390,6 +1390,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in, size_t pre_real_size; struct flb_input_chunk *ic; struct flb_storage_input *si; + void *filtered_data_buffer; + size_t filtered_data_size; + void *final_data_buffer; + size_t final_data_size; /* memory ring-buffer checker */ if (in->storage_type == FLB_STORAGE_MEMRB) { @@ -1491,15 +1495,6 @@ static int input_chunk_append_raw(struct flb_input_instance *in, pre_real_size = flb_input_chunk_get_real_size(ic); } - /* Write the new data */ - ret = flb_input_chunk_write(ic, buf, buf_size); - if (ret == -1) { - flb_error("[input chunk] error writing data from %s instance", - in->name); - cio_chunk_tx_rollback(ic->chunk); - return -1; - } - #ifdef FLB_HAVE_CHUNK_TRACE flb_chunk_trace_do_input(ic); #endif /* FLB_HAVE_CHUNK_TRACE */ @@ -1529,11 +1524,45 @@ static int input_chunk_append_raw(struct flb_input_instance *in, } #endif + filtered_data_buffer = NULL; + final_data_buffer = (char *) buf; + final_data_size = buf_size; + /* Apply filters */ if (event_type == FLB_INPUT_LOGS) { flb_filter_do(ic, buf, buf_size, - tag, tag_len, in->config); + &filtered_data_buffer, + &filtered_data_size, + tag, tag_len, + in->config); + + if (filtered_data_buffer != NULL) { + final_data_buffer = filtered_data_buffer; + final_data_size = filtered_data_size; + } + } + + if (final_data_size > 0){ + ret = flb_input_chunk_write(ic, + final_data_buffer, + final_data_size); + } + else { + ret = 0; + } + + if (filtered_data_buffer != NULL && + filtered_data_buffer != buf) { + flb_free(filtered_data_buffer); + } + + if (ret == -1) { + flb_error("[input chunk] error writing data from %s instance", + in->name); + cio_chunk_tx_rollback(ic->chunk); + + return -1; } /* get the chunks content size */