Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_es: estimate bulk size to less reallocation(#3775) #3788

Merged
merged 1 commit into from
Jul 19, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
@@ -252,6 +252,7 @@ static int elasticsearch_format(struct flb_config *config,
int index_len = 0;
size_t s = 0;
size_t off = 0;
size_t off_prev = 0;
char *p;
char *es_index;
char logstash_index[256];
@@ -300,7 +301,7 @@ static int elasticsearch_format(struct flb_config *config,
}

/* Create the bulk composer */
bulk = es_bulk_create();
bulk = es_bulk_create(bytes);
if (!bulk) {
return -1;
}
@@ -534,8 +535,10 @@ static int elasticsearch_format(struct flb_config *config,
}

ret = es_bulk_append(bulk, j_index, index_len,
out_buf, flb_sds_len(out_buf));
out_buf, flb_sds_len(out_buf),
bytes, off_prev);
flb_sds_destroy(out_buf);
off_prev = off;
if (ret == -1) {
/* We likely ran out of memory, abort here */
msgpack_unpacked_destroy(&result);
35 changes: 25 additions & 10 deletions plugins/out_es/es_bulk.c
Original file line number Diff line number Diff line change
@@ -25,24 +25,27 @@
#include <fluent-bit.h>
#include "es_bulk.h"

struct es_bulk *es_bulk_create()
struct es_bulk *es_bulk_create(size_t estimated_size)
{
struct es_bulk *b;

if (estimated_size < ES_BULK_CHUNK) {
estimated_size = ES_BULK_CHUNK;
}

b = flb_malloc(sizeof(struct es_bulk));
if (!b) {
perror("calloc");
return NULL;
}

b->ptr = flb_malloc(ES_BULK_CHUNK);
if (!b->ptr) {
b->ptr = flb_malloc(estimated_size);
if (b->ptr == NULL) {
perror("malloc");
flb_free(b);
return NULL;
}

b->size = ES_BULK_CHUNK;
b->size = estimated_size;
b->len = 0;

return b;
@@ -57,25 +60,37 @@ void es_bulk_destroy(struct es_bulk *bulk)
}

int es_bulk_append(struct es_bulk *bulk, char *index, int i_len,
char *json, size_t j_len)
char *json, size_t j_len,
size_t whole_size, size_t converted_size)
{
int available;
int new_size;
int append_size;
int required;
char *ptr;

required = j_len + ES_BULK_HEADER + 1;
available = (bulk->size - bulk->len);

if (available < required) {
new_size = bulk->size + available + required + ES_BULK_CHUNK;
ptr = flb_realloc(bulk->ptr, new_size);
/*
* estimate a converted size of json
* calculate
* 1. rest of msgpack data size
* 2. ratio from bulk json size and processed msgpack size.
*/
append_size = (whole_size - converted_size) /* rest of size to convert */
* (bulk->size / converted_size); /* = json size / msgpack size */
if (append_size < ES_BULK_CHUNK) {
/* append at least ES_BULK_CHUNK size */
append_size = ES_BULK_CHUNK;
}
ptr = flb_realloc(bulk->ptr, bulk->size + append_size);
if (!ptr) {
flb_errno();
return -1;
}
bulk->ptr = ptr;
bulk->size = new_size;
bulk->size += append_size;
}

memcpy(bulk->ptr + bulk->len, index, i_len);
5 changes: 3 additions & 2 deletions plugins/out_es/es_bulk.h
Original file line number Diff line number Diff line change
@@ -36,9 +36,10 @@ struct es_bulk {
uint32_t size;
};

struct es_bulk *es_bulk_create();
struct es_bulk *es_bulk_create(size_t estimated_size);
int es_bulk_append(struct es_bulk *bulk, char *index, int i_len,
char *json, size_t j_len);
char *json, size_t j_len,
size_t whole_size, size_t curr_size);
void es_bulk_destroy(struct es_bulk *bulk);

#endif