Skip to content

Commit

Permalink
out_es: support write_operation option (fluent#4079)
Browse files Browse the repository at this point in the history
* out_es: support write_operation option

Signed-off-by: Zhanibek Adilbekov <[email protected]>
  • Loading branch information
Zhanibek Adilbekov authored and 0Delta committed Jan 20, 2022
1 parent 00a7edd commit ef47a5f
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 17 deletions.
32 changes: 31 additions & 1 deletion plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ static int elasticsearch_format(struct flb_config *config,
char index_formatted[256];
char es_uuid[37];
flb_sds_t out_buf;
size_t out_buf_len = 0;
flb_sds_t tmp_buf;
flb_sds_t id_key_str = NULL;
msgpack_unpacked result;
msgpack_object root;
Expand Down Expand Up @@ -346,12 +348,14 @@ static int elasticsearch_format(struct flb_config *config,
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
ctx->es_action,
es_index);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
ctx->es_action,
es_index, ctx->type);
}
}
Expand Down Expand Up @@ -458,12 +462,14 @@ static int elasticsearch_format(struct flb_config *config,
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
ctx->es_action,
es_index);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
ctx->es_action,
es_index, ctx->type);
}
}
Expand Down Expand Up @@ -509,12 +515,14 @@ static int elasticsearch_format(struct flb_config *config,
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
ctx->es_action,
es_index, es_uuid);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID,
ctx->es_action,
es_index, ctx->type, es_uuid);
}
}
Expand All @@ -525,12 +533,14 @@ static int elasticsearch_format(struct flb_config *config,
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
ctx->es_action,
es_index, id_key_str);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_ID,
ctx->es_action,
es_index, ctx->type, id_key_str);
}
flb_sds_destroy(id_key_str);
Expand All @@ -548,10 +558,25 @@ static int elasticsearch_format(struct flb_config *config,
return -1;
}

out_buf_len = flb_sds_len(out_buf);
if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
tmp_buf = out_buf;
out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPDATE_OP_BODY) - 2);
out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPDATE_OP_BODY, tmp_buf);
flb_sds_destroy(tmp_buf);
}
else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
tmp_buf = out_buf;
out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPSERT_OP_BODY) - 2);
out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPSERT_OP_BODY, tmp_buf);
flb_sds_destroy(tmp_buf);
}

ret = es_bulk_append(bulk, j_index, index_len,
out_buf, flb_sds_len(out_buf),
out_buf, out_buf_len,
bytes, off_prev);
flb_sds_destroy(out_buf);

off_prev = off;
if (ret == -1) {
/* We likely ran out of memory, abort here */
Expand Down Expand Up @@ -1067,6 +1092,11 @@ static struct flb_config_map config_map[] = {
"When enabled, generate _id for outgoing records. This prevents duplicate "
"records when retrying ES"
},
{
FLB_CONFIG_MAP_STR, "write_operation", "create",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, write_operation),
"Operation to use to write in bulk requests"
},
{
FLB_CONFIG_MAP_STR, "id_key", NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
Expand Down
11 changes: 10 additions & 1 deletion plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
#define FLB_ES_DEFAULT_TIME_KEYF "%Y-%m-%dT%H:%M:%S"
#define FLB_ES_DEFAULT_TAG_KEY "flb-key"
#define FLB_ES_DEFAULT_HTTP_MAX "512k"
#define FLB_ES_WRITE_OP_INDEX "index"
#define FLB_ES_WRITE_OP_CREATE "create"
#define FLB_ES_WRITE_OP_UPDATE "update"
#define FLB_ES_WRITE_OP_UPSERT "upsert"

struct flb_elasticsearch {
/* Elasticsearch index (database) and type (table) */
Expand Down Expand Up @@ -101,10 +105,15 @@ struct flb_elasticsearch {
int time_key_nanos;


/* write operation */
flb_sds_t write_operation;
/* write operation elasticsearch operation */
flb_sds_t es_action;

/* id_key */
flb_sds_t id_key;
struct flb_record_accessor *ra_id_key;

/* include_tag_key */
int include_tag_key;
flb_sds_t tag_key;
Expand Down
10 changes: 6 additions & 4 deletions plugins/out_es/es_bulk.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@

#define ES_BULK_CHUNK 4096 /* Size of buffer chunks */
#define ES_BULK_HEADER 165 /* ES Bulk API prefix line */
#define ES_BULK_INDEX_FMT "{\"create\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID "{\"create\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_WITHOUT_TYPE "{\"create\":{\"_index\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE "{\"create\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT "{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID "{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_WITHOUT_TYPE "{\"%s\":{\"_index\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE "{\"%s\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n"
#define ES_BULK_UPDATE_OP_BODY "{\"doc\":%s}"
#define ES_BULK_UPSERT_OP_BODY "{\"doc_as_upsert\":true,\"doc\":%s}"

struct es_bulk {
char *ptr;
Expand Down
28 changes: 27 additions & 1 deletion plugins/out_es/es_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
}


if (ctx->id_key) {
ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
if (ctx->ra_id_key == NULL) {
Expand All @@ -244,6 +243,30 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
}
}

if (ctx->write_operation) {
if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_INDEX) == 0) {
ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_INDEX);
}
else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_CREATE) == 0) {
ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_CREATE);
}
else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0
|| strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_UPDATE);
}
else {
flb_plg_error(ins, "wrong Write_Operation (should be one of index, create, update, upsert)");
flb_es_conf_destroy(ctx);
return NULL;
}
if (strcasecmp(ctx->es_action, FLB_ES_WRITE_OP_UPDATE) == 0
&& !ctx->ra_id_key && ctx->generate_id == FLB_FALSE) {
flb_plg_error(ins, "Id_Key or Generate_Id must be set when Write_Operation update or upsert");
flb_es_conf_destroy(ctx);
return NULL;
}
}

if (ctx->logstash_prefix_key) {
if (ctx->logstash_prefix_key[0] != '$') {
len = flb_sds_len(ctx->logstash_prefix_key);
Expand Down Expand Up @@ -402,6 +425,9 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
flb_ra_destroy(ctx->ra_id_key);
ctx->ra_id_key = NULL;
}
if (ctx->es_action) {
flb_free(ctx->es_action);
}

#ifdef FLB_HAVE_AWS
if (ctx->base_aws_provider) {
Expand Down
Loading

0 comments on commit ef47a5f

Please sign in to comment.