Skip to content

Commit

Permalink
out_es: estimate bulk size to less reallocation(#3775)
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Jul 19, 2021
1 parent f72a273 commit f587cd3
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
7 changes: 5 additions & 2 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ static int elasticsearch_format(struct flb_config *config,
int index_len = 0;
size_t s = 0;
size_t off = 0;
size_t off_prev = 0;
char *p;
char *es_index;
char logstash_index[256];
Expand Down Expand Up @@ -300,7 +301,7 @@ static int elasticsearch_format(struct flb_config *config,
}

/* Create the bulk composer */
bulk = es_bulk_create();
bulk = es_bulk_create(bytes);
if (!bulk) {
return -1;
}
Expand Down Expand Up @@ -534,8 +535,10 @@ static int elasticsearch_format(struct flb_config *config,
}

ret = es_bulk_append(bulk, j_index, index_len,
out_buf, flb_sds_len(out_buf));
out_buf, flb_sds_len(out_buf),
bytes, off_prev);
flb_sds_destroy(out_buf);
off_prev = off;
if (ret == -1) {
/* We likely ran out of memory, abort here */
msgpack_unpacked_destroy(&result);
Expand Down
35 changes: 25 additions & 10 deletions plugins/out_es/es_bulk.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,27 @@
#include <fluent-bit.h>
#include "es_bulk.h"

struct es_bulk *es_bulk_create()
struct es_bulk *es_bulk_create(size_t estimated_size)
{
struct es_bulk *b;

if (estimated_size < ES_BULK_CHUNK) {
estimated_size = ES_BULK_CHUNK;
}

b = flb_malloc(sizeof(struct es_bulk));
if (!b) {
perror("calloc");
return NULL;
}

b->ptr = flb_malloc(ES_BULK_CHUNK);
if (!b->ptr) {
b->ptr = flb_malloc(estimated_size);
if (b->ptr == NULL) {
perror("malloc");
flb_free(b);
return NULL;
}

b->size = ES_BULK_CHUNK;
b->size = estimated_size;
b->len = 0;

return b;
Expand All @@ -57,25 +60,37 @@ void es_bulk_destroy(struct es_bulk *bulk)
}

int es_bulk_append(struct es_bulk *bulk, char *index, int i_len,
char *json, size_t j_len)
char *json, size_t j_len,
size_t whole_size, size_t converted_size)
{
int available;
int new_size;
int append_size;
int required;
char *ptr;

required = j_len + ES_BULK_HEADER + 1;
available = (bulk->size - bulk->len);

if (available < required) {
new_size = bulk->size + available + required + ES_BULK_CHUNK;
ptr = flb_realloc(bulk->ptr, new_size);
/*
* estimate a converted size of json
* calculate
* 1. rest of msgpack data size
* 2. ratio from bulk json size and processed msgpack size.
*/
append_size = (whole_size - converted_size) /* rest of size to convert */
* (bulk->size / converted_size); /* = json size / msgpack size */
if (append_size < ES_BULK_CHUNK) {
/* append at least ES_BULK_CHUNK size */
append_size = ES_BULK_CHUNK;
}
ptr = flb_realloc(bulk->ptr, bulk->size + append_size);
if (!ptr) {
flb_errno();
return -1;
}
bulk->ptr = ptr;
bulk->size = new_size;
bulk->size += append_size;
}

memcpy(bulk->ptr + bulk->len, index, i_len);
Expand Down
5 changes: 3 additions & 2 deletions plugins/out_es/es_bulk.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ struct es_bulk {
uint32_t size;
};

struct es_bulk *es_bulk_create();
struct es_bulk *es_bulk_create(size_t estimated_size);
int es_bulk_append(struct es_bulk *bulk, char *index, int i_len,
char *json, size_t j_len);
char *json, size_t j_len,
size_t whole_size, size_t curr_size);
void es_bulk_destroy(struct es_bulk *bulk);

#endif

0 comments on commit f587cd3

Please sign in to comment.