Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index is strftime formatted, fixes #454 #512

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ static char *elasticsearch_format(void *data, size_t bytes,
char *es_index;
char logstash_index[256];
char time_formatted[256];
char index_formatted[256];
char es_uuid[37];
msgpack_unpacked result;
msgpack_object root;
Expand Down Expand Up @@ -191,14 +192,6 @@ static char *elasticsearch_format(void *data, size_t bytes,
logstash_index[ctx->logstash_prefix_len] = '\0';
}

/* If logstash format and id generation is disabled, pre-generate index line for all records. */
if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
ctx->index, ctx->type);
}

while (msgpack_unpack_next(&result, data, bytes, &off)) {
if (result.data.type != MSGPACK_OBJECT_ARRAY) {
continue;
Expand All @@ -210,12 +203,20 @@ static char *elasticsearch_format(void *data, size_t bytes,
continue;
}

/* some broken clients may have time drift up to year 1970
* this will generate corresponding index in Elasticsearch
* in order to prevent generating millions of indexes
* we can set to always use current time for index generation */
if (ctx->current_time_index == FLB_TRUE) {
flb_time_get(&tms);
} else {
flb_time_pop_from_msgpack(&tms, &result, &obj);
}
/*
* Timestamp: Elasticsearch only support fractional seconds in
* milliseconds unit, not nanoseconds, so we take our nsec value and
* change it representation.
*/
flb_time_pop_from_msgpack(&tms, &result, &obj);
tms.tm.tv_nsec = (tms.tm.tv_nsec / 1000000);

map = root.via.array.ptr[1];
Expand Down Expand Up @@ -247,7 +248,6 @@ static char *elasticsearch_format(void *data, size_t bytes,
msgpack_pack_str(&tmp_pck, s);
msgpack_pack_str_body(&tmp_pck, time_formatted, s);

es_index = ctx->index;
if (ctx->logstash_format == FLB_TRUE) {
/* Compose Index header */
p = logstash_index + ctx->logstash_prefix_len;
Expand All @@ -265,6 +265,15 @@ static char *elasticsearch_format(void *data, size_t bytes,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
}
} else {
// make sure we handle index time format for index
s = strftime(index_formatted, sizeof(index_formatted) - 1,
ctx->index, &tm);
es_index = index_formatted;
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
}

/* Tag Key */
Expand Down
1 change: 1 addition & 0 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct flb_elasticsearch {
/* enabled/disabled */
int logstash_format;
int generate_id;
int current_time_index;

/* prefix */
int logstash_prefix_len;
Expand Down
8 changes: 8 additions & 0 deletions plugins/out_es/es_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
ctx->replace_dots = FLB_FALSE;
}

/* Use current time for index generation instead of message record */
tmp = flb_output_get_property("current_time_index", ins);
if (tmp) {
ctx->current_time_index = bool_value(tmp);
} else {
ctx->current_time_index = FLB_FALSE;
}

/* Trace output */
tmp = flb_output_get_property("Trace_Output", ins);
if (tmp) {
Expand Down