Skip to content

Commit

Permalink
out_es: use snprintf wrapper function for bulk header.
Browse files Browse the repository at this point in the history
We used fixed char array to construct bulk header.
If long index comes, it needs larger buffer.

out_es uses snprintf wrapper function.
If the buffer is small, the api increases the buffer.

Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 authored and edsiper committed Dec 18, 2021
1 parent a2ecae9 commit 34e9f82
Showing 1 changed file with 49 additions and 34 deletions.
83 changes: 49 additions & 34 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ static int elasticsearch_format(struct flb_config *config,
msgpack_object root;
msgpack_object map;
msgpack_object *obj;
char j_index[ES_BULK_HEADER];
flb_sds_t j_index;
struct es_bulk *bulk;
struct tm tm;
struct flb_time tms;
Expand All @@ -275,13 +275,20 @@ static int elasticsearch_format(struct flb_config *config,
int es_index_custom_len;
struct flb_elasticsearch *ctx = plugin_context;

j_index = flb_sds_create_size(ES_BULK_HEADER);
if (j_index == NULL) {
flb_errno();
return -1;
}

/* Iterate the original buffer and perform adjustments */
msgpack_unpacked_init(&result);

/* Perform some format validation */
ret = msgpack_unpack_next(&result, data, bytes, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result);
flb_sds_destroy(j_index);
return -1;
}

Expand All @@ -292,17 +299,22 @@ static int elasticsearch_format(struct flb_config *config,
* doing, we just duplicate the content in a new buffer and cleanup.
*/
msgpack_unpacked_destroy(&result);
flb_sds_destroy(j_index);
return -1;
}

root = result.data;
if (root.via.array.size == 0) {
msgpack_unpacked_destroy(&result);
flb_sds_destroy(j_index);
return -1;
}

/* Create the bulk composer */
bulk = es_bulk_create(bytes);
if (!bulk) {
msgpack_unpacked_destroy(&result);
flb_sds_destroy(j_index);
return -1;
}

Expand Down Expand Up @@ -331,16 +343,16 @@ static int elasticsearch_format(struct flb_config *config,
ctx->index, &tm);
es_index = index_formatted;
if (ctx->suppress_type_name) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
es_index);
index_len = flb_sds_snprintf_realloc(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
es_index);
}
else {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
index_len = flb_sds_snprintf_realloc(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
es_index, ctx->type);
}
}

Expand Down Expand Up @@ -443,16 +455,16 @@ static int elasticsearch_format(struct flb_config *config,
es_index = logstash_index;
if (ctx->generate_id == FLB_FALSE) {
if (ctx->suppress_type_name) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
es_index);
index_len = flb_sds_snprintf_realloc(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
es_index);
}
else {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
index_len = flb_sds_snprintf_realloc(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
es_index, ctx->type);
}
}
}
Expand Down Expand Up @@ -483,6 +495,7 @@ static int elasticsearch_format(struct flb_config *config,
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&tmp_sbuf);
es_bulk_destroy(bulk);
flb_sds_destroy(j_index);
return -1;
}

Expand All @@ -493,32 +506,32 @@ static int elasticsearch_format(struct flb_config *config,
hash[0], hash[1], hash[2], hash[3],
hash[4], hash[5], hash[6], hash[7]);
if (ctx->suppress_type_name) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
es_index, es_uuid);
index_len = flb_sds_snprintf_realloc(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
es_index, es_uuid);
}
else {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, es_uuid);
index_len = flb_sds_snprintf_realloc(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, es_uuid);
}
}
if (ctx->ra_id_key) {
id_key_str = es_get_id_value(ctx ,&map);
if (id_key_str) {
if (ctx->suppress_type_name) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
es_index, id_key_str);
index_len = flb_sds_snprintf_realloc(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
es_index, id_key_str);
}
else {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, id_key_str);
index_len = flb_sds_snprintf_realloc(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, id_key_str);
}
flb_sds_destroy(id_key_str);
id_key_str = NULL;
Expand All @@ -531,6 +544,7 @@ static int elasticsearch_format(struct flb_config *config,
if (!out_buf) {
msgpack_unpacked_destroy(&result);
es_bulk_destroy(bulk);
flb_sds_destroy(j_index);
return -1;
}

Expand All @@ -544,6 +558,7 @@ static int elasticsearch_format(struct flb_config *config,
msgpack_unpacked_destroy(&result);
*out_size = 0;
es_bulk_destroy(bulk);
flb_sds_destroy(j_index);
return -1;
}
}
Expand All @@ -563,7 +578,7 @@ static int elasticsearch_format(struct flb_config *config,
fwrite(*out_data, 1, *out_size, stdout);
fflush(stdout);
}

flb_sds_destroy(j_index);
return 0;
}

Expand Down

0 comments on commit 34e9f82

Please sign in to comment.