diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index 6d36ec9b422..47df4c2e269 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -42,6 +42,10 @@ struct flb_input_chunk { int busy; /* buffer is being flushed */ int fs_backlog; /* chunk originated from fs backlog */ int sp_done; /* sp already processed this chunk */ +#ifdef FLB_HAVE_METRICS + int total_records; /* total records in the chunk */ + int added_records; /* recently added records */ +#endif void *chunk; /* context of struct cio_chunk */ off_t stream_off; /* stream offset */ msgpack_packer mp_pck; /* msgpack packer */ diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index fd3ab6ad5d0..01c7695710d 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -45,21 +45,32 @@ ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic) int flb_input_chunk_write(void *data, const char *buf, size_t len) { + int ret; struct flb_input_chunk *ic; ic = (struct flb_input_chunk *) data; - return cio_chunk_write(ic->chunk, buf, len); + ret = cio_chunk_write(ic->chunk, buf, len); +#ifdef FLB_HAVE_METRICS + if (ret == CIO_OK) { + ic->added_records = flb_mp_count(buf, len); + ic->total_records += ic->added_records; + } +#endif + + return ret; } int flb_input_chunk_write_at(void *data, off_t offset, const char *buf, size_t len) { + int ret; struct flb_input_chunk *ic; ic = (struct flb_input_chunk *) data; - return cio_chunk_write_at(ic->chunk, offset, buf, len); + ret = cio_chunk_write_at(ic->chunk, offset, buf, len); + return ret; } /* Create an input chunk using a Chunk I/O */ @@ -95,9 +106,9 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, return ic; } - records = flb_mp_count(buf_data, buf_size); - if (records > 0) { - flb_metrics_sum(FLB_METRIC_N_RECORDS, records, in->metrics); + ic->total_records = flb_mp_count(buf_data, buf_size); + if (ic->total_records > 0) { + flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics); flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics); } #endif @@ -172,6 +183,9 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, ic->fs_backlog = FLB_FALSE; ic->in = in; ic->stream_off = 0; +#ifdef FLB_HAVE_METRICS + ic->total_records = 0; +#endif msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write); mk_list_add(&ic->_head, &in->chunks); @@ -456,9 +470,8 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in, /* Update 'input' metrics */ #ifdef FLB_HAVE_METRICS - records = flb_mp_count(buf, buf_size); - if (records > 0) { - flb_metrics_sum(FLB_METRIC_N_RECORDS, records, in->metrics); + if (ic->total_records > 0) { + flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics); flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics); } #endif