out_es: add the Write_Operation option (index, create, update, upsert)

What this change set does:
  * es.c / es_bulk.h: the bulk action name in the metadata line is no longer
    hard-coded to "create"; it is taken from ctx->es_action. For "update" the
    record JSON is wrapped as {"doc":<record>}, for "upsert" as
    {"doc_as_upsert":true,"doc":<record>}.
  * es.h / es_conf.c: new `write_operation` property (default "create"). It is
    validated case-insensitively and mapped to the bulk action ("upsert" is
    sent as an "update" action). update/upsert are rejected unless Id_Key or
    Generate_Id is configured.
  * tests: one formatter test per write operation; the pre-existing
    expectations now look for the "create" action.

Review revisions folded into this copy of the patch:
  * es.c: a failed allocation of the update/upsert wrapper buffer is routed to
    the existing out-of-memory abort path instead of being handed to snprintf().
  * es_conf.c: a failed flb_strdup() of the action name is reported instead of
    being passed to strcasecmp().
  * es.h: es_action is declared as a plain `char *`, matching how it is
    allocated (flb_strdup) and released (flb_free).
  * tests: the return value of flb_output_set_test() is checked.

Hunk line counts and offsets were recomputed for these revisions; the blob ids
on the "index" lines were not regenerated.

diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c
index 50af298e5f8..8e58350d9a6 100644
--- a/plugins/out_es/es.c
+++ b/plugins/out_es/es.c
@@ -260,6 +260,8 @@ static int elasticsearch_format(struct flb_config *config,
     char index_formatted[256];
     char es_uuid[37];
     flb_sds_t out_buf;
+    size_t out_buf_len = 0;
+    flb_sds_t tmp_buf;
     flb_sds_t id_key_str = NULL;
     msgpack_unpacked result;
     msgpack_object root;
@@ -346,12 +348,14 @@ static int elasticsearch_format(struct flb_config *config,
             index_len = flb_sds_snprintf(&j_index,
                                          flb_sds_alloc(j_index),
                                          ES_BULK_INDEX_FMT_WITHOUT_TYPE,
+                                         ctx->es_action,
                                          es_index);
         }
         else {
             index_len = flb_sds_snprintf(&j_index,
                                          flb_sds_alloc(j_index),
                                          ES_BULK_INDEX_FMT,
+                                         ctx->es_action,
                                          es_index, ctx->type);
         }
     }
@@ -458,12 +462,14 @@ static int elasticsearch_format(struct flb_config *config,
                     index_len = flb_sds_snprintf(&j_index,
                                                  flb_sds_alloc(j_index),
                                                  ES_BULK_INDEX_FMT_WITHOUT_TYPE,
+                                                 ctx->es_action,
                                                  es_index);
                 }
                 else {
                     index_len = flb_sds_snprintf(&j_index,
                                                  flb_sds_alloc(j_index),
                                                  ES_BULK_INDEX_FMT,
+                                                 ctx->es_action,
                                                  es_index, ctx->type);
                 }
             }
@@ -509,12 +515,14 @@ static int elasticsearch_format(struct flb_config *config,
                 index_len = flb_sds_snprintf(&j_index,
                                              flb_sds_alloc(j_index),
                                              ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
+                                             ctx->es_action,
                                              es_index, es_uuid);
             }
             else {
                 index_len = flb_sds_snprintf(&j_index,
                                              flb_sds_alloc(j_index),
                                              ES_BULK_INDEX_FMT_ID,
+                                             ctx->es_action,
                                              es_index, ctx->type, es_uuid);
             }
         }
@@ -525,12 +533,14 @@ static int elasticsearch_format(struct flb_config *config,
                     index_len = flb_sds_snprintf(&j_index,
                                                  flb_sds_alloc(j_index),
                                                  ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
+                                                 ctx->es_action,
                                                  es_index, id_key_str);
                 }
                 else {
                     index_len = flb_sds_snprintf(&j_index,
                                                  flb_sds_alloc(j_index),
                                                  ES_BULK_INDEX_FMT_ID,
+                                                 ctx->es_action,
                                                  es_index, ctx->type, id_key_str);
                 }
                 flb_sds_destroy(id_key_str);
@@ -548,10 +558,44 @@ static int elasticsearch_format(struct flb_config *config,
             return -1;
         }
 
+        /*
+         * update/upsert send the record nested under "doc" (see the
+         * ES_BULK_*_OP_BODY formats), so the JSON is re-wrapped in a new buffer.
+         */
+        out_buf_len = flb_sds_len(out_buf);
+        if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
+            tmp_buf = out_buf;
+            /* sizeof() counts the NUL terminator, "- 2" drops the "%s" */
+            out_buf_len += sizeof(ES_BULK_UPDATE_OP_BODY) - 2;
+            out_buf = flb_sds_create_len(NULL, out_buf_len);
+            if (out_buf != NULL) {
+                out_buf_len = snprintf(out_buf, out_buf_len,
+                                       ES_BULK_UPDATE_OP_BODY, tmp_buf);
+            }
+            flb_sds_destroy(tmp_buf);
+        }
+        else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
+            tmp_buf = out_buf;
+            out_buf_len += sizeof(ES_BULK_UPSERT_OP_BODY) - 2;
+            out_buf = flb_sds_create_len(NULL, out_buf_len);
+            if (out_buf != NULL) {
+                out_buf_len = snprintf(out_buf, out_buf_len,
+                                       ES_BULK_UPSERT_OP_BODY, tmp_buf);
+            }
+            flb_sds_destroy(tmp_buf);
+        }
+
-        ret = es_bulk_append(bulk, j_index, index_len,
-                             out_buf, flb_sds_len(out_buf),
-                             bytes, off_prev);
-        flb_sds_destroy(out_buf);
+        if (out_buf == NULL) {
+            /* Wrapper allocation failed: take the abort path below */
+            ret = -1;
+        }
+        else {
+            ret = es_bulk_append(bulk, j_index, index_len,
+                                 out_buf, out_buf_len,
+                                 bytes, off_prev);
+            flb_sds_destroy(out_buf);
+        }
+
         off_prev = off;
         if (ret == -1) {
             /* We likely ran out of memory, abort here */
@@ -1067,6 +1111,11 @@ static struct flb_config_map config_map[] = {
      "When enabled, generate _id for outgoing records. This prevents duplicate "
      "records when retrying ES"
     },
+    {
+     FLB_CONFIG_MAP_STR, "write_operation", "create",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, write_operation),
+     "Operation to use to write in bulk requests"
+    },
     {
      FLB_CONFIG_MAP_STR, "id_key", NULL,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h
index 0c4df1b8b67..b57a169ebf1 100644
--- a/plugins/out_es/es.h
+++ b/plugins/out_es/es.h
@@ -31,6 +31,10 @@
 #define FLB_ES_DEFAULT_TIME_KEYF  "%Y-%m-%dT%H:%M:%S"
 #define FLB_ES_DEFAULT_TAG_KEY    "flb-key"
 #define FLB_ES_DEFAULT_HTTP_MAX   "512k"
+#define FLB_ES_WRITE_OP_INDEX     "index"
+#define FLB_ES_WRITE_OP_CREATE    "create"
+#define FLB_ES_WRITE_OP_UPDATE    "update"
+#define FLB_ES_WRITE_OP_UPSERT    "upsert"
 
 struct flb_elasticsearch {
     /* Elasticsearch index (database) and type (table) */
@@ -101,10 +105,15 @@ struct flb_elasticsearch {
 
     int time_key_nanos;
 
+    /* write operation: index, create, update or upsert */
+    flb_sds_t write_operation;
+    /* bulk action name (plain C string: flb_strdup() / flb_free()) */
+    char *es_action;
+
     /* id_key */
     flb_sds_t id_key;
     struct flb_record_accessor *ra_id_key;
-    
+
     /* include_tag_key */
     int include_tag_key;
     flb_sds_t tag_key;
diff --git a/plugins/out_es/es_bulk.h b/plugins/out_es/es_bulk.h
index 666bd28faff..c5db4c6d9e2 100644
--- a/plugins/out_es/es_bulk.h
+++ b/plugins/out_es/es_bulk.h
@@ -25,10 +25,12 @@
 
 #define ES_BULK_CHUNK      4096  /* Size of buffer chunks    */
 #define ES_BULK_HEADER      165  /* ES Bulk API prefix line  */
-#define ES_BULK_INDEX_FMT      "{\"create\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n"
-#define ES_BULK_INDEX_FMT_ID   "{\"create\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n"
-#define ES_BULK_INDEX_FMT_WITHOUT_TYPE       "{\"create\":{\"_index\":\"%s\"}}\n"
-#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE    "{\"create\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n"
+#define ES_BULK_INDEX_FMT      "{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n"
+#define ES_BULK_INDEX_FMT_ID   "{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n"
+#define ES_BULK_INDEX_FMT_WITHOUT_TYPE       "{\"%s\":{\"_index\":\"%s\"}}\n"
+#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE    "{\"%s\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n"
+#define ES_BULK_UPDATE_OP_BODY "{\"doc\":%s}"
+#define ES_BULK_UPSERT_OP_BODY "{\"doc_as_upsert\":true,\"doc\":%s}"
 
 struct es_bulk {
     char *ptr;
diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c
index eca052f0af9..34e749fd9a0 100644
--- a/plugins/out_es/es_conf.c
+++ b/plugins/out_es/es_conf.c
@@ -232,7 +232,6 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
         snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
     }
 
-
     if (ctx->id_key) {
         ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
         if (ctx->ra_id_key == NULL) {
@@ -244,6 +243,40 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
         }
     }
 
+    /*
+     * Map the user-facing Write_Operation onto the bulk action name:
+     * "upsert" is sent as an "update" action, only the request body differs.
+     */
+    if (ctx->write_operation) {
+        if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_INDEX) == 0) {
+            ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_INDEX);
+        }
+        else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_CREATE) == 0) {
+            ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_CREATE);
+        }
+        else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0
+            || strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
+            ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_UPDATE);
+        }
+        else {
+            flb_plg_error(ins, "wrong Write_Operation (should be one of index, create, update, upsert)");
+            flb_es_conf_destroy(ctx);
+            return NULL;
+        }
+        if (ctx->es_action == NULL) {
+            /* flb_strdup() failed: the comparison below must not see NULL */
+            flb_plg_error(ins, "could not allocate memory for Write_Operation");
+            flb_es_conf_destroy(ctx);
+            return NULL;
+        }
+        if (strcasecmp(ctx->es_action, FLB_ES_WRITE_OP_UPDATE) == 0
+            && !ctx->ra_id_key && ctx->generate_id == FLB_FALSE) {
+            flb_plg_error(ins, "Id_Key or Generate_Id must be set when Write_Operation update or upsert");
+            flb_es_conf_destroy(ctx);
+            return NULL;
+        }
+    }
+
     if (ctx->logstash_prefix_key) {
         if (ctx->logstash_prefix_key[0] != '$') {
             len = flb_sds_len(ctx->logstash_prefix_key);
@@ -402,6 +435,9 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
         flb_ra_destroy(ctx->ra_id_key);
         ctx->ra_id_key = NULL;
     }
+    if (ctx->es_action) {
+        flb_free(ctx->es_action);
+    }
 
 #ifdef FLB_HAVE_AWS
     if (ctx->base_aws_provider) {
diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c
index 9ba0c15bcd6..69abbda5e32 100644
--- a/tests/runtime/out_elasticsearch.c
+++ b/tests/runtime/out_elasticsearch.c
@@ -7,13 +7,77 @@
 #include "data/es/json_es.h" /* JSON_ES */
 
 
+static void cb_check_write_op_index(void *ctx, int ffd,
+                                    int res_ret, void *res_data,
+                                    size_t res_size, void *data)
+{
+    char *p;
+    char *out_js = res_data;
+    char *index_line = "{\"index\":{";
+
+    p = strstr(out_js, index_line);
+    TEST_CHECK(p == out_js);
+
+    flb_free(res_data);
+}
+
+static void cb_check_write_op_create(void *ctx, int ffd,
+                                     int res_ret, void *res_data,
+                                     size_t res_size, void *data)
+{
+    char *p;
+    char *out_js = res_data;
+    char *index_line = "{\"create\":{";
+
+    p = strstr(out_js, index_line);
+    TEST_CHECK(p == out_js);
+
+    flb_free(res_data);
+}
+
+static void cb_check_write_op_update(void *ctx, int ffd,
+                                     int res_ret, void *res_data,
+                                     size_t res_size, void *data)
+{
+    char *p;
+    char *b;
+    char *out_js = res_data;
+    char *index_line = "{\"update\":{";
+    char *body = "{\"doc\":";
+
+    p = strstr(out_js, index_line);
+    TEST_CHECK(p == out_js);
+    b = strstr(out_js, body);
+    TEST_CHECK(b != NULL);
+
+    flb_free(res_data);
+}
+
+static void cb_check_write_op_upsert(void *ctx, int ffd,
+                                     int res_ret, void *res_data,
+                                     size_t res_size, void *data)
+{
+    char *p;
+    char *b;
+    char *out_js = res_data;
+    char *index_line = "{\"update\":{";
+    char *body = "{\"doc_as_upsert\":true,\"doc\":";
+
+    p = strstr(out_js, index_line);
+    TEST_CHECK(p == out_js);
+    b = strstr(out_js, body);
+    TEST_CHECK(b != NULL);
+
+    flb_free(res_data);
+}
+
 static void cb_check_index_type(void *ctx, int ffd,
                                 int res_ret, void *res_data,
                                 size_t res_size, void *data)
 {
     char *p;
     char *out_js = res_data;
-    char *index_line = "{\"index\":{\"_index\":\"index_test\",\"_type\":\"type_test\"}";
+    char *index_line = "{\"create\":{\"_index\":\"index_test\",\"_type\":\"type_test\"}";
 
     p = strstr(out_js, index_line);
     TEST_CHECK(p != NULL);
@@ -27,7 +91,7 @@ static void cb_check_logstash_format(void *ctx, int ffd,
 {
     char *p;
     char *out_js = res_data;
-    char *index_line = "{\"index\":{\"_index\":\"prefix-2015-11-24\",\"_type\":\"_doc\"}";
+    char *index_line = "{\"create\":{\"_index\":\"prefix-2015-11-24\",\"_type\":\"_doc\"}";
 
     p = strstr(out_js, index_line);
     TEST_CHECK(p != NULL);
@@ -86,6 +150,190 @@ static void cb_check_id_key(void *ctx, int ffd,
     flb_free(res_data);
 }
 
+void flb_test_write_operation_index()
+{
+    int ret;
+    int size = sizeof(JSON_ES) - 1;
+    flb_ctx_t *ctx;
+    int in_ffd;
+    int out_ffd;
+
+    /* Create context, flush every second (some checks omitted here) */
+    ctx = flb_create();
+    flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
+
+    /* Lib input mode */
+    in_ffd = flb_input(ctx, (char *) "lib", NULL);
+    flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+
+    /* Elasticsearch output */
+    out_ffd = flb_output(ctx, (char *) "es", NULL);
+    flb_output_set(ctx, out_ffd,
+                   "match", "test",
+                   NULL);
+
+    /* Select the write operation under test */
+    flb_output_set(ctx, out_ffd,
+                   "write_operation", "index",
+                   NULL);
+
+    /* Enable test mode */
+    ret = flb_output_set_test(ctx, out_ffd, "formatter",
+                              cb_check_write_op_index,
+                              NULL, NULL);
+    TEST_CHECK(ret == 0);
+
+    /* Start */
+    ret = flb_start(ctx);
+    TEST_CHECK(ret == 0);
+
+    /* Ingest data sample */
+    flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);
+
+    sleep(2);
+    flb_stop(ctx);
+    flb_destroy(ctx);
+}
+
+void flb_test_write_operation_create()
+{
+    int ret;
+    int size = sizeof(JSON_ES) - 1;
+    flb_ctx_t *ctx;
+    int in_ffd;
+    int out_ffd;
+
+    /* Create context, flush every second (some checks omitted here) */
+    ctx = flb_create();
+    flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
+
+    /* Lib input mode */
+    in_ffd = flb_input(ctx, (char *) "lib", NULL);
+    flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+
+    /* Elasticsearch output */
+    out_ffd = flb_output(ctx, (char *) "es", NULL);
+    flb_output_set(ctx, out_ffd,
+                   "match", "test",
+                   NULL);
+
+    /* Select the write operation under test */
+    flb_output_set(ctx, out_ffd,
+                   "write_operation", "create",
+                   NULL);
+
+    /* Enable test mode */
+    ret = flb_output_set_test(ctx, out_ffd, "formatter",
+                              cb_check_write_op_create,
+                              NULL, NULL);
+    TEST_CHECK(ret == 0);
+
+    /* Start */
+    ret = flb_start(ctx);
+    TEST_CHECK(ret == 0);
+
+    /* Ingest data sample */
+    flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);
+
+    sleep(2);
+    flb_stop(ctx);
+    flb_destroy(ctx);
+}
+
+
+void flb_test_write_operation_update()
+{
+    int ret;
+    int size = sizeof(JSON_ES) - 1;
+    flb_ctx_t *ctx;
+    int in_ffd;
+    int out_ffd;
+
+    /* Create context, flush every second (some checks omitted here) */
+    ctx = flb_create();
+    flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
+
+    /* Lib input mode */
+    in_ffd = flb_input(ctx, (char *) "lib", NULL);
+    flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+
+    /* Elasticsearch output */
+    out_ffd = flb_output(ctx, (char *) "es", NULL);
+    flb_output_set(ctx, out_ffd,
+                   "match", "test",
+                   NULL);
+
+    /* Select the write operation; update requires Id_Key or Generate_Id */
+    flb_output_set(ctx, out_ffd,
+                   "Write_Operation", "Update",
+                   "Generate_Id", "True",
+                   NULL);
+
+    /* Enable test mode */
+    ret = flb_output_set_test(ctx, out_ffd, "formatter",
+                              cb_check_write_op_update,
+                              NULL, NULL);
+    TEST_CHECK(ret == 0);
+
+    /* Start */
+    ret = flb_start(ctx);
+    TEST_CHECK(ret == 0);
+
+    /* Ingest data sample */
+    flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);
+
+    sleep(2);
+    flb_stop(ctx);
+    flb_destroy(ctx);
+}
+
+
+void flb_test_write_operation_upsert()
+{
+    int ret;
+    int size = sizeof(JSON_ES) - 1;
+    flb_ctx_t *ctx;
+    int in_ffd;
+    int out_ffd;
+
+    /* Create context, flush every second (some checks omitted here) */
+    ctx = flb_create();
+    flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
+
+    /* Lib input mode */
+    in_ffd = flb_input(ctx, (char *) "lib", NULL);
+    flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+
+    /* Elasticsearch output */
+    out_ffd = flb_output(ctx, (char *) "es", NULL);
+    flb_output_set(ctx, out_ffd,
+                   "match", "test",
+                   NULL);
+
+    /* Select the write operation; upsert requires Id_Key or Generate_Id */
+    flb_output_set(ctx, out_ffd,
+                   "Write_Operation", "Upsert",
+                   "Generate_Id", "True",
+                   NULL);
+
+    /* Enable test mode */
+    ret = flb_output_set_test(ctx, out_ffd, "formatter",
+                              cb_check_write_op_upsert,
+                              NULL, NULL);
+    TEST_CHECK(ret == 0);
+
+    /* Start */
+    ret = flb_start(ctx);
+    TEST_CHECK(ret == 0);
+
+    /* Ingest data sample */
+    flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);
+
+    sleep(2);
+    flb_stop(ctx);
+    flb_destroy(ctx);
+}
+
 void flb_test_index_type()
 {
     int ret;
@@ -493,13 +741,17 @@ void flb_test_long_index()
 
 /* Test list */
 TEST_LIST = {
-    {"long_index"           , flb_test_long_index },
-    {"div0_error"           , flb_test_div0 },
-    {"index_type"           , flb_test_index_type },
-    {"logstash_format"      , flb_test_logstash_format },
-    {"logstash_format_nanos", flb_test_logstash_format_nanos },
-    {"tag_key"              , flb_test_tag_key },
-    {"replace_dots"         , flb_test_replace_dots },
-    {"id_key"               , flb_test_id_key },
+    {"long_index"            , flb_test_long_index },
+    {"div0_error"            , flb_test_div0 },
+    {"write_operation_index" , flb_test_write_operation_index },
+    {"write_operation_create", flb_test_write_operation_create },
+    {"write_operation_update", flb_test_write_operation_update },
+    {"write_operation_upsert", flb_test_write_operation_upsert },
+    {"index_type"            , flb_test_index_type },
+    {"logstash_format"       , flb_test_logstash_format },
+    {"logstash_format_nanos" , flb_test_logstash_format_nanos },
+    {"tag_key"               , flb_test_tag_key },
+    {"replace_dots"          , flb_test_replace_dots },
+    {"id_key"                , flb_test_id_key },
     {NULL, NULL}
 };