Skip to content

Commit

Permalink
paerser: add new logfmt parser (#871)
Browse files Browse the repository at this point in the history
Signed-off-by: Manuel Luis Sanmartín Rozada <[email protected]>
  • Loading branch information
manuelluis authored and edsiper committed Nov 2, 2018
1 parent d91ce96 commit 920a189
Show file tree
Hide file tree
Showing 4 changed files with 321 additions and 0 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#define FLB_PARSER_REGEX 1
#define FLB_PARSER_JSON 2
#define FLB_PARSER_LTSV 3
#define FLB_PARSER_LOGFMT 4

struct flb_parser_types {
char *key;
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ if(FLB_REGEX)
flb_parser_json.c
flb_parser_decoder.c
flb_parser_ltsv.c
flb_parser_logfmt.c
)
endif()

Expand Down
12 changes: 12 additions & 0 deletions src/flb_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ int flb_parser_ltsv_do(struct flb_parser *parser,
void **out_buf, size_t *out_size,
struct flb_time *out_time);

int flb_parser_logfmt_do(struct flb_parser *parser,
char *buf, size_t length,
void **out_buf, size_t *out_size,
struct flb_time *out_time);

struct flb_parser *flb_parser_create(char *name, char *format,
char *p_regex,
char *time_fmt, char *time_key,
Expand Down Expand Up @@ -144,6 +149,9 @@ struct flb_parser *flb_parser_create(char *name, char *format,
else if (strcmp(format, "ltsv") == 0) {
p->type = FLB_PARSER_LTSV;
}
else if (strcmp(format, "logfmt") == 0) {
p->type = FLB_PARSER_LOGFMT;
}
else {
flb_error("[parser:%s] Invalid format %s", name, format);
flb_free(p);
Expand Down Expand Up @@ -588,6 +596,10 @@ int flb_parser_do(struct flb_parser *parser, char *buf, size_t length,
return flb_parser_ltsv_do(parser, buf, length,
out_buf, out_size, out_time);
}
else if (parser->type == FLB_PARSER_LOGFMT) {
return flb_parser_logfmt_do(parser, buf, length,
out_buf, out_size, out_time);
}

return -1;
}
Expand Down
307 changes: 307 additions & 0 deletions src/flb_parser_logfmt.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2018 Treasure Data Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#define _GNU_SOURCE
#include <time.h>

#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_parser_decoder.h>
#include <fluent-bit/flb_unescape.h>
#include <fluent-bit/flb_mem.h>

/*
* https://brandur.org/logfmt
* https://godoc.org/github.com/kr/logfmt
*
* ident_byte = any byte greater than ' ', excluding '=' and '"'
* string_byte = any byte excluding '"' and '\'
* garbage = !ident_byte
* ident = ident_byte, { ident byte }
* key = ident
* value = ident | '"', { string_byte | '\', '"' }, '"'
* pair = key, '=', value | key, '=' | key
* message = { garbage, pair }, garbage
*/

static char ident_byte[256] = {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
};

static int logfmt_parser(struct flb_parser *parser,
char *in_buf, size_t in_size,
msgpack_packer *tmp_pck,
char *time_key, size_t time_key_len,
time_t *time_lookup, double *tmfrac,
size_t *map_size)
{
int ret;
struct tm tm = {0};
unsigned char *key = NULL;
size_t key_len = 0;
unsigned char *value = NULL;
size_t value_len = 0;
unsigned char *c = (unsigned char *)in_buf;
unsigned char *end = c + in_size;
int last_byte;
int do_pack = FLB_TRUE;
int value_escape = FLB_FALSE;

/* if map_size is 0 only count the number of k:v */
if (*map_size == 0) {
do_pack = FLB_FALSE;
}

while (c < end) {
/* garbage */
while (!ident_byte[*c] && (c < end)) {
c++;
}
if (c == end) {
break;
}
/* key */
key = c;
while (ident_byte[*c] && (c < end)) {
c++;
}
key_len = c - key;
/* value */
value_len = 0;
value_escape = FLB_FALSE;
if (*c == '=') {
c++;
if (c < end) {
if (*c == '"') {
c++;
value = c;
while (c < end) {
if (*c != '\\' && *c!= '"') {
c++;
}
else if (*c == '\\') {
value_escape = FLB_TRUE;
c++;
if (c == end) {
break;
}
c++;
}
else {
break;
}
}
value_len = c - value;
if (c < end && *c == '\"') {
c++;
}
}
else {
value = c;
while (ident_byte[*c] && (c < end)) {
c++;
}
value_len = c - value;
}
}
}

if (key_len > 0) {
int time_found = FLB_FALSE;

if (parser->time_fmt && key_len == time_key_len &&
value_len > 0 &&
!strncmp((char *)key, time_key, key_len)) {
if (do_pack) {
ret = flb_parser_time_lookup((char *) value, value_len,
0, parser, &tm, tmfrac);
if (ret == -1) {
flb_error("[parser:%s] Invalid time format %s.",
parser->name, parser->time_fmt);
return -1;
}
*time_lookup = flb_parser_tm2time(&tm);
}
time_found = FLB_TRUE;
}

if (time_found == FLB_FALSE || parser->time_keep == FLB_TRUE) {
if (do_pack) {
if (parser->types_len != 0) {
flb_parser_typecast((char*) key, key_len,
(char*) value, value_len,
tmp_pck,
parser->types,
parser->types_len);
}
else {
msgpack_pack_str(tmp_pck, key_len);
msgpack_pack_str_body(tmp_pck, (char *)key, key_len);
if (value_len == 0) {
msgpack_pack_true(tmp_pck);
}
else {
if (value_escape == FLB_TRUE) {
int out_len;
char *out_str;

out_str = flb_malloc(value_len + 1);
if (out_str == NULL) {
flb_errno();
return -1;
}
out_str[0] = 0;
flb_unescape_string_utf8((char *)value,
value_len,
out_str);
out_len = strlen(out_str);

msgpack_pack_str(tmp_pck, out_len);
msgpack_pack_str_body(tmp_pck,
(char *)out_str,
out_len);

flb_free(out_str);
}
else {
msgpack_pack_str(tmp_pck, value_len);
msgpack_pack_str_body(tmp_pck,
(char *)value,
value_len);
}
}
}
}
else {
(*map_size)++;
}
}
}

if (c == end) {
break;
}

if (*c == '\r') {
c++;
if (c == end) {
break;
}
if (*c == '\n') {
c++;
}
break;
}
if (*c == '\n') {
c++;
break;
}
}
last_byte = (char *)c - in_buf;

return last_byte;
}

int flb_parser_logfmt_do(struct flb_parser *parser,
char *in_buf, size_t in_size,
void **out_buf, size_t *out_size,
struct flb_time *out_time)
{
int ret;
time_t time_lookup;
double tmfrac = 0;
struct flb_time *t;
msgpack_sbuffer tmp_sbuf;
msgpack_packer tmp_pck;
char *dec_out_buf;
size_t dec_out_size;
size_t map_size;
char *time_key;
size_t time_key_len;
int last_byte;

if (parser->time_key) {
time_key = parser->time_key;
}
else {
time_key = "time";
}
time_key_len = strlen(time_key);
time_lookup = time(NULL);

/* count the number of key value pairs */
map_size = 0;
logfmt_parser(parser, in_buf, in_size, NULL,
time_key, time_key_len,
&time_lookup, &tmfrac, &map_size);
if (map_size == 0) {
return -1;
}

/* Prepare new outgoing buffer */
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
msgpack_pack_map(&tmp_pck, map_size);

last_byte = logfmt_parser(parser, in_buf, in_size, &tmp_pck,
time_key, time_key_len,
&time_lookup, &tmfrac, &map_size);
if (last_byte < 0) {
msgpack_sbuffer_destroy(&tmp_sbuf);
return last_byte;
}

/* Export results */
*out_buf = tmp_sbuf.data;
*out_size = tmp_sbuf.size;

t = out_time;
t->tm.tv_sec = time_lookup;
t->tm.tv_nsec = (tmfrac * 1000000000);

/* Check if some decoder was specified */
if (parser->decoders) {
ret = flb_parser_decoder_do(parser->decoders,
tmp_sbuf.data, tmp_sbuf.size,
&dec_out_buf, &dec_out_size);
if (ret == 0) {
*out_buf = dec_out_buf;
*out_size = dec_out_size;
msgpack_sbuffer_destroy(&tmp_sbuf);
}
}

return last_byte;
}

0 comments on commit 920a189

Please sign in to comment.