Skip to content

Commit

Permalink
multiline: extend support for concat records and override properties
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Jul 19, 2021
1 parent c37f9f6 commit 460694b
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 33 deletions.
10 changes: 10 additions & 0 deletions include/fluent-bit/multiline/flb_ml.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ struct flb_ml_parser_ins {

void *cb_data; /* opaque data */


/*
* Duplicate original parser definition properties for this instance. There
* are cases where the caller wants an instance of a certain multiline
* parser but with a custom value for key_content, key_pattern or key_group.
*/
flb_sds_t key_content;
flb_sds_t key_pattern;
flb_sds_t key_group;

/*
* last stream_id and last_stream_group: keeping a reference of the last
* insertion path is important to determine when we should flush our
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/multiline/flb_ml_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ void flb_ml_parser_destroy_all(struct mk_list *list);

struct flb_ml_parser_ins *flb_ml_parser_instance_create(struct flb_ml *ml,
char *name);
int flb_ml_parser_instance_set(struct flb_ml_parser_ins *p, char *prop, char *val);

int flb_ml_parser_instance_destroy(struct flb_ml_parser_ins *ins);
int flb_ml_parser_instance_has_data(struct flb_ml_parser_ins *ins);

Expand Down
137 changes: 104 additions & 33 deletions src/multiline/flb_ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,12 @@ int flb_ml_register_context(struct flb_ml_stream_group *group,
return 0;
}

static inline void breakline_prepare(struct flb_ml_parser *mlp,
static inline void breakline_prepare(struct flb_ml_parser_ins *parser_i,
struct flb_ml_stream_group *stream_group)
{
int len;

if (mlp->key_content) {
if (parser_i->key_content) {
return;
}

Expand Down Expand Up @@ -221,7 +221,6 @@ static int package_content(struct flb_ml_stream *mst,
val = val_pattern;
}


if (val) {
buf_data = (char *) val->via.str.ptr;
buf_size = val->via.str.size;
Expand Down Expand Up @@ -260,7 +259,7 @@ static int package_content(struct flb_ml_stream *mst,
}

/* Prepare concatenation */
breakline_prepare(parser, stream_group);
breakline_prepare(parser_i, stream_group);

/* Concatenate value */
if (val_content) {
Expand Down Expand Up @@ -294,7 +293,7 @@ static int package_content(struct flb_ml_stream *mst,
}

/* Prepare concatenation */
breakline_prepare(parser, stream_group);
breakline_prepare(parser_i, stream_group);

/* Concatenate value */
if (val_content) {
Expand Down Expand Up @@ -325,7 +324,7 @@ static int package_content(struct flb_ml_stream *mst,

/* Concatenate value */
flb_sds_cat_safe(&stream_group->buf, buf, size);
breakline_prepare(parser, stream_group);
breakline_prepare(parser_i, stream_group);
flb_ml_flush_stream_group(parser, mst, stream_group);
}
else {
Expand Down Expand Up @@ -395,7 +394,6 @@ static int process_append(struct flb_ml_parser_ins *parser_i,
msgpack_object *val_group = NULL;
msgpack_unpacked result;
struct flb_time tm_record;
struct flb_ml_parser *parser = parser_i->ml_parser;

/* Lookup the key */
if (type == FLB_ML_TYPE_TEXT) {
Expand Down Expand Up @@ -434,21 +432,22 @@ static int process_append(struct flb_ml_parser_ins *parser_i,
}

/* Lookup for key_content entry */
id_content = get_key_id(full_map, parser->key_content);
id_content = get_key_id(full_map, parser_i->key_content);
if (id_content == -1) {
if (unpacked) {
msgpack_unpacked_destroy(&result);
}
return -1;
}

val_content = &full_map->via.map.ptr[id_content].val;
if (val_content->type != MSGPACK_OBJECT_STR) {
val_content = NULL;
}

/* Optional: Lookup for key_pattern entry */
if (parser->key_pattern) {
id_pattern = get_key_id(full_map, parser->key_pattern);
if (parser_i->key_pattern) {
id_pattern = get_key_id(full_map, parser_i->key_pattern);
if (id_pattern >= 0) {
val_pattern = &full_map->via.map.ptr[id_pattern].val;
if (val_pattern->type != MSGPACK_OBJECT_STR) {
Expand All @@ -458,8 +457,8 @@ static int process_append(struct flb_ml_parser_ins *parser_i,
}

/* Optional: lookup for key_group entry */
if (parser->key_group) {
id_group = get_key_id(full_map, parser->key_group);
if (parser_i->key_group) {
id_group = get_key_id(full_map, parser_i->key_group);
if (id_group >= 0) {
val_group = &full_map->via.map.ptr[id_group].val;
if (val_group->type != MSGPACK_OBJECT_STR) {
Expand All @@ -485,7 +484,8 @@ static int process_append(struct flb_ml_parser_ins *parser_i,
static int ml_append_try_parser(struct flb_ml_parser_ins *parser,
uint64_t stream_id,
int type,
struct flb_time *tm, void *buf, size_t size)
struct flb_time *tm, void *buf, size_t size,
msgpack_object *map)
{
int ret;
int release = FLB_FALSE;
Expand Down Expand Up @@ -536,7 +536,7 @@ static int ml_append_try_parser(struct flb_ml_parser_ins *parser,
}

/* Process the binary record */
ret = process_append(parser, mst, type, &out_time, NULL, out_buf, out_size);
ret = process_append(parser, mst, type, &out_time, map, out_buf, out_size);
if (ret == -1) {
if (release == FLB_TRUE) {
flb_free(out_buf);
Expand Down Expand Up @@ -577,7 +577,7 @@ int flb_ml_append(struct flb_ml *ml, uint64_t stream_id,

if (lru_parser && lru_parser->last_stream_id == stream_id) {
ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type,
tm, buf, size);
tm, buf, size, NULL);
if (ret == 0) {
processed = FLB_TRUE;
break;
Expand All @@ -602,7 +602,7 @@ int flb_ml_append(struct flb_ml *ml, uint64_t stream_id,
}

ret = ml_append_try_parser(parser_i, stream_id, type,
tm, buf, size);
tm, buf, size, NULL);
if (ret == 0) {
group->lru_parser = parser_i;
group->lru_parser->last_stream_id = stream_id;
Expand Down Expand Up @@ -648,13 +648,16 @@ int flb_ml_append(struct flb_ml *ml, uint64_t stream_id,
int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id,
struct flb_time *tm, msgpack_object *obj)
{
int type;
int ret;
int type;
int processed = FLB_FALSE;
struct mk_list *head;
struct mk_list *head_group;
struct flb_ml_group *group;
struct flb_ml_parser_ins *lru_parser = NULL;
struct flb_ml_parser_ins *parser_i;
struct flb_ml_stream *mst;
struct flb_ml_stream_group *st_group;

/*
* As incoming objects, we only accept Fluent Bit array format
Expand All @@ -679,22 +682,89 @@ int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id,
mk_list_foreach(head, &ml->groups) {
group = mk_list_entry(head, struct flb_ml_group, _head);

mk_list_foreach(head_group, &group->parsers) {
parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
/* Check if the incoming data matches the last recently used parser */
lru_parser = group->lru_parser;

/* Get the stream */
mst = flb_ml_stream_get(parser_i, stream_id);
if (!mst) {
flb_error("[multiline] invalid stream_id %" PRIu64 ", could not "
"append content to multiline context", stream_id);
return -1;
if (lru_parser && lru_parser->last_stream_id == stream_id) {
ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type,
tm, NULL, 0, obj);
if (ret == 0) {
processed = FLB_TRUE;
break;
}
else {
flb_ml_flush_parser_instance(ml,
lru_parser,
lru_parser->last_stream_id);
}
}
else if (lru_parser && lru_parser->last_stream_id > 0) {
flb_ml_flush_parser_instance(ml,
lru_parser,
lru_parser->last_stream_id);
}
}

mk_list_foreach(head_group, &group->parsers) {
parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
if (lru_parser && parser_i == lru_parser) {
continue;
}

ret = process_append(parser_i, mst, type, tm, obj, NULL, 0);
ret = ml_append_try_parser(parser_i, stream_id, type,
tm, NULL, 0, obj);
if (ret == 0) {
group->lru_parser = parser_i;
group->lru_parser->last_stream_id = stream_id;
lru_parser = parser_i;
processed = FLB_TRUE;
break;
}
else {
parser_i = NULL;
}

}

if (!processed) {
if (lru_parser) {
flb_ml_flush_parser_instance(ml, lru_parser, stream_id);
parser_i = lru_parser;
}
else {
/* get the first parser (just to make use of its buffers) */
parser_i = mk_list_entry_first(&group->parsers,
struct flb_ml_parser_ins,
_head);
}

flb_ml_flush_parser_instance(ml, parser_i, stream_id);
mst = flb_ml_stream_get(parser_i, stream_id);
if (!mst) {
flb_error("[multiline] invalid stream_id %" PRIu64 ", could not "
"append content to multiline context", stream_id);
return -1;
}

/* Get stream group */
st_group = flb_ml_stream_group_get(mst->parser, mst, NULL);

/* Append record content to group msgpack buffer */
msgpack_pack_array(&st_group->mp_pck, 2);
flb_time_append_to_msgpack(tm, &st_group->mp_pck, 0);
msgpack_pack_object(&st_group->mp_pck, *obj);

/* force flush */
mst->cb_flush(parser_i->ml_parser,
mst, mst->cb_data,
st_group->mp_sbuf.data, st_group->mp_sbuf.size);

/* reset group buffer counters */
st_group->mp_sbuf.size = 0;
flb_sds_len_set(st_group->buf, 0);

/* Update last flush time */
st_group->last_flush = time_ms_now();
}

return 0;
Expand Down Expand Up @@ -819,8 +889,9 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser,
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
msgpack_unpacked result;
struct flb_ml_parser_ins *parser_i = mst->parser;

breakline_prepare(ml_parser, group);
breakline_prepare(parser_i, group);
len = flb_sds_len(group->buf);

/* init msgpack buffer */
Expand Down Expand Up @@ -852,7 +923,7 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser,
msgpack_pack_array(&mp_pck, 2);
flb_time_append_to_msgpack(&group->mp_time, &mp_pck, 0);

len = flb_sds_len(ml_parser->key_content);
len = flb_sds_len(parser_i->key_content);
size = map.via.map.size;
msgpack_pack_map(&mp_pck, size);

Expand All @@ -865,9 +936,9 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser,
* concatenated multiline buffer
*/
if (k.type == MSGPACK_OBJECT_STR &&
ml_parser->key_content &&
parser_i->key_content &&
k.via.str.size == len &&
strncmp(k.via.str.ptr, ml_parser->key_content, len) == 0) {
strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) {

/* key */
msgpack_pack_object(&mp_pck, k);
Expand All @@ -893,10 +964,10 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser,
msgpack_pack_map(&mp_pck, 1);

/* key */
if (ml_parser->key_content) {
len = flb_sds_len(ml_parser->key_content);
if (parser_i->key_content) {
len = flb_sds_len(parser_i->key_content);
msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, ml_parser->key_content, len);
msgpack_pack_str_body(&mp_pck, parser_i->key_content, len);
}
else {
msgpack_pack_str(&mp_pck, 3);
Expand Down
49 changes: 49 additions & 0 deletions src/multiline/flb_ml_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,17 @@ struct flb_ml_parser_ins *flb_ml_parser_instance_create(struct flb_ml *ml,
ins->ml_parser = parser;
mk_list_init(&ins->streams);

/* Copy parent configuration */
if (parser->key_content) {
ins->key_content = flb_sds_create(parser->key_content);
}
if (parser->key_pattern) {
ins->key_pattern = flb_sds_create(parser->key_pattern);
}
if (parser->key_group) {
ins->key_group = flb_sds_create(parser->key_group);
}

/* Append this multiline parser instance to the active multiline group */
ret = flb_ml_group_add_parser(ml, ins);
if (ret != 0) {
Expand All @@ -222,6 +233,34 @@ struct flb_ml_parser_ins *flb_ml_parser_instance_create(struct flb_ml *ml,
return ins;
}

/*
 * Override a parser property for this instance only.
 *
 * Supported property names (compared case-insensitively) are "key_content",
 * "key_pattern" and "key_group". Any value previously held by the instance
 * for that property is destroyed and replaced by a new string created from
 * 'val'.
 *
 * Returns 0 on success. Returns -1 when 'p' or 'prop' is NULL, when the
 * property name is not recognized, or when flb_sds_create() returns NULL
 * for the new value (in that case the property is left unset).
 */
int flb_ml_parser_instance_set(struct flb_ml_parser_ins *p, char *prop, char *val)
{
    /* Guard against dereferencing / comparing NULL inputs */
    if (!p || !prop) {
        return -1;
    }

    if (strcasecmp(prop, "key_content") == 0) {
        if (p->key_content) {
            flb_sds_destroy(p->key_content);
        }
        p->key_content = flb_sds_create(val);
        if (!p->key_content) {
            /* do not report success with a missing value */
            return -1;
        }
    }
    else if (strcasecmp(prop, "key_pattern") == 0) {
        if (p->key_pattern) {
            flb_sds_destroy(p->key_pattern);
        }
        p->key_pattern = flb_sds_create(val);
        if (!p->key_pattern) {
            return -1;
        }
    }
    else if (strcasecmp(prop, "key_group") == 0) {
        if (p->key_group) {
            flb_sds_destroy(p->key_group);
        }
        p->key_group = flb_sds_create(val);
        if (!p->key_group) {
            return -1;
        }
    }
    else {
        /* unknown property name */
        return -1;
    }

    return 0;
}

int flb_ml_parser_destroy(struct flb_ml_parser *ml_parser)
{
if (!ml_parser) {
Expand Down Expand Up @@ -271,6 +310,16 @@ int flb_ml_parser_instance_destroy(struct flb_ml_parser_ins *ins)
flb_ml_stream_destroy(stream);
}

if (ins->key_content) {
flb_sds_destroy(ins->key_content);
}
if (ins->key_pattern) {
flb_sds_destroy(ins->key_pattern);
}
if (ins->key_group) {
flb_sds_destroy(ins->key_group);
}

flb_free(ins);

return 0;
Expand Down

0 comments on commit 460694b

Please sign in to comment.