Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

input_chunk: update records in the right place #8223

Merged
merged 4 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/flb_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ void flb_filter_do(struct flb_input_chunk *ic,
#ifdef FLB_HAVE_METRICS
/* timestamp */
ts = cfl_time_now();
#endif

/* Count number of incoming records */
in_records = ic->added_records;
pre_records = ic->total_records - in_records;
#endif

/* Iterate filters */
mk_list_foreach(head, &config->filters) {
Expand Down Expand Up @@ -201,10 +201,9 @@ void flb_filter_do(struct flb_input_chunk *ic,
}
#endif /* FLB_HAVE_CHUNK_TRACE */


#ifdef FLB_HAVE_METRICS
ic->total_records = pre_records;

#ifdef FLB_HAVE_METRICS
/* cmetrics */
cmt_counter_add(f_ins->cmt_drop_records, ts, in_records,
1, (char *[]) {name});
Expand All @@ -216,8 +215,9 @@ void flb_filter_do(struct flb_input_chunk *ic,
break;
}
else {
#ifdef FLB_HAVE_METRICS
out_records = flb_mp_count(out_buf, out_size);

#ifdef FLB_HAVE_METRICS
if (out_records > in_records) {
diff = (out_records - in_records);

Expand All @@ -240,11 +240,11 @@ void flb_filter_do(struct flb_input_chunk *ic,
flb_metrics_sum(FLB_METRIC_N_DROPPED,
diff, f_ins->metrics);
}
#endif

/* set number of records in new chunk */
in_records = out_records;
ic->total_records = pre_records + in_records;
#endif
}

#ifdef FLB_HAVE_CHUNK_TRACE
Expand Down
50 changes: 25 additions & 25 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1499,31 +1499,6 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
flb_chunk_trace_do_input(ic);
#endif /* FLB_HAVE_CHUNK_TRACE */

/* Update 'input' metrics */
#ifdef FLB_HAVE_METRICS
if (ret == CIO_OK) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When in this spot, this check would fail because the last time ret is set is by a function that returns CIO_TRUE. CIO_TRUE is a !0 macro. CIO_OK is 0, meaning this check would always fail when flb_chunk_is_up or cio_chunk_up_force succeeded.

ic->added_records = n_records;
ic->total_records += n_records;
}

if (ic->total_records > 0) {
/* timestamp */
ts = cfl_time_now();

/* fluentbit_input_records_total */
cmt_counter_add(in->cmt_records, ts, ic->added_records,
1, (char *[]) {(char *) flb_input_name(in)});

/* fluentbit_input_bytes_total */
cmt_counter_add(in->cmt_bytes, ts, buf_size,
1, (char *[]) {(char *) flb_input_name(in)});

/* OLD api */
flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
}
#endif

filtered_data_buffer = NULL;
final_data_buffer = (char *) buf;
final_data_size = buf_size;
Expand Down Expand Up @@ -1555,6 +1530,31 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
flb_free(filtered_data_buffer);
}

if (ret == CIO_OK) {
ic->added_records = n_records;
braydonk marked this conversation as resolved.
Show resolved Hide resolved
ic->total_records += n_records;
}

/* Update 'input' metrics */
#ifdef FLB_HAVE_METRICS
if (ic->total_records > 0) {
/* timestamp */
ts = cfl_time_now();

/* fluentbit_input_records_total */
cmt_counter_add(in->cmt_records, ts, ic->added_records,
1, (char *[]) {(char *) flb_input_name(in)});

/* fluentbit_input_bytes_total */
cmt_counter_add(in->cmt_bytes, ts, buf_size,
1, (char *[]) {(char *) flb_input_name(in)});

/* OLD api */
flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
}
#endif

if (ret == -1) {
flb_error("[input chunk] error writing data from %s instance",
in->name);
Expand Down
2 changes: 0 additions & 2 deletions src/flb_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,7 @@ struct flb_task *flb_task_create(uint64_t ref_id,
return NULL;
}

#ifdef FLB_HAVE_METRICS
total_events = ((struct flb_input_chunk *) ic)->total_records;
#endif

/* event chunk */
evc = flb_event_chunk_create(ic->event_type,
Expand Down
90 changes: 89 additions & 1 deletion tests/internal/input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ void flb_test_input_chunk_fs_chunks_size_real()
flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, 256, "dummy", 4, (void *)buf, 256);
msgpack_sbuffer_destroy(&mp_sbuf);

/* clean up test chunks */
/* Check each test chunk for size discrepancy */
mk_list_foreach_safe(head, tmp, &i_ins->chunks) {
ic = mk_list_entry(head, struct flb_input_chunk, _head);
if (cio_chunk_get_real_size(ic->chunk) != cio_chunk_get_content_size(ic->chunk)) {
Expand Down Expand Up @@ -506,11 +506,99 @@ void flb_test_input_chunk_fs_chunks_size_real()
flb_config_exit(cfg);
}

/* This tests uses the subsystems of the engine directly
* to avoid threading issues when submitting chunks.
*/
void flb_test_input_chunk_correct_total_records(void)
{
int records;
struct flb_input_instance *i_ins;
struct flb_output_instance *o_ins;
struct mk_list *tmp;
struct mk_list *head;
struct flb_input_chunk *ic;
struct flb_task *task;
struct flb_config *cfg;
struct cio_ctx *cio;
msgpack_sbuffer mp_sbuf;
char buf[262144];
struct mk_event_loop *evl;
struct cio_options opts = {0};

flb_init_env();
cfg = flb_config_init();
evl = mk_event_loop_create(256);

TEST_CHECK(evl != NULL);
cfg->evl = evl;

flb_log_create(cfg, FLB_LOG_STDERR, FLB_LOG_DEBUG, NULL);

i_ins = flb_input_new(cfg, "dummy", NULL, FLB_TRUE);
i_ins->storage_type = CIO_STORE_FS;

cio_options_init(&opts);

opts.root_path = "/tmp/input-chunk-fs_chunks-size_real";
opts.log_cb = log_cb;
opts.log_level = CIO_LOG_DEBUG;
opts.flags = CIO_OPEN;

cio = cio_create(&opts);
flb_storage_input_create(cio, i_ins);
flb_input_init_all(cfg);

o_ins = flb_output_new(cfg, "http", NULL, FLB_TRUE);
// not the right way to do this
o_ins->id = 1;
TEST_CHECK_(o_ins != NULL, "unable to instance output");
flb_output_set_property(o_ins, "match", "*");
flb_output_set_property(o_ins, "storage.total_limit_size", "1M");

TEST_CHECK_((flb_router_io_set(cfg) != -1), "unable to router");

/* fill up the chunk ... */
memset((void *)buf, 0x41, sizeof(buf));
msgpack_sbuffer_init(&mp_sbuf);
gen_buf(&mp_sbuf, buf, sizeof(buf));

records = flb_mp_count(buf, sizeof(buf));
flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, records, "dummy", 4, (void *)buf, sizeof(buf));
msgpack_sbuffer_destroy(&mp_sbuf);

/* Check each chunk's total records */
mk_list_foreach_safe(head, tmp, &i_ins->chunks) {
ic = mk_list_entry(head, struct flb_input_chunk, _head);
TEST_CHECK_(ic->total_records > 0, "found input chunk with 0 total records");
}

/* FORCE clean up test tasks*/
mk_list_foreach_safe(head, tmp, &i_ins->tasks) {
task = mk_list_entry(head, struct flb_task, _head);
flb_info("[task] cleanup test task");
flb_task_destroy(task, FLB_TRUE);
}

/* clean up test chunks */
mk_list_foreach_safe(head, tmp, &i_ins->chunks) {
ic = mk_list_entry(head, struct flb_input_chunk, _head);
flb_input_chunk_destroy(ic, FLB_TRUE);
}

cio_destroy(cio);
flb_router_exit(cfg);
flb_input_exit_all(cfg);
flb_output_exit(cfg);
flb_config_exit(cfg);
}


/* Test list */
TEST_LIST = {
{"input_chunk_exceed_limit", flb_test_input_chunk_exceed_limit},
{"input_chunk_buffer_valid", flb_test_input_chunk_buffer_valid},
{"input_chunk_dropping_chunks", flb_test_input_chunk_dropping_chunks},
{"input_chunk_fs_chunk_size_real", flb_test_input_chunk_fs_chunks_size_real},
{"input_chunk_correct_total_records", flb_test_input_chunk_correct_total_records},
{NULL, NULL}
};
Loading