Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parser: support 'types' feature #310

Merged
merged 1 commit into from
Jul 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions include/fluent-bit/flb_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@
#include <fluent-bit/flb_regex.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_time.h>
#include <msgpack.h>

#define FLB_PARSER_REGEX 1
#define FLB_PARSER_JSON 2

struct flb_parser_types {
char *key;
int key_len;
int type;
};

struct flb_parser {
/* configuration */
int type; /* parser type */
Expand All @@ -38,6 +45,8 @@ struct flb_parser {
char *time_key; /* field name that contains the time */
int time_keep; /* keep time field */
char *time_frac_secs; /* time format have fractional seconds ? */
struct flb_parser_types *types; /* type casting */
int types_len;

/* internal */
int time_with_year; /* do time_fmt consider a year (%Y) ? */
Expand All @@ -47,6 +56,13 @@ struct flb_parser {
struct mk_list _head;
};

enum {
FLB_PARSER_TYPE_INT = 1,
FLB_PARSER_TYPE_FLOAT,
FLB_PARSER_TYPE_BOOL,
FLB_PARSER_TYPE_STRING,
};

static inline time_t flb_parser_tm2time(const struct tm *src)
{
struct tm tmp;
Expand All @@ -59,7 +75,10 @@ static inline time_t flb_parser_tm2time(const struct tm *src)
struct flb_parser *flb_parser_create(char *name, char *format,
char *p_regex,
char *time_fmt, char *time_key,
int time_keep, struct flb_config *config);
int time_keep,
struct flb_parser_types *types,
int types_len,
struct flb_config *config);
int flb_parser_conf_file(char *file, struct flb_config *config);
void flb_parser_destroy(struct flb_parser *parser);
struct flb_parser *flb_parser_get(char *name, struct flb_config *config);
Expand All @@ -68,5 +87,9 @@ int flb_parser_do(struct flb_parser *parser, char *buf, size_t length,

void flb_parser_exit(struct flb_config *config);
int flb_parser_frac_tzone(char *str, int len, double *frac, int *tmdiff);

int flb_parser_typecast(char *key, int key_len,
char *val, int val_len,
msgpack_packer *pck,
struct flb_parser_types *types,
int types_len);
#endif
167 changes: 165 additions & 2 deletions src/flb_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_utils.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <limits.h>
#include <string.h>

int flb_parser_regex_do(struct flb_parser *parser,
char *buf, size_t length,
Expand All @@ -41,7 +43,10 @@ int flb_parser_json_do(struct flb_parser *parser,
struct flb_parser *flb_parser_create(char *name, char *format,
char *p_regex,
char *time_fmt, char *time_key,
int time_keep, struct flb_config *config)
int time_keep,
struct flb_parser_types *types,
int types_len,
struct flb_config *config)
{
int size;
char *tmp;
Expand Down Expand Up @@ -148,6 +153,8 @@ struct flb_parser *flb_parser_create(char *name, char *format,
p->time_key = flb_strdup(time_key);
}
p->time_keep = time_keep;
p->types = types;
p->types_len = types_len;

mk_list_add(&p->_head, &config->parsers);

Expand All @@ -156,6 +163,7 @@ struct flb_parser *flb_parser_create(char *name, char *format,

void flb_parser_destroy(struct flb_parser *parser)
{
int i = 0;
if (parser->type == FLB_PARSER_REGEX) {
flb_regex_destroy(parser->regex);
flb_free(parser->p_regex);
Expand All @@ -171,6 +179,12 @@ void flb_parser_destroy(struct flb_parser *parser)
if (parser->time_key) {
flb_free(parser->time_key);
}
if (parser->types_len != 0) {
for (i=0; i<parser->types_len; i++){
flb_free(parser->types[i].key);
}
flb_free(parser->types);
}

mk_list_del(&parser->_head);
flb_free(parser);
Expand All @@ -188,6 +202,57 @@ void flb_parser_exit(struct flb_config *config)
}
}

static int proc_types_str(char *types_str, struct flb_parser_types **types)
{
int i = 0;
int types_num = 0;
char *type_str = NULL;
struct mk_list *split;
struct mk_list *head;
struct flb_split_entry *sentry;

split = flb_utils_split(types_str, ' ', 256);
types_num = mk_list_size(split);
*types = flb_malloc(sizeof(struct flb_parser_types) * types_num);

for(i=0; i<types_num; i++){
(*types)[i].key = NULL;
(*types)[i].type = FLB_PARSER_TYPE_STRING;
}
i = 0;
mk_list_foreach(head ,split) {
sentry = mk_list_entry(head, struct flb_split_entry ,_head);
type_str = strchr(sentry->value ,':');

if (type_str == NULL) {
i++;
continue;
}
*type_str = '\0'; /* for strdup */
(*types)[i].key = flb_strdup(sentry->value);
(*types)[i].key_len = strlen(sentry->value);
*types_str = ':';

type_str++;
if (!strcasecmp(type_str, "integer")) {
(*types)[i].type = FLB_PARSER_TYPE_INT;
}
else if(!strcasecmp(type_str, "bool")) {
(*types)[i].type = FLB_PARSER_TYPE_BOOL;
}
else if(!strcasecmp(type_str, "float")){
(*types)[i].type = FLB_PARSER_TYPE_FLOAT;
}
else {
(*types)[i].type = FLB_PARSER_TYPE_STRING;
}
i++;
}
flb_utils_split_free(split);

return i;
}

/* Load parsers from a configuration file */
int flb_parser_conf_file(char *file, struct flb_config *config)
{
Expand All @@ -199,11 +264,14 @@ int flb_parser_conf_file(char *file, struct flb_config *config)
char *regex;
char *time_fmt;
char *time_key;
char *types_str;
int time_keep;
int types_len;
struct mk_rconf *fconf;
struct mk_rconf_section *section;
struct mk_list *head;
struct stat st;
struct flb_parser_types *types;

ret = stat(file, &st);
if (ret == -1 && errno == ENOENT) {
Expand Down Expand Up @@ -234,6 +302,7 @@ int flb_parser_conf_file(char *file, struct flb_config *config)
regex = NULL;
time_fmt = NULL;
time_key = NULL;
types_str = NULL;

section = mk_list_entry(head, struct mk_rconf_section, _head);
if (strcasecmp(section->name, "PARSER") != 0) {
Expand Down Expand Up @@ -277,9 +346,20 @@ int flb_parser_conf_file(char *file, struct flb_config *config)
goto fconf_error;
}

/* Types */
types_str = mk_rconf_section_get_key(section, "Types",
MK_RCONF_STR);
if (types_str != NULL) {
types_len = proc_types_str(types_str, &types);
}
else {
types_len = 0;
}

/* Create the parser context */
if (!flb_parser_create(name, format, regex,
time_fmt, time_key, time_keep, config)) {
time_fmt, time_key, time_keep,
types, types_len, config)) {
goto fconf_error;
}

Expand All @@ -297,6 +377,9 @@ int flb_parser_conf_file(char *file, struct flb_config *config)
if (time_key) {
flb_free(time_key);
}
if (types_str != NULL) {
flb_free(types_str);
}
}

mk_rconf_free(fconf);
Expand Down Expand Up @@ -398,3 +481,83 @@ int flb_parser_frac_tzone(char *str, int len, double *frac, int *tmdiff)

return 0;
}

int flb_parser_typecast(char *key, int key_len,
char *val, int val_len,
msgpack_packer *pck,
struct flb_parser_types *types,
int types_len)
{
int i;
int error = FLB_FALSE;
char tmp_char;
int casted = FLB_FALSE;

for(i=0; i<types_len; i++){
if (types[i].key != NULL
&& key_len == types[i].key_len &&
!strncmp(key, types[i].key, key_len)) {

casted = FLB_TRUE;

msgpack_pack_str(pck, key_len);
msgpack_pack_str_body(pck, key, key_len);

switch (types[i].type) {
case FLB_PARSER_TYPE_INT:
{
long long lval;

/* msgpack char is not null terminated.
So backup and fill null char,
convert int,
rewind char.
*/
tmp_char = val[val_len];
val[val_len] = '\0';
lval = atoll(val);
val[val_len] = tmp_char;
msgpack_pack_int64(pck, lval);
}
break;
case FLB_PARSER_TYPE_FLOAT:
{
double dval;
tmp_char = val[val_len];
val[val_len] = '\0';
dval = atof(val);
val[val_len] = tmp_char;
msgpack_pack_double(pck, dval);
}
break;
case FLB_PARSER_TYPE_BOOL:
if (!strncasecmp(val, "true", 4)) {
msgpack_pack_true(pck);
}
else if(!strncasecmp(val, "false", 5)){
msgpack_pack_false(pck);
}
else {
error = FLB_TRUE;
}
break;
default:
error = FLB_TRUE;
}
if (error == FLB_TRUE) {
flb_warn("[PARSER] key=%s cast error. save as string.", key);
msgpack_pack_str(pck, val_len);
msgpack_pack_str_body(pck, val, val_len);
}
break;
}
}

if (casted == FLB_FALSE) {
msgpack_pack_str(pck, key_len);
msgpack_pack_str_body(pck, key, key_len);
msgpack_pack_str(pck, val_len);
msgpack_pack_str_body(pck, val, val_len);
}
return 0;
}
17 changes: 13 additions & 4 deletions src/flb_parser_regex.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,19 @@ static void cb_results(unsigned char *name, unsigned char *value,
}
}

msgpack_pack_str(pcb->pck, len);
msgpack_pack_str_body(pcb->pck, (char *) name, len);
msgpack_pack_str(pcb->pck, vlen);
msgpack_pack_str_body(pcb->pck, (char *) value, vlen);
if (parser->types_len != 0) {
flb_parser_typecast((char*)name, len,
(char*)value, vlen,
pcb->pck,
parser->types,
parser->types_len);
}
else {
msgpack_pack_str(pcb->pck, len);
msgpack_pack_str_body(pcb->pck, (char *) name, len);
msgpack_pack_str(pcb->pck, vlen);
msgpack_pack_str_body(pcb->pck, (char *) value, vlen);
}
}

int flb_parser_regex_do(struct flb_parser *parser,
Expand Down