From 34e9f8294d20fe2f3d402cb42ea9a2216a2b7e4c Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Tue, 23 Nov 2021 21:11:37 +0900 Subject: [PATCH] out_es: use snprintf wrapper function for bulk header. We used fixed char array to construct bulk header. If long index comes, it needs larger buffer. out_es uses snprintf wrapper function. If the buffer is small, the api increases the buffer. Signed-off-by: Takahiro Yamashita --- plugins/out_es/es.c | 83 ++++++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 34 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 26606b94e37..5019086dbac 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -265,7 +265,7 @@ static int elasticsearch_format(struct flb_config *config, msgpack_object root; msgpack_object map; msgpack_object *obj; - char j_index[ES_BULK_HEADER]; + flb_sds_t j_index; struct es_bulk *bulk; struct tm tm; struct flb_time tms; @@ -275,6 +275,12 @@ static int elasticsearch_format(struct flb_config *config, int es_index_custom_len; struct flb_elasticsearch *ctx = plugin_context; + j_index = flb_sds_create_size(ES_BULK_HEADER); + if (j_index == NULL) { + flb_errno(); + return -1; + } + /* Iterate the original buffer and perform adjustments */ msgpack_unpacked_init(&result); @@ -282,6 +288,7 @@ static int elasticsearch_format(struct flb_config *config, ret = msgpack_unpack_next(&result, data, bytes, &off); if (ret != MSGPACK_UNPACK_SUCCESS) { msgpack_unpacked_destroy(&result); + flb_sds_destroy(j_index); return -1; } @@ -292,17 +299,22 @@ static int elasticsearch_format(struct flb_config *config, * doing, we just duplicate the content in a new buffer and cleanup. */ msgpack_unpacked_destroy(&result); + flb_sds_destroy(j_index); return -1; } root = result.data; if (root.via.array.size == 0) { + msgpack_unpacked_destroy(&result); + flb_sds_destroy(j_index); return -1; } /* Create the bulk composer */ bulk = es_bulk_create(bytes); if (!bulk) { + msgpack_unpacked_destroy(&result); + flb_sds_destroy(j_index); return -1; } @@ -331,16 +343,16 @@ static int elasticsearch_format(struct flb_config *config, ctx->index, &tm); es_index = index_formatted; if (ctx->suppress_type_name) { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_WITHOUT_TYPE, - es_index); + index_len = flb_sds_snprintf_realloc(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_WITHOUT_TYPE, + es_index); } else { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT, - es_index, ctx->type); + index_len = flb_sds_snprintf_realloc(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT, + es_index, ctx->type); } } @@ -443,16 +455,16 @@ static int elasticsearch_format(struct flb_config *config, es_index = logstash_index; if (ctx->generate_id == FLB_FALSE) { if (ctx->suppress_type_name) { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_WITHOUT_TYPE, - es_index); + index_len = flb_sds_snprintf_realloc(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_WITHOUT_TYPE, + es_index); } else { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT, - es_index, ctx->type); + index_len = flb_sds_snprintf_realloc(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT, + es_index, ctx->type); } } } @@ -483,6 +495,7 @@ static int elasticsearch_format(struct flb_config *config, msgpack_unpacked_destroy(&result); msgpack_sbuffer_destroy(&tmp_sbuf); es_bulk_destroy(bulk); + flb_sds_destroy(j_index); return -1; } @@ -493,32 +506,32 @@ static int elasticsearch_format(struct flb_config *config, hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7]); if (ctx->suppress_type_name) { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, - es_index, es_uuid); + index_len = flb_sds_snprintf_realloc(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, + es_index, es_uuid); } else { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_ID, - es_index, ctx->type, es_uuid); + index_len = flb_sds_snprintf_realloc(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_ID, + es_index, ctx->type, es_uuid); } } if (ctx->ra_id_key) { id_key_str = es_get_id_value(ctx ,&map); if (id_key_str) { if (ctx->suppress_type_name) { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, - es_index, id_key_str); + index_len = flb_sds_snprintf_realloc(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, + es_index, id_key_str); } else { - index_len = snprintf(j_index, - ES_BULK_HEADER, - ES_BULK_INDEX_FMT_ID, - es_index, ctx->type, id_key_str); + index_len = flb_sds_snprintf_realloc(&j_index, + flb_sds_alloc(j_index), + ES_BULK_INDEX_FMT_ID, + es_index, ctx->type, id_key_str); } flb_sds_destroy(id_key_str); id_key_str = NULL; @@ -531,6 +544,7 @@ static int elasticsearch_format(struct flb_config *config, if (!out_buf) { msgpack_unpacked_destroy(&result); es_bulk_destroy(bulk); + flb_sds_destroy(j_index); return -1; } @@ -544,6 +558,7 @@ static int elasticsearch_format(struct flb_config *config, msgpack_unpacked_destroy(&result); *out_size = 0; es_bulk_destroy(bulk); + flb_sds_destroy(j_index); return -1; } } @@ -563,7 +578,7 @@ static int elasticsearch_format(struct flb_config *config, fwrite(*out_data, 1, *out_size, stdout); fflush(stdout); } - + flb_sds_destroy(j_index); return 0; }