Skip to content

Commit

Permalink
Fix wrong result when executing bulk requests with and without pipeli…
Browse files Browse the repository at this point in the history
  • Loading branch information
gaobinlong authored and danhermann committed Sep 1, 2020
1 parent 5723b92 commit feb38ca
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"One request has pipeline and another not":
- skip:
version: " - 7.99.99"
reason: "change after backporting"
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"f1": "v1", "f2": 42}'
- '{"index": {"_index": "test_index", "_id": "test_id2", "pipeline": "mypipeline"}}'
- '{"f1": "v2", "f2": 47}'

- match: { errors: true }
- match: { items.0.index.result: created }
- match: { items.1.index.status: 400 }
- match: { items.1.index.error.type: illegal_argument_exception }
- match: { items.1.index.error.reason: "pipeline with id [mypipeline] does not exist" }

- do:
count:
index: test_index

- match: {count: 1}
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ protected void doRun() {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
i++;
continue;
}

Expand All @@ -495,6 +496,7 @@ protected void doRun() {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
i++;
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,18 +655,26 @@ public String getType() {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final SetOnce<Boolean> failure = new SetOnce<>();
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id).setFinalPipeline("_none");
BulkRequest bulkRequest = new BulkRequest();
final IndexRequest indexRequest1 =
new IndexRequest("_index", "_type", "_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 =
new IndexRequest("_index", "_type", "_id2").source(emptyMap()).setPipeline(id).setFinalPipeline("_none");
bulkRequest.add(indexRequest2);

final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause().getMessage(), equalTo("error"));
failure.set(true);
assertThat(slot, equalTo(1));
};

@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);

ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
completionHandler, indexReq -> {});

assertTrue(failure.get());
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
Expand All @@ -685,13 +693,15 @@ public void testExecuteBulkPipelineDoesNotExist() {

BulkRequest bulkRequest = new BulkRequest();

IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
IndexRequest indexRequest1 =
new IndexRequest("_index", "_type", "_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id")
.source(Collections.emptyMap())
.setPipeline("does_not_exist")
.setFinalPipeline("_none");
IndexRequest indexRequest2 =
new IndexRequest("_index", "_type", "_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
IndexRequest indexRequest3 =
new IndexRequest("_index", "_type", "_id3").source(emptyMap()).setPipeline("does_not_exist").setFinalPipeline("_none");
bulkRequest.add(indexRequest3);
@SuppressWarnings("unchecked")
BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
Expand All @@ -702,7 +712,7 @@ public void testExecuteBulkPipelineDoesNotExist() {
argThat(new CustomTypeSafeMatcher<Integer>("failure handler was not called with the expected arguments") {
@Override
protected boolean matchesSafely(Integer item) {
return item == 1;
return item == 2;
}

}),
Expand Down Expand Up @@ -1201,18 +1211,27 @@ public String getDescription() {
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");

BulkRequest bulkRequest = new BulkRequest();
final IndexRequest indexRequest1 =
new IndexRequest("_index", "_type", "_id1").source(Collections.emptyMap()).setPipeline("_none").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);

IndexRequest indexRequest2 =
new IndexRequest("_index", "_type", "_id2").source(Collections.emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);

@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final IntConsumer dropHandler = mock(IntConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
completionHandler, dropHandler);
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
verify(dropHandler, times(1)).accept(0);
verify(dropHandler, times(1)).accept(1);
}

public void testIngestClusterStateListeners_orderOfExecution() {
Expand Down

0 comments on commit feb38ca

Please sign in to comment.