Skip to content

Commit

Permalink
Index is strftime formatted, fixes fluent#454
Browse files Browse the repository at this point in the history
  • Loading branch information
onorua committed Aug 13, 2018
1 parent adec302 commit fba29d0
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
23 changes: 20 additions & 3 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ static char *elasticsearch_format(void *data, size_t bytes,
char *es_index;
char logstash_index[256];
char time_formatted[256];
char index_formatted[256];
char es_uuid[37];
msgpack_unpacked result;
msgpack_object root;
Expand Down Expand Up @@ -189,10 +190,15 @@ static char *elasticsearch_format(void *data, size_t bytes,

/* If logstash format and id generation is disabled, pre-generate index line for all records. */
if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
flb_time_get(&tms);
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(index_formatted, sizeof(index_formatted) - 1,
ctx->index, &tm);
es_index = index_formatted;
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
ctx->index, ctx->type);
es_index, ctx->type);
}

while (msgpack_unpack_next(&result, data, bytes, &off)) {
Expand All @@ -206,12 +212,20 @@ static char *elasticsearch_format(void *data, size_t bytes,
continue;
}

/* some broken clients may have time drift up to year 1970
* this will generate corresponding index in Elasticsearch
* in order to prevent generating millions of indexes
* we can set to always use current time for index generation */
if (ctx->current_time_index == FLB_TRUE) {
flb_time_get(&tms);
} else {
flb_time_pop_from_msgpack(&tms, &result, &obj);
}
/*
* Timestamp: Elasticsearch only support fractional seconds in
* milliseconds unit, not nanoseconds, so we take our nsec value and
* change it representation.
*/
flb_time_pop_from_msgpack(&tms, &result, &obj);
tms.tm.tv_nsec = (tms.tm.tv_nsec / 1000000);

map = root.via.array.ptr[1];
Expand Down Expand Up @@ -243,7 +257,10 @@ static char *elasticsearch_format(void *data, size_t bytes,
msgpack_pack_str(&tmp_pck, s);
msgpack_pack_str_body(&tmp_pck, time_formatted, s);

es_index = ctx->index;
// make sure we handle index time format for index
s = strftime(index_formatted, sizeof(index_formatted) - 1,
ctx->index, &tm);
es_index = index_formatted;
if (ctx->logstash_format == FLB_TRUE) {
/* Compose Index header */
p = logstash_index + ctx->logstash_prefix_len;
Expand Down
1 change: 1 addition & 0 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct flb_elasticsearch {
/* enabled/disabled */
int logstash_format;
int generate_id;
int current_time_index;

/* prefix */
int logstash_prefix_len;
Expand Down
8 changes: 8 additions & 0 deletions plugins/out_es/es_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
ctx->generate_id = FLB_FALSE;
}

/* Use current time for index generation instead of message record */
tmp = flb_output_get_property("current_time_index", ins);
if (tmp) {
ctx->current_time_index = bool_value(tmp);
} else {
ctx->current_time_index = FLB_FALSE;
}

return ctx;
}

Expand Down

0 comments on commit fba29d0

Please sign in to comment.