diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 6abb4b78e28..6a9be715c92 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -594,9 +594,9 @@ static int cb_es_init(struct flb_output_instance *ins, static int elasticsearch_error_check(struct flb_elasticsearch *ctx, struct flb_http_client *c) { - int i; + int i, j, k; int ret; - int check = FLB_TRUE; + int check = FLB_FALSE; int root_type; char *out_buf; size_t off = 0; @@ -605,6 +605,9 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, msgpack_object root; msgpack_object key; msgpack_object val; + msgpack_object item; + msgpack_object item_key; + msgpack_object item_val; /* * Check if our payload is complete: there is such situations where @@ -656,11 +659,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, goto done; } - if (key.via.str.size != 6) { - continue; - } - - if (strncmp(key.via.str.ptr, "errors", 6) == 0) { + if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "errors", 6) == 0) { val = root.via.map.ptr[i].val; if (val.type != MSGPACK_OBJECT_BOOLEAN) { flb_plg_error(ctx->ins, "unexpected 'error' value type=%i", @@ -670,17 +669,72 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, } /* If error == false, we are OK (no errors = FLB_FALSE) */ - if (val.via.boolean) { - /* there is an error */ - check = FLB_TRUE; - goto done; - } - else { + if (!val.via.boolean) { /* no errors */ check = FLB_FALSE; goto done; } } + else if (key.via.str.size == 5 && strncmp(key.via.str.ptr, "items", 5) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_ARRAY) { + flb_plg_error(ctx->ins, "unexpected 'items' value type=%i", + val.type); + check = FLB_TRUE; + goto done; + } + + for (j = 0; j < val.via.array.size; j++) { + item = val.via.array.ptr[j]; + if (item.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i", + item.type); + check = FLB_TRUE; + goto done; + } + + if (item.via.map.size != 1) { + flb_plg_error(ctx->ins, "unexpected 'item' size=%i", + item.via.map.size); + check = FLB_TRUE; + goto done; + } + + item = item.via.map.ptr[0].val; + if (item.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i", + item.type); + check = FLB_TRUE; + goto done; + } + + for (k = 0; k < item.via.map.size; k++) { + item_key = item.via.map.ptr[k].key; + if (item_key.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "unexpected key type=%i", + item_key.type); + check = FLB_TRUE; + goto done; + } + + if (item_key.via.str.size == 6 && strncmp(item_key.via.str.ptr, "status", 6) == 0) { + item_val = item.via.map.ptr[k].val; + + if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + flb_plg_error(ctx->ins, "unexpected 'status' value type=%i", + item_val.type); + check = FLB_TRUE; + goto done; + } + /* Check for errors other than version conflict (document already exists) */ + if (item_val.via.i64 != 409) { + check = FLB_TRUE; + goto done; + } + } + } + } + } } done: diff --git a/plugins/out_es/es_bulk.h b/plugins/out_es/es_bulk.h index 49de50cf1af..4a4f6f4968b 100644 --- a/plugins/out_es/es_bulk.h +++ b/plugins/out_es/es_bulk.h @@ -25,10 +25,10 @@ #define ES_BULK_CHUNK 4096 /* Size of buffer chunks */ #define ES_BULK_HEADER 165 /* ES Bulk API prefix line */ -#define ES_BULK_INDEX_FMT "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n" -#define ES_BULK_INDEX_FMT_ID "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n" -#define ES_BULK_INDEX_FMT_WITHOUT_TYPE "{\"index\":{\"_index\":\"%s\"}}\n" -#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE "{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n" +#define ES_BULK_INDEX_FMT "{\"create\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n" +#define ES_BULK_INDEX_FMT_ID "{\"create\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n" +#define ES_BULK_INDEX_FMT_WITHOUT_TYPE "{\"create\":{\"_index\":\"%s\"}}\n" +#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE "{\"create\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n" struct es_bulk { char *ptr;