Skip to content

Commit

Permalink
out_es: ensure integrity of already recorded logs
Browse files Browse the repository at this point in the history
Since ElasticSearch 7.5, the "create_doc" index privilege was introduced,
which ensures a role can only add new logs, but never modify or delete
previously recorded ones.

However, the "index" op_type has the semantic of changing a document if
it already exists with the same "_id". Therefore, any requests with the
"index" op_type are denied for a role whose only privilege is
"create_doc".

We solve this by replacing all "index" operations by the "create"
operation. However, this has the side effect of producing status 409
errors whenever a previously successful operation is retried and the
Generate_ID option is turned on. Therefore, we change the
"elasticsearch_error_check" function to ignore this kind of error.

Signed-off-by: Paulo Matias <[email protected]>
Signed-off-by: Fujimoto Seiji <[email protected]>
  • Loading branch information
thotypous authored and fujimotos committed May 3, 2021
1 parent c7dc097 commit 7f0db9e
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 17 deletions.
80 changes: 67 additions & 13 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -594,9 +594,9 @@ static int cb_es_init(struct flb_output_instance *ins,
static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
struct flb_http_client *c)
{
int i;
int i, j, k;
int ret;
int check = FLB_TRUE;
int check = FLB_FALSE;
int root_type;
char *out_buf;
size_t off = 0;
Expand All @@ -605,6 +605,9 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
msgpack_object root;
msgpack_object key;
msgpack_object val;
msgpack_object item;
msgpack_object item_key;
msgpack_object item_val;

/*
* Check if our payload is complete: there is such situations where
Expand Down Expand Up @@ -656,11 +659,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
goto done;
}

if (key.via.str.size != 6) {
continue;
}

if (strncmp(key.via.str.ptr, "errors", 6) == 0) {
if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "errors", 6) == 0) {
val = root.via.map.ptr[i].val;
if (val.type != MSGPACK_OBJECT_BOOLEAN) {
flb_plg_error(ctx->ins, "unexpected 'error' value type=%i",
Expand All @@ -670,17 +669,72 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
}

/* If error == false, we are OK (no errors = FLB_FALSE) */
if (val.via.boolean) {
/* there is an error */
check = FLB_TRUE;
goto done;
}
else {
if (!val.via.boolean) {
/* no errors */
check = FLB_FALSE;
goto done;
}
}
else if (key.via.str.size == 5 && strncmp(key.via.str.ptr, "items", 5) == 0) {
val = root.via.map.ptr[i].val;
if (val.type != MSGPACK_OBJECT_ARRAY) {
flb_plg_error(ctx->ins, "unexpected 'items' value type=%i",
val.type);
check = FLB_TRUE;
goto done;
}

for (j = 0; j < val.via.array.size; j++) {
item = val.via.array.ptr[j];
if (item.type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i",
item.type);
check = FLB_TRUE;
goto done;
}

if (item.via.map.size != 1) {
flb_plg_error(ctx->ins, "unexpected 'item' size=%i",
item.via.map.size);
check = FLB_TRUE;
goto done;
}

item = item.via.map.ptr[0].val;
if (item.type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i",
item.type);
check = FLB_TRUE;
goto done;
}

for (k = 0; k < item.via.map.size; k++) {
item_key = item.via.map.ptr[k].key;
if (item_key.type != MSGPACK_OBJECT_STR) {
flb_plg_error(ctx->ins, "unexpected key type=%i",
item_key.type);
check = FLB_TRUE;
goto done;
}

if (item_key.via.str.size == 6 && strncmp(item_key.via.str.ptr, "status", 6) == 0) {
item_val = item.via.map.ptr[k].val;

if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
flb_plg_error(ctx->ins, "unexpected 'status' value type=%i",
item_val.type);
check = FLB_TRUE;
goto done;
}
/* Check for errors other than version conflict (document already exists) */
if (item_val.via.i64 != 409) {
check = FLB_TRUE;
goto done;
}
}
}
}
}
}

done:
Expand Down
8 changes: 4 additions & 4 deletions plugins/out_es/es_bulk.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

#define ES_BULK_CHUNK 4096 /* Size of buffer chunks */
#define ES_BULK_HEADER 165 /* ES Bulk API prefix line */
#define ES_BULK_INDEX_FMT "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_WITHOUT_TYPE "{\"index\":{\"_index\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE "{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT "{\"create\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID "{\"create\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_WITHOUT_TYPE "{\"create\":{\"_index\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE "{\"create\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n"

struct es_bulk {
char *ptr;
Expand Down

0 comments on commit 7f0db9e

Please sign in to comment.