Skip to content

Commit

Permalink
Merge pull request #310 from nokute78/parser_types
Browse files Browse the repository at this point in the history
parser: support 'types' feature
  • Loading branch information
edsiper authored Jul 4, 2017
2 parents d1cbc86 + 7dbf7ed commit b8d2f2d
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 8 deletions.
27 changes: 25 additions & 2 deletions include/fluent-bit/flb_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@
#include <fluent-bit/flb_regex.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_time.h>
#include <msgpack.h>

#define FLB_PARSER_REGEX 1
#define FLB_PARSER_JSON 2

struct flb_parser_types {
char *key;
int key_len;
int type;
};

struct flb_parser {
/* configuration */
int type; /* parser type */
Expand All @@ -38,6 +45,8 @@ struct flb_parser {
char *time_key; /* field name that contains the time */
int time_keep; /* keep time field */
char *time_frac_secs; /* time format have fractional seconds ? */
struct flb_parser_types *types; /* type casting */
int types_len;

/* internal */
int time_with_year; /* do time_fmt consider a year (%Y) ? */
Expand All @@ -47,6 +56,13 @@ struct flb_parser {
struct mk_list _head;
};

enum {
FLB_PARSER_TYPE_INT = 1,
FLB_PARSER_TYPE_FLOAT,
FLB_PARSER_TYPE_BOOL,
FLB_PARSER_TYPE_STRING,
};

static inline time_t flb_parser_tm2time(const struct tm *src)
{
struct tm tmp;
Expand All @@ -59,7 +75,10 @@ static inline time_t flb_parser_tm2time(const struct tm *src)
struct flb_parser *flb_parser_create(char *name, char *format,
char *p_regex,
char *time_fmt, char *time_key,
int time_keep, struct flb_config *config);
int time_keep,
struct flb_parser_types *types,
int types_len,
struct flb_config *config);
int flb_parser_conf_file(char *file, struct flb_config *config);
void flb_parser_destroy(struct flb_parser *parser);
struct flb_parser *flb_parser_get(char *name, struct flb_config *config);
Expand All @@ -68,5 +87,9 @@ int flb_parser_do(struct flb_parser *parser, char *buf, size_t length,

void flb_parser_exit(struct flb_config *config);
int flb_parser_frac_tzone(char *str, int len, double *frac, int *tmdiff);

int flb_parser_typecast(char *key, int key_len,
char *val, int val_len,
msgpack_packer *pck,
struct flb_parser_types *types,
int types_len);
#endif
167 changes: 165 additions & 2 deletions src/flb_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_utils.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <limits.h>
#include <string.h>

int flb_parser_regex_do(struct flb_parser *parser,
char *buf, size_t length,
Expand All @@ -41,7 +43,10 @@ int flb_parser_json_do(struct flb_parser *parser,
struct flb_parser *flb_parser_create(char *name, char *format,
char *p_regex,
char *time_fmt, char *time_key,
int time_keep, struct flb_config *config)
int time_keep,
struct flb_parser_types *types,
int types_len,
struct flb_config *config)
{
int size;
char *tmp;
Expand Down Expand Up @@ -148,6 +153,8 @@ struct flb_parser *flb_parser_create(char *name, char *format,
p->time_key = flb_strdup(time_key);
}
p->time_keep = time_keep;
p->types = types;
p->types_len = types_len;

mk_list_add(&p->_head, &config->parsers);

Expand All @@ -156,6 +163,7 @@ struct flb_parser *flb_parser_create(char *name, char *format,

void flb_parser_destroy(struct flb_parser *parser)
{
int i = 0;
if (parser->type == FLB_PARSER_REGEX) {
flb_regex_destroy(parser->regex);
flb_free(parser->p_regex);
Expand All @@ -171,6 +179,12 @@ void flb_parser_destroy(struct flb_parser *parser)
if (parser->time_key) {
flb_free(parser->time_key);
}
if (parser->types_len != 0) {
for (i=0; i<parser->types_len; i++){
flb_free(parser->types[i].key);
}
flb_free(parser->types);
}

mk_list_del(&parser->_head);
flb_free(parser);
Expand All @@ -188,6 +202,57 @@ void flb_parser_exit(struct flb_config *config)
}
}

static int proc_types_str(char *types_str, struct flb_parser_types **types)
{
int i = 0;
int types_num = 0;
char *type_str = NULL;
struct mk_list *split;
struct mk_list *head;
struct flb_split_entry *sentry;

split = flb_utils_split(types_str, ' ', 256);
types_num = mk_list_size(split);
*types = flb_malloc(sizeof(struct flb_parser_types) * types_num);

for(i=0; i<types_num; i++){
(*types)[i].key = NULL;
(*types)[i].type = FLB_PARSER_TYPE_STRING;
}
i = 0;
mk_list_foreach(head ,split) {
sentry = mk_list_entry(head, struct flb_split_entry ,_head);
type_str = strchr(sentry->value ,':');

if (type_str == NULL) {
i++;
continue;
}
*type_str = '\0'; /* for strdup */
(*types)[i].key = flb_strdup(sentry->value);
(*types)[i].key_len = strlen(sentry->value);
*types_str = ':';

type_str++;
if (!strcasecmp(type_str, "integer")) {
(*types)[i].type = FLB_PARSER_TYPE_INT;
}
else if(!strcasecmp(type_str, "bool")) {
(*types)[i].type = FLB_PARSER_TYPE_BOOL;
}
else if(!strcasecmp(type_str, "float")){
(*types)[i].type = FLB_PARSER_TYPE_FLOAT;
}
else {
(*types)[i].type = FLB_PARSER_TYPE_STRING;
}
i++;
}
flb_utils_split_free(split);

return i;
}

/* Load parsers from a configuration file */
int flb_parser_conf_file(char *file, struct flb_config *config)
{
Expand All @@ -199,11 +264,14 @@ int flb_parser_conf_file(char *file, struct flb_config *config)
char *regex;
char *time_fmt;
char *time_key;
char *types_str;
int time_keep;
int types_len;
struct mk_rconf *fconf;
struct mk_rconf_section *section;
struct mk_list *head;
struct stat st;
struct flb_parser_types *types;

ret = stat(file, &st);
if (ret == -1 && errno == ENOENT) {
Expand Down Expand Up @@ -234,6 +302,7 @@ int flb_parser_conf_file(char *file, struct flb_config *config)
regex = NULL;
time_fmt = NULL;
time_key = NULL;
types_str = NULL;

section = mk_list_entry(head, struct mk_rconf_section, _head);
if (strcasecmp(section->name, "PARSER") != 0) {
Expand Down Expand Up @@ -277,9 +346,20 @@ int flb_parser_conf_file(char *file, struct flb_config *config)
goto fconf_error;
}

/* Types */
types_str = mk_rconf_section_get_key(section, "Types",
MK_RCONF_STR);
if (types_str != NULL) {
types_len = proc_types_str(types_str, &types);
}
else {
types_len = 0;
}

/* Create the parser context */
if (!flb_parser_create(name, format, regex,
time_fmt, time_key, time_keep, config)) {
time_fmt, time_key, time_keep,
types, types_len, config)) {
goto fconf_error;
}

Expand All @@ -297,6 +377,9 @@ int flb_parser_conf_file(char *file, struct flb_config *config)
if (time_key) {
flb_free(time_key);
}
if (types_str != NULL) {
flb_free(types_str);
}
}

mk_rconf_free(fconf);
Expand Down Expand Up @@ -398,3 +481,83 @@ int flb_parser_frac_tzone(char *str, int len, double *frac, int *tmdiff)

return 0;
}

int flb_parser_typecast(char *key, int key_len,
char *val, int val_len,
msgpack_packer *pck,
struct flb_parser_types *types,
int types_len)
{
int i;
int error = FLB_FALSE;
char tmp_char;
int casted = FLB_FALSE;

for(i=0; i<types_len; i++){
if (types[i].key != NULL
&& key_len == types[i].key_len &&
!strncmp(key, types[i].key, key_len)) {

casted = FLB_TRUE;

msgpack_pack_str(pck, key_len);
msgpack_pack_str_body(pck, key, key_len);

switch (types[i].type) {
case FLB_PARSER_TYPE_INT:
{
long long lval;

/* msgpack char is not null terminated.
So backup and fill null char,
convert int,
rewind char.
*/
tmp_char = val[val_len];
val[val_len] = '\0';
lval = atoll(val);
val[val_len] = tmp_char;
msgpack_pack_int64(pck, lval);
}
break;
case FLB_PARSER_TYPE_FLOAT:
{
double dval;
tmp_char = val[val_len];
val[val_len] = '\0';
dval = atof(val);
val[val_len] = tmp_char;
msgpack_pack_double(pck, dval);
}
break;
case FLB_PARSER_TYPE_BOOL:
if (!strncasecmp(val, "true", 4)) {
msgpack_pack_true(pck);
}
else if(!strncasecmp(val, "false", 5)){
msgpack_pack_false(pck);
}
else {
error = FLB_TRUE;
}
break;
default:
error = FLB_TRUE;
}
if (error == FLB_TRUE) {
flb_warn("[PARSER] key=%s cast error. save as string.", key);
msgpack_pack_str(pck, val_len);
msgpack_pack_str_body(pck, val, val_len);
}
break;
}
}

if (casted == FLB_FALSE) {
msgpack_pack_str(pck, key_len);
msgpack_pack_str_body(pck, key, key_len);
msgpack_pack_str(pck, val_len);
msgpack_pack_str_body(pck, val, val_len);
}
return 0;
}
17 changes: 13 additions & 4 deletions src/flb_parser_regex.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,19 @@ static void cb_results(unsigned char *name, unsigned char *value,
}
}

msgpack_pack_str(pcb->pck, len);
msgpack_pack_str_body(pcb->pck, (char *) name, len);
msgpack_pack_str(pcb->pck, vlen);
msgpack_pack_str_body(pcb->pck, (char *) value, vlen);
if (parser->types_len != 0) {
flb_parser_typecast((char*)name, len,
(char*)value, vlen,
pcb->pck,
parser->types,
parser->types_len);
}
else {
msgpack_pack_str(pcb->pck, len);
msgpack_pack_str_body(pcb->pck, (char *) name, len);
msgpack_pack_str(pcb->pck, vlen);
msgpack_pack_str_body(pcb->pck, (char *) value, vlen);
}
}

int flb_parser_regex_do(struct flb_parser *parser,
Expand Down

0 comments on commit b8d2f2d

Please sign in to comment.