Skip to content

Commit

Permalink
in_tail: add custom keys to multiline payload
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Jul 24, 2021
1 parent 382e9d0 commit 655e17d
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 12 deletions.
146 changes: 136 additions & 10 deletions plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,91 @@ static inline void consume_bytes(char *buf, int bytes, int length)
memmove(buf, buf + bytes, length - bytes);
}

static int record_append_custom_keys(struct flb_tail_file *file,
size_t processed_bytes,
char *in_data, size_t in_size,
char **out_data, size_t *out_size)
{
int i;
int ok = MSGPACK_UNPACK_SUCCESS;
int len;
size_t off = 0;
size_t total;
msgpack_unpacked result;
msgpack_object time;
msgpack_object map;
msgpack_object k;
msgpack_object v;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
struct flb_mp_map_header mh;

/* init new buffers */
msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

/* Some extra content will be added... */
msgpack_unpacked_init(&result);
while ((msgpack_unpack_next(&result, in_data, in_size, &off) == ok)) {
time = result.data.via.array.ptr[0];
map = result.data.via.array.ptr[1];

msgpack_pack_array(&mp_pck, 2);
msgpack_pack_object(&mp_pck, time);

/* pack map */
flb_mp_map_header_init(&mh, &mp_pck);

/* append previous map keys */
for (i = 0; i < map.via.map.size; i++) {
k = map.via.map.ptr[i].key;
v = map.via.map.ptr[i].val;

flb_mp_map_header_append(&mh);
msgpack_pack_object(&mp_pck, k);
msgpack_pack_object(&mp_pck, v);
}

/* path_key */
if (file->config->path_key) {
len = flb_sds_len(file->config->path_key);

flb_mp_map_header_append(&mh);

/* key */
msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, file->config->path_key, len);

/* val */
msgpack_pack_str(&mp_pck, file->name_len);
msgpack_pack_str_body(&mp_pck, file->name, file->name_len);
}

/* offset_key */
if (file->config->offset_key) {
len = flb_sds_len(file->config->offset_key);

flb_mp_map_header_append(&mh);

/* key */
msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, file->config->offset_key, len);

/* val */
total = file->offset + processed_bytes;
msgpack_pack_uint64(&mp_pck, total);
}

/* finalize map */
flb_mp_map_header_end(&mh);
}

*out_data = mp_sbuf.data;
*out_size = mp_sbuf.size;

return 0;
}

static int unpack_and_pack(msgpack_packer *pck, msgpack_object *root,
const char *key, size_t key_len,
const char *val, size_t val_len, size_t val_uint64)
Expand Down Expand Up @@ -385,11 +470,45 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
if (lines > 0) {
/* Append buffer content to a chunk */
*bytes = processed_bytes;
flb_input_chunk_append_raw(ctx->ins,
file->tag_buf,
file->tag_len,
out_sbuf->data,
out_sbuf->size);

if (out_sbuf->size > 0) {
flb_input_chunk_append_raw(ctx->ins,
file->tag_buf,
file->tag_len,
out_sbuf->data,
out_sbuf->size);
}
else if (ctx->ml_ctx && file->mult_sbuf.size > 0) {
/* If no extra keys are needed, just enqueue the buffer */
if (file->config->path_key == NULL &&
file->config->offset_key == NULL) {
flb_input_chunk_append_raw(ctx->ins,
file->tag_buf,
file->tag_len,
file->mult_sbuf.data,
file->mult_sbuf.size);

}
else {
char *mult_buf = NULL;
size_t mult_size = 0;

/* adjust the records in a new buffer */
record_append_custom_keys(file,
processed_bytes,
file->mult_sbuf.data,
file->mult_sbuf.size,
&mult_buf, &mult_size);

flb_input_chunk_append_raw(ctx->ins,
file->tag_buf,
file->tag_len,
mult_buf,
mult_size);
flb_free(mult_buf);
}
file->mult_sbuf.size = 0;
}
}
else if (file->skip_next) {
*bytes = file->buf_len;
Expand Down Expand Up @@ -649,17 +768,16 @@ static int set_file_position(struct flb_tail_config *ctx,
return 0;
}


/* Multiline flush callback: invoked every time some content is complete */
static int ml_flush_callback(struct flb_ml_parser *parser,
struct flb_ml_stream *mst,
void *data, char *buf_data, size_t buf_size)
{
struct flb_tail_file *file = data;

flb_input_chunk_append_raw(file->config->ins,
file->tag_buf,
file->tag_len,
buf_data,
buf_size);
/* Enqueue the records in our file->multiline buffer */
msgpack_sbuffer_write(&file->mult_sbuf, buf_data, buf_size);
return 0;
}

Expand Down Expand Up @@ -741,7 +859,13 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
file->mult_keys = 0;
file->mult_flush_timeout = 0;
file->mult_skipping = FLB_FALSE;

/* multiline msgpack buffers */
msgpack_sbuffer_init(&file->mult_sbuf);
msgpack_packer_init(&file->mult_pck, &file->mult_sbuf,
msgpack_sbuffer_write);

/* docker mode */
file->dmode_flush_timeout = 0;
file->dmode_complete = true;
file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0);
Expand Down Expand Up @@ -887,6 +1011,8 @@ void flb_tail_file_remove(struct flb_tail_file *file)
mk_list_del(&file->_rotate_head);
}

msgpack_sbuffer_destroy(&file->mult_sbuf);

flb_sds_destroy(file->dmode_buf);
flb_sds_destroy(file->dmode_lastline);
mk_list_del(&file->_head);
Expand Down
4 changes: 2 additions & 2 deletions plugins/in_tail/tail_file_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ struct flb_tail_file {
int mult_firstline_append; /* bool: mult firstline appendable ? */
int mult_skipping; /* skipping because ignode_older than ? */
int mult_keys; /* total number of buffered keys */
msgpack_sbuffer mult_sbuf; /* temporary msgpack buffer */
msgpack_packer mult_pck; /* temporary msgpack packer */
msgpack_sbuffer mult_sbuf; /* temporary msgpack buffer */
msgpack_packer mult_pck; /* temporary msgpack packer */
struct flb_time mult_time; /* multiline time parsed from first line */

/* docker mode */
Expand Down

0 comments on commit 655e17d

Please sign in to comment.