Skip to content

Commit

Permalink
out_es: add current_time_index option (PR #512, #454)
Browse files Browse the repository at this point in the history
The following patch introduce a new boolean configuration option
called 'current_time_index'. When enabled all records and index
will take the current time instead of record time.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
onorua authored and edsiper committed Mar 20, 2019
1 parent d89dc36 commit 29b331b
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 7 deletions.
48 changes: 42 additions & 6 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ static char *elasticsearch_format(void *data, size_t bytes,
char *es_index;
char logstash_index[256];
char time_formatted[256];
char index_formatted[256];
char es_uuid[37];
msgpack_unpacked result;
msgpack_object root;
Expand Down Expand Up @@ -186,19 +187,43 @@ static char *elasticsearch_format(void *data, size_t bytes,
msgpack_unpacked_destroy(&result);
msgpack_unpacked_init(&result);

/* Copy logstash prefix if logstash format is enabled */
if (ctx->logstash_format == FLB_TRUE) {
memcpy(logstash_index, ctx->logstash_prefix, ctx->logstash_prefix_len);
logstash_index[ctx->logstash_prefix_len] = '\0';
}

/* If logstash format and id generation is disabled, pre-generate index line for all records. */
/*
* If logstash format and id generation are disabled, pre-generate
* the index line for all records.
*
* The header stored in 'j_index' will be used for the all records on
* this payload.
*/
if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
flb_time_get(&tms);
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(index_formatted, sizeof(index_formatted) - 1,
ctx->index, &tm);
es_index = index_formatted;

index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
ctx->index, ctx->type);
es_index, ctx->type);
}

/*
* Some broken clients may have time drift up to year 1970
* this will generate corresponding index in Elasticsearch
* in order to prevent generating millions of indexes
* we can set to always use current time for index generation
*/
if (ctx->current_time_index == FLB_TRUE) {
flb_time_get(&tms);
}

/* Iterate each record and do further formatting */
while (msgpack_unpack_next(&result, data, bytes, &off)) {
if (result.data.type != MSGPACK_OBJECT_ARRAY) {
continue;
Expand All @@ -210,12 +235,16 @@ static char *elasticsearch_format(void *data, size_t bytes,
continue;
}

/* Only pop time from record if current_time_index is disabled */
if (ctx->current_time_index == FLB_FALSE) {
flb_time_pop_from_msgpack(&tms, &result, &obj);
}

/*
* Timestamp: Elasticsearch only support fractional seconds in
* milliseconds unit, not nanoseconds, so we take our nsec value and
* change it representation.
*/
flb_time_pop_from_msgpack(&tms, &result, &obj);
tms.tm.tv_nsec = (tms.tm.tv_nsec / 1000000);

map = root.via.array.ptr[1];
Expand Down Expand Up @@ -266,6 +295,12 @@ static char *elasticsearch_format(void *data, size_t bytes,
es_index, ctx->type);
}
}
else if (ctx->current_time_index == FLB_TRUE) {
/* Make sure we handle index time format for index */
s = strftime(index_formatted, sizeof(index_formatted) - 1,
ctx->index, &tm);
es_index = index_formatted;
}

/* Tag Key */
if (ctx->include_tag_key == FLB_TRUE) {
Expand All @@ -286,8 +321,10 @@ static char *elasticsearch_format(void *data, size_t bytes,

if (ctx->generate_id == FLB_TRUE) {
MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash);
snprintf(es_uuid, sizeof(es_uuid), "%04x%04x-%04x-%04x-%04x-%04x%04x%04x",
hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7]);
snprintf(es_uuid, sizeof(es_uuid),
"%04x%04x-%04x-%04x-%04x-%04x%04x%04x",
hash[0], hash[1], hash[2], hash[3],
hash[4], hash[5], hash[6], hash[7]);
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID,
Expand All @@ -304,7 +341,6 @@ static char *elasticsearch_format(void *data, size_t bytes,
return NULL;
}

/* Append JSON on Index buf */
ret = es_bulk_append(bulk, j_index, index_len, json_buf, json_size);
flb_free(json_buf);
if (ret == -1) {
Expand Down
1 change: 1 addition & 0 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct flb_elasticsearch {
/* enabled/disabled */
int logstash_format;
int generate_id;
int current_time_index;

/* prefix */
int logstash_prefix_len;
Expand Down
2 changes: 1 addition & 1 deletion plugins/out_es/es_bulk.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ int es_bulk_append(struct es_bulk *bulk, char *index, int i_len,
new_size = bulk->size + available + required + ES_BULK_CHUNK;
ptr = flb_realloc(bulk->ptr, new_size);
if (!ptr) {
perror("realloc");
flb_errno();
return -1;
}
bulk->ptr = ptr;
Expand Down
10 changes: 10 additions & 0 deletions plugins/out_es/es_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,16 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
ctx->replace_dots = FLB_FALSE;
}

/* Use current time for index generation instead of message record */
tmp = flb_output_get_property("current_time_index", ins);
if (tmp) {
ctx->current_time_index = flb_utils_bool(tmp);
}
else {
ctx->current_time_index = FLB_FALSE;
}


/* Trace output */
tmp = flb_output_get_property("Trace_Output", ins);
if (tmp) {
Expand Down

0 comments on commit 29b331b

Please sign in to comment.