Skip to content

Commit

Permalink
input_chunk: keep number of records in the chunk (#2159)
Browse files Browse the repository at this point in the history
this patch makes the Input Chunk interface to have a counter for
the number of records ingested into the chunk, with this we can
avoid re-counting on other parts of the pipeline.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed May 11, 2020
1 parent 92876a6 commit c2da9a1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ struct flb_input_chunk {
int busy; /* buffer is being flushed */
int fs_backlog; /* chunk originated from fs backlog */
int sp_done; /* sp already processed this chunk */
#ifdef FLB_HAVE_METRICS
int total_records; /* total records in the chunk */
int added_records; /* recently added records */
#endif
void *chunk; /* context of struct cio_chunk */
off_t stream_off; /* stream offset */
msgpack_packer mp_pck; /* msgpack packer */
Expand Down
29 changes: 21 additions & 8 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,32 @@ ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic)

int flb_input_chunk_write(void *data, const char *buf, size_t len)
{
int ret;
struct flb_input_chunk *ic;

ic = (struct flb_input_chunk *) data;

return cio_chunk_write(ic->chunk, buf, len);
ret = cio_chunk_write(ic->chunk, buf, len);
#ifdef FLB_HAVE_METRICS
if (ret == CIO_OK) {
ic->added_records = flb_mp_count(buf, len);
ic->total_records += ic->added_records;
}
#endif

return ret;
}

int flb_input_chunk_write_at(void *data, off_t offset,
const char *buf, size_t len)
{
int ret;
struct flb_input_chunk *ic;

ic = (struct flb_input_chunk *) data;

return cio_chunk_write_at(ic->chunk, offset, buf, len);
ret = cio_chunk_write_at(ic->chunk, offset, buf, len);
return ret;
}

/* Create an input chunk using a Chunk I/O */
Expand Down Expand Up @@ -95,9 +106,9 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in,
return ic;
}

records = flb_mp_count(buf_data, buf_size);
if (records > 0) {
flb_metrics_sum(FLB_METRIC_N_RECORDS, records, in->metrics);
ic->total_records = flb_mp_count(buf_data, buf_size);
if (ic->total_records > 0) {
flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
}
#endif
Expand Down Expand Up @@ -172,6 +183,9 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in,
ic->fs_backlog = FLB_FALSE;
ic->in = in;
ic->stream_off = 0;
#ifdef FLB_HAVE_METRICS
ic->total_records = 0;
#endif
msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write);
mk_list_add(&ic->_head, &in->chunks);

Expand Down Expand Up @@ -456,9 +470,8 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in,

/* Update 'input' metrics */
#ifdef FLB_HAVE_METRICS
records = flb_mp_count(buf, buf_size);
if (records > 0) {
flb_metrics_sum(FLB_METRIC_N_RECORDS, records, in->metrics);
if (ic->total_records > 0) {
flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
}
#endif
Expand Down

0 comments on commit c2da9a1

Please sign in to comment.