From 19e480582f9eb6936f8c95569a14323897748dbc Mon Sep 17 00:00:00 2001 From: bellengao Date: Wed, 19 Aug 2020 19:54:58 +0800 Subject: [PATCH] Fix wrong result when executing bulk requests with and without pipeline (#60818) --- .../rest-api-spec/test/bulk/90_pipeline.yml | 25 +++++++++++ .../elasticsearch/ingest/IngestService.java | 2 + .../ingest/IngestServiceTests.java | 45 +++++++++++++------ 3 files changed, 59 insertions(+), 13 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/bulk/90_pipeline.yml diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/90_pipeline.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/90_pipeline.yml new file mode 100644 index 0000000000000..9079cf1cfa079 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/90_pipeline.yml @@ -0,0 +1,25 @@ +--- +"One request has pipeline and another not": + - skip: + version: " - 7.99.99" + reason: "change after backporting" + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "test_index", "_id": "test_id1"}}' + - '{"f1": "v1", "f2": 42}' + - '{"index": {"_index": "test_index", "_id": "test_id2", "pipeline": "mypipeline"}}' + - '{"f1": "v2", "f2": 47}' + + - match: { errors: true } + - match: { items.0.index.result: created } + - match: { items.1.index.status: 400 } + - match: { items.1.index.error.type: illegal_argument_exception } + - match: { items.1.index.error.reason: "pipeline with id [mypipeline] does not exist" } + + - do: + count: + index: test_index + + - match: {count: 1} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9bf1eba8dc0ed..2b3dab3e5e009 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -473,6 +473,7 @@ protected void doRun() { onCompletion.accept(originalThread, null); } assert counter.get() >= 0; + i++; continue; } @@ -495,6 +496,7 @@ protected void doRun() { onCompletion.accept(originalThread, null); } assert counter.get() >= 0; + i++; continue; } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 8d420f806fdc0..4c8e87fd0add6 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -655,18 +655,26 @@ public String getType() { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final SetOnce failure = new SetOnce<>(); - final IndexRequest indexRequest = - new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id).setFinalPipeline("_none"); + BulkRequest bulkRequest = new BulkRequest(); + final IndexRequest indexRequest1 = + new IndexRequest("_index", "_type", "_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = + new IndexRequest("_index", "_type", "_id2").source(emptyMap()).setPipeline(id).setFinalPipeline("_none"); + bulkRequest.add(indexRequest2); + final BiConsumer failureHandler = (slot, e) -> { assertThat(e.getCause(), instanceOf(IllegalStateException.class)); assertThat(e.getCause().getMessage(), equalTo("error")); failure.set(true); + assertThat(slot, equalTo(1)); }; @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler, + completionHandler, indexReq -> {}); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -685,13 +693,15 @@ public void testExecuteBulkPipelineDoesNotExist() { BulkRequest bulkRequest = new BulkRequest(); - IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + IndexRequest indexRequest1 = + new IndexRequest("_index", "_type", "_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none"); bulkRequest.add(indexRequest1); - IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id") - .source(Collections.emptyMap()) - .setPipeline("does_not_exist") - .setFinalPipeline("_none"); + IndexRequest indexRequest2 = + new IndexRequest("_index", "_type", "_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); bulkRequest.add(indexRequest2); + IndexRequest indexRequest3 = + new IndexRequest("_index", "_type", "_id3").source(Collections.emptyMap()).setPipeline("does_not_exist").setFinalPipeline("_none"); + bulkRequest.add(indexRequest3); @SuppressWarnings("unchecked") BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") @@ -702,7 +712,7 @@ public void testExecuteBulkPipelineDoesNotExist() { argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { @Override protected boolean matchesSafely(Integer item) { - return item == 1; + return item == 2; } }), @@ -1201,18 +1211,27 @@ public String getDescription() { ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final IndexRequest indexRequest = - new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + + BulkRequest bulkRequest = new BulkRequest(); + final IndexRequest indexRequest1 = + new IndexRequest("_index", "_type", "_id1").source(Collections.emptyMap()).setPipeline("_none").setFinalPipeline("_none"); + bulkRequest.add(indexRequest1); + + IndexRequest indexRequest2 = + new IndexRequest("_index", "_type", "_id2").source(Collections.emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest2); + @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final IntConsumer dropHandler = mock(IntConsumer.class); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler); + ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler, + completionHandler, dropHandler); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); - verify(dropHandler, times(1)).accept(0); + verify(dropHandler, times(1)).accept(1); } public void testIngestClusterStateListeners_orderOfExecution() {