Skip to content

Commit

Permalink
Fix ingest NPE for empty pipeline
Browse files Browse the repository at this point in the history
Signed-off-by: Liyun Xiu <[email protected]>
  • Loading branch information
chishui committed Jul 31, 2024
1 parent 5c19809 commit 623ef8a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ private void innerBatchExecute(
Consumer<List<IngestDocumentWrapper>> handler
) {
if (pipeline.getProcessors().isEmpty()) {
handler.accept(null);
handler.accept(toIngestDocumentWrappers(slots, indexRequests));
return;
}

Expand Down Expand Up @@ -1271,6 +1271,14 @@ private static IngestDocumentWrapper toIngestDocumentWrapper(int slot, IndexRequ
return new IngestDocumentWrapper(slot, toIngestDocument(indexRequest), null);
}

private static List<IngestDocumentWrapper> toIngestDocumentWrappers(List<Integer> slots, List<IndexRequest> indexRequests) {
List<IngestDocumentWrapper> ingestDocumentWrappers = new ArrayList<>();
for (int i = 0; i < slots.size(); ++i) {
ingestDocumentWrappers.add(toIngestDocumentWrapper(slots.get(i), indexRequests.get(i)));
}
return ingestDocumentWrappers;
}

private static Map<Integer, IndexRequest> createSlotIndexRequestMap(List<Integer> slots, List<IndexRequest> indexRequests) {
Map<Integer, IndexRequest> slotIndexRequestMap = new HashMap<>();
for (int i = 0; i < slots.size(); ++i) {
Expand Down
37 changes: 37 additions & 0 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1995,6 +1995,43 @@ public void testExecuteBulkRequestInBatchWithDefaultBatchSize() {
verify(mockCompoundProcessor, never()).execute(any(), any());
}

public void testExecuteEmptyPipelineInBatch() throws Exception {
IngestService ingestService = createWithProcessors(emptyMap());
PutPipelineRequest putRequest = new PutPipelineRequest(
"_id",
new BytesArray("{\"processors\": [], \"description\": \"_description\"}"),
MediaTypeRegistry.JSON
);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_none").setFinalPipeline("_id");
bulkRequest.add(indexRequest3);
IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest4);
@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(
4,
bulkRequest.requests(),
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE,
mockBulkRequest
);
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}

public void testPrepareBatches_same_index_pipeline() {
IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
Expand Down

0 comments on commit 623ef8a

Please sign in to comment.