diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index bedc336f72a..1d170ee6b24 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -135,6 +135,7 @@ static char *elasticsearch_format(void *data, size_t bytes, char *es_index; char logstash_index[256]; char time_formatted[256]; + char index_formatted[256]; char es_uuid[37]; msgpack_unpacked result; msgpack_object root; @@ -191,14 +192,6 @@ static char *elasticsearch_format(void *data, size_t bytes, logstash_index[ctx->logstash_prefix_len] = '\0'; } - /* If logstash format and id generation is disabled, pre-generate index line for all records. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT, - ctx->index, ctx->type); - } - while (msgpack_unpack_next(&result, data, bytes, &off)) { if (result.data.type != MSGPACK_OBJECT_ARRAY) { continue; @@ -210,12 +203,20 @@ static char *elasticsearch_format(void *data, size_t bytes, continue; } + /* some broken clients may have time drift up to year 1970 + * this will generate corresponding index in Elasticsearch + * in order to prevent generating millions of indexes + * we can set to always use current time for index generation */ + if (ctx->current_time_index == FLB_TRUE) { + flb_time_get(&tms); + } else { + flb_time_pop_from_msgpack(&tms, &result, &obj); + } /* * Timestamp: Elasticsearch only support fractional seconds in * milliseconds unit, not nanoseconds, so we take our nsec value and * change it representation. */ - flb_time_pop_from_msgpack(&tms, &result, &obj); tms.tm.tv_nsec = (tms.tm.tv_nsec / 1000000); map = root.via.array.ptr[1]; @@ -247,7 +248,6 @@ static char *elasticsearch_format(void *data, size_t bytes, msgpack_pack_str(&tmp_pck, s); msgpack_pack_str_body(&tmp_pck, time_formatted, s); - es_index = ctx->index; if (ctx->logstash_format == FLB_TRUE) { /* Compose Index header */ p = logstash_index + ctx->logstash_prefix_len; @@ -265,6 +265,15 @@ static char *elasticsearch_format(void *data, size_t bytes, ES_BULK_INDEX_FMT, es_index, ctx->type); } + } else { + // make sure we handle index time format for index + s = strftime(index_formatted, sizeof(index_formatted) - 1, + ctx->index, &tm); + es_index = index_formatted; + index_len = snprintf(j_index, + ES_BULK_HEADER, + ES_BULK_INDEX_FMT, + es_index, ctx->type); } /* Tag Key */ diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 06d6192cef1..1bd6357a2ac 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -58,6 +58,7 @@ struct flb_elasticsearch { /* enabled/disabled */ int logstash_format; int generate_id; + int current_time_index; /* prefix */ int logstash_prefix_len; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 7fb52a96c0d..616eb10db93 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -253,6 +253,14 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, ctx->replace_dots = FLB_FALSE; } + /* Use current time for index generation instead of message record */ + tmp = flb_output_get_property("current_time_index", ins); + if (tmp) { + ctx->current_time_index = bool_value(tmp); + } else { + ctx->current_time_index = FLB_FALSE; + } + /* Trace output */ tmp = flb_output_get_property("Trace_Output", ins); if (tmp) {