From f587cd3affb2796c01c072d05f7301ac5c91f30d Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Sun, 18 Jul 2021 21:52:49 +0900 Subject: [PATCH] out_es: estimate bulk size to less reallocation(#3775) Signed-off-by: Takahiro Yamashita --- plugins/out_es/es.c | 7 +++++-- plugins/out_es/es_bulk.c | 35 +++++++++++++++++++++++++---------- plugins/out_es/es_bulk.h | 5 +++-- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 6a9be715c92..5aca4dc5a67 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -252,6 +252,7 @@ static int elasticsearch_format(struct flb_config *config, int index_len = 0; size_t s = 0; size_t off = 0; + size_t off_prev = 0; char *p; char *es_index; char logstash_index[256]; @@ -300,7 +301,7 @@ static int elasticsearch_format(struct flb_config *config, } /* Create the bulk composer */ - bulk = es_bulk_create(); + bulk = es_bulk_create(bytes); if (!bulk) { return -1; } @@ -534,8 +535,10 @@ static int elasticsearch_format(struct flb_config *config, } ret = es_bulk_append(bulk, j_index, index_len, - out_buf, flb_sds_len(out_buf)); + out_buf, flb_sds_len(out_buf), + bytes, off_prev); flb_sds_destroy(out_buf); + off_prev = off; if (ret == -1) { /* We likely ran out of memory, abort here */ msgpack_unpacked_destroy(&result); diff --git a/plugins/out_es/es_bulk.c b/plugins/out_es/es_bulk.c index ff7e09d2abf..6d6f383c2bf 100644 --- a/plugins/out_es/es_bulk.c +++ b/plugins/out_es/es_bulk.c @@ -25,24 +25,27 @@ #include #include "es_bulk.h" -struct es_bulk *es_bulk_create() +struct es_bulk *es_bulk_create(size_t estimated_size) { struct es_bulk *b; + if (estimated_size < ES_BULK_CHUNK) { + estimated_size = ES_BULK_CHUNK; + } + b = flb_malloc(sizeof(struct es_bulk)); if (!b) { perror("calloc"); return NULL; } - - b->ptr = flb_malloc(ES_BULK_CHUNK); - if (!b->ptr) { + b->ptr = flb_malloc(estimated_size); + if (b->ptr == NULL) { perror("malloc"); flb_free(b); return NULL; } - b->size = ES_BULK_CHUNK; + b->size = estimated_size; b->len = 0; return b; @@ -57,10 +60,11 @@ void es_bulk_destroy(struct es_bulk *bulk) } int es_bulk_append(struct es_bulk *bulk, char *index, int i_len, - char *json, size_t j_len) + char *json, size_t j_len, + size_t whole_size, size_t converted_size) { int available; - int new_size; + int append_size; int required; char *ptr; @@ -68,14 +72,25 @@ int es_bulk_append(struct es_bulk *bulk, char *index, int i_len, available = (bulk->size - bulk->len); if (available < required) { - new_size = bulk->size + available + required + ES_BULK_CHUNK; - ptr = flb_realloc(bulk->ptr, new_size); + /* + * estimate a converted size of json + * calculate + * 1. rest of msgpack data size + * 2. ratio from bulk json size and processed msgpack size. + */ + append_size = (whole_size - converted_size) /* rest of size to convert */ + * (bulk->size / converted_size); /* = json size / msgpack size */ + if (append_size < ES_BULK_CHUNK) { + /* append at least ES_BULK_CHUNK size */ + append_size = ES_BULK_CHUNK; + } + ptr = flb_realloc(bulk->ptr, bulk->size + append_size); if (!ptr) { flb_errno(); return -1; } bulk->ptr = ptr; - bulk->size = new_size; + bulk->size += append_size; } memcpy(bulk->ptr + bulk->len, index, i_len); diff --git a/plugins/out_es/es_bulk.h b/plugins/out_es/es_bulk.h index 4a4f6f4968b..666bd28faff 100644 --- a/plugins/out_es/es_bulk.h +++ b/plugins/out_es/es_bulk.h @@ -36,9 +36,10 @@ struct es_bulk { uint32_t size; }; -struct es_bulk *es_bulk_create(); +struct es_bulk *es_bulk_create(size_t estimated_size); int es_bulk_append(struct es_bulk *bulk, char *index, int i_len, - char *json, size_t j_len); + char *json, size_t j_len, + size_t whole_size, size_t curr_size); void es_bulk_destroy(struct es_bulk *bulk); #endif