Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong result when executing bulk requests with and without pipeline #60818

Merged
merged 7 commits into from
Aug 19, 2020
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"One request has pipeline and another not":
- skip:
version: " - 7.99.99"
reason: "change after backporting"
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"f1": "v1", "f2": 42}'
- '{"index": {"_index": "test_index", "_id": "test_id2", "pipeline": "mypipeline"}}'
- '{"f1": "v2", "f2": 47}'

- match: { errors: true }
- match: { items.0.index.result: created }
- match: { items.1.index.status: 400 }
- match: { items.1.index.error.type: illegal_argument_exception }
- match: { items.1.index.error.reason: "pipeline with id [mypipeline] does not exist" }

- do:
count:
index: test_index

- match: {count: 1}
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ protected void doRun() {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
i++;
continue;
}

Expand All @@ -494,6 +495,7 @@ protected void doRun() {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
i++;
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,17 +636,27 @@ public String getType() {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final SetOnce<Boolean> failure = new SetOnce<>();
final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline(id).setFinalPipeline("_none");

BulkRequest bulkRequest = new BulkRequest();
final IndexRequest indexRequest1 =
new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 =
new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline(id).setFinalPipeline("_none");
bulkRequest.add(indexRequest2);

final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause().getMessage(), equalTo("error"));
failure.set(true);
assertThat(slot, equalTo(1));
};

@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);

ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
completionHandler, indexReq -> {});

assertTrue(failure.get());
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
Expand All @@ -665,11 +675,15 @@ public void testExecuteBulkPipelineDoesNotExist() {

BulkRequest bulkRequest = new BulkRequest();

IndexRequest indexRequest1 = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
IndexRequest indexRequest1 =
new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 =
new IndexRequest("_index").id("_id").source(Collections.emptyMap()).setPipeline("does_not_exist").setFinalPipeline("_none");
new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
IndexRequest indexRequest3 =
new IndexRequest("_index").id("_id3").source(Collections.emptyMap()).setPipeline("does_not_exist").setFinalPipeline("_none");
bulkRequest.add(indexRequest3);
@SuppressWarnings("unchecked")
BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
Expand All @@ -680,7 +694,7 @@ public void testExecuteBulkPipelineDoesNotExist() {
argThat(new CustomTypeSafeMatcher<>("failure handler was not called with the expected arguments") {
@Override
protected boolean matchesSafely(Integer item) {
return item == 1;
return item == 2;
}

}),
Expand Down Expand Up @@ -1176,18 +1190,27 @@ public String getDescription() {
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest =
new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");

BulkRequest bulkRequest = new BulkRequest();
final IndexRequest indexRequest1 =
new IndexRequest("_index").id("_id1").source(Collections.emptyMap()).setPipeline("_none").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);

IndexRequest indexRequest2 =
new IndexRequest("_index").id("_id2").source(Collections.emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);

@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final IntConsumer dropHandler = mock(IntConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
completionHandler, dropHandler);
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
verify(dropHandler, times(1)).accept(0);
verify(dropHandler, times(1)).accept(1);
}

public void testIngestClusterStateListeners_orderOfExecution() {
Expand Down