diff --git a/include/fluent-bit/flb_parser.h b/include/fluent-bit/flb_parser.h index a10ee0652ec..b260f069b83 100644 --- a/include/fluent-bit/flb_parser.h +++ b/include/fluent-bit/flb_parser.h @@ -30,6 +30,7 @@ #define FLB_PARSER_REGEX 1 #define FLB_PARSER_JSON 2 #define FLB_PARSER_LTSV 3 +#define FLB_PARSER_LOGFMT 4 struct flb_parser_types { char *key; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 21e0433bb0a..dd67ec94261 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -58,6 +58,7 @@ if(FLB_REGEX) flb_parser_json.c flb_parser_decoder.c flb_parser_ltsv.c + flb_parser_logfmt.c ) endif() diff --git a/src/flb_parser.c b/src/flb_parser.c index 4a324b1fbd4..13d59313b6c 100644 --- a/src/flb_parser.c +++ b/src/flb_parser.c @@ -97,6 +97,11 @@ int flb_parser_ltsv_do(struct flb_parser *parser, void **out_buf, size_t *out_size, struct flb_time *out_time); +int flb_parser_logfmt_do(struct flb_parser *parser, + char *buf, size_t length, + void **out_buf, size_t *out_size, + struct flb_time *out_time); + struct flb_parser *flb_parser_create(char *name, char *format, char *p_regex, char *time_fmt, char *time_key, @@ -144,6 +149,9 @@ struct flb_parser *flb_parser_create(char *name, char *format, else if (strcmp(format, "ltsv") == 0) { p->type = FLB_PARSER_LTSV; } + else if (strcmp(format, "logfmt") == 0) { + p->type = FLB_PARSER_LOGFMT; + } else { flb_error("[parser:%s] Invalid format %s", name, format); flb_free(p); @@ -588,6 +596,10 @@ int flb_parser_do(struct flb_parser *parser, char *buf, size_t length, return flb_parser_ltsv_do(parser, buf, length, out_buf, out_size, out_time); } + else if (parser->type == FLB_PARSER_LOGFMT) { + return flb_parser_logfmt_do(parser, buf, length, + out_buf, out_size, out_time); + } return -1; } diff --git a/src/flb_parser_logfmt.c b/src/flb_parser_logfmt.c new file mode 100644 index 00000000000..5c6f6cb1568 --- /dev/null +++ b/src/flb_parser_logfmt.c @@ -0,0 +1,307 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE +#include + +#include +#include +#include +#include +#include +#include + +/* + * https://brandur.org/logfmt + * https://godoc.org/github.com/kr/logfmt + * + * ident_byte = any byte greater than ' ', excluding '=' and '"' + * string_byte = any byte excluding '"' and '\' + * garbage = !ident_byte + * ident = ident_byte, { ident byte } + * key = ident + * value = ident | '"', { string_byte | '\', '"' }, '"' + * pair = key, '=', value | key, '=' | key + * message = { garbage, pair }, garbage + */ + +static char ident_byte[256] = { + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 +}; + +static int logfmt_parser(struct flb_parser *parser, + char *in_buf, size_t in_size, + msgpack_packer *tmp_pck, + char *time_key, size_t time_key_len, + time_t *time_lookup, double *tmfrac, + size_t *map_size) +{ + int ret; + struct tm tm = {0}; + unsigned char *key = NULL; + size_t key_len = 0; + unsigned char *value = NULL; + size_t value_len = 0; + unsigned char *c = (unsigned char *)in_buf; + unsigned char *end = c + in_size; + int last_byte; + int do_pack = FLB_TRUE; + int value_escape = FLB_FALSE; + + /* if map_size is 0 only count the number of k:v */ + if (*map_size == 0) { + do_pack = FLB_FALSE; + } + + while (c < end) { + /* garbage */ + while (!ident_byte[*c] && (c < end)) { + c++; + } + if (c == end) { + break; + } + /* key */ + key = c; + while (ident_byte[*c] && (c < end)) { + c++; + } + key_len = c - key; + /* value */ + value_len = 0; + value_escape = FLB_FALSE; + if (*c == '=') { + c++; + if (c < end) { + if (*c == '"') { + c++; + value = c; + while (c < end) { + if (*c != '\\' && *c!= '"') { + c++; + } + else if (*c == '\\') { + value_escape = FLB_TRUE; + c++; + if (c == end) { + break; + } + c++; + } + else { + break; + } + } + value_len = c - value; + if (c < end && *c == '\"') { + c++; + } + } + else { + value = c; + while (ident_byte[*c] && (c < end)) { + c++; + } + value_len = c - value; + } + } + } + + if (key_len > 0) { + int time_found = FLB_FALSE; + + if (parser->time_fmt && key_len == time_key_len && + value_len > 0 && + !strncmp((char *)key, time_key, key_len)) { + if (do_pack) { + ret = flb_parser_time_lookup((char *) value, value_len, + 0, parser, &tm, tmfrac); + if (ret == -1) { + flb_error("[parser:%s] Invalid time format %s.", + parser->name, parser->time_fmt); + return -1; + } + *time_lookup = flb_parser_tm2time(&tm); + } + time_found = FLB_TRUE; + } + + if (time_found == FLB_FALSE || parser->time_keep == FLB_TRUE) { + if (do_pack) { + if (parser->types_len != 0) { + flb_parser_typecast((char*) key, key_len, + (char*) value, value_len, + tmp_pck, + parser->types, + parser->types_len); + } + else { + msgpack_pack_str(tmp_pck, key_len); + msgpack_pack_str_body(tmp_pck, (char *)key, key_len); + if (value_len == 0) { + msgpack_pack_true(tmp_pck); + } + else { + if (value_escape == FLB_TRUE) { + int out_len; + char *out_str; + + out_str = flb_malloc(value_len + 1); + if (out_str == NULL) { + flb_errno(); + return -1; + } + out_str[0] = 0; + flb_unescape_string_utf8((char *)value, + value_len, + out_str); + out_len = strlen(out_str); + + msgpack_pack_str(tmp_pck, out_len); + msgpack_pack_str_body(tmp_pck, + (char *)out_str, + out_len); + + flb_free(out_str); + } + else { + msgpack_pack_str(tmp_pck, value_len); + msgpack_pack_str_body(tmp_pck, + (char *)value, + value_len); + } + } + } + } + else { + (*map_size)++; + } + } + } + + if (c == end) { + break; + } + + if (*c == '\r') { + c++; + if (c == end) { + break; + } + if (*c == '\n') { + c++; + } + break; + } + if (*c == '\n') { + c++; + break; + } + } + last_byte = (char *)c - in_buf; + + return last_byte; +} + +int flb_parser_logfmt_do(struct flb_parser *parser, + char *in_buf, size_t in_size, + void **out_buf, size_t *out_size, + struct flb_time *out_time) +{ + int ret; + time_t time_lookup; + double tmfrac = 0; + struct flb_time *t; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + char *dec_out_buf; + size_t dec_out_size; + size_t map_size; + char *time_key; + size_t time_key_len; + int last_byte; + + if (parser->time_key) { + time_key = parser->time_key; + } + else { + time_key = "time"; + } + time_key_len = strlen(time_key); + time_lookup = time(NULL); + + /* count the number of key value pairs */ + map_size = 0; + logfmt_parser(parser, in_buf, in_size, NULL, + time_key, time_key_len, + &time_lookup, &tmfrac, &map_size); + if (map_size == 0) { + return -1; + } + + /* Prepare new outgoing buffer */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + msgpack_pack_map(&tmp_pck, map_size); + + last_byte = logfmt_parser(parser, in_buf, in_size, &tmp_pck, + time_key, time_key_len, + &time_lookup, &tmfrac, &map_size); + if (last_byte < 0) { + msgpack_sbuffer_destroy(&tmp_sbuf); + return last_byte; + } + + /* Export results */ + *out_buf = tmp_sbuf.data; + *out_size = tmp_sbuf.size; + + t = out_time; + t->tm.tv_sec = time_lookup; + t->tm.tv_nsec = (tmfrac * 1000000000); + + /* Check if some decoder was specified */ + if (parser->decoders) { + ret = flb_parser_decoder_do(parser->decoders, + tmp_sbuf.data, tmp_sbuf.size, + &dec_out_buf, &dec_out_size); + if (ret == 0) { + *out_buf = dec_out_buf; + *out_size = dec_out_size; + msgpack_sbuffer_destroy(&tmp_sbuf); + } + } + + return last_byte; +}