diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index aaa92c7b7d782..83c6e73795d2e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -18,7 +18,6 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; @@ -45,7 +44,6 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -719,87 +717,6 @@ void validatePipeline(Map ingestInfos, String pipelin ExceptionsHelper.rethrowAndSuppress(exceptions); } - public void executeBulkRequest( - final int numberOfActionRequests, - final Iterable> actionRequests, - final IntConsumer onDropped, - final BiConsumer onFailure, - final BiConsumer onCompletion, - final String executorName - ) { - assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]"; - - threadPool.executor(executorName).execute(new AbstractRunnable() { - - @Override - public void onFailure(Exception e) { - onCompletion.accept(null, e); - } - - @Override - protected void doRun() { - final Thread originalThread = Thread.currentThread(); - try (var refs = new RefCountingRunnable(() -> onCompletion.accept(originalThread, null))) { - int slot = 0; - for (DocWriteRequest actionRequest : actionRequests) { - IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); - if (indexRequest != null) { - processIndexRequest(indexRequest, slot, refs, onDropped, onFailure); - } - slot++; - } - } - } - }); - } - - private void executePipelines( - DocWriteRequest actionRequest, - final int slot, - final Releasable ref, - IndexRequest indexRequest, - PipelineIterator pipelines, - IntConsumer onDropped, - BiConsumer onFailure - ) { - // start the stopwatch and acquire a ref to indicate that we're working on this document - final long startTimeInNanos = System.nanoTime(); - totalMetrics.preIngest(); - // the document listener gives us three-way logic: a document can fail processing (1), or it can - // be successfully processed. a successfully processed document can be kept (2) or dropped (3). - final ActionListener documentListener = ActionListener.runAfter(new ActionListener<>() { - @Override - public void onResponse(Boolean kept) { - assert kept != null; - if (kept == false) { - onDropped.accept(slot); - } - } - - @Override - public void onFailure(Exception e) { - totalMetrics.ingestFailed(); - onFailure.accept(slot, e); - } - }, () -> { - // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate - // that we're finished with this document - final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; - totalMetrics.postIngest(ingestTimeInNanos); - ref.close(); - }); - DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); - - IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver); - - executePipelines(pipelines, indexRequest, ingestDocument, documentListener); - indexRequest.setPipelinesHaveRun(); - - assert actionRequest.index() != null; - documentParsingObserver.setIndexName(actionRequest.index()); - documentParsingObserver.close(); - } - /** * Returns the pipelines of the request, and updates the request so that it no longer references * any pipelines (both the default and final pipeline are set to the noop pipeline). diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 33bc30bf11ed1..a87c159453629 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -283,7 +283,8 @@ public void testIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).processBulkRequest( + any(), eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), @@ -325,7 +326,8 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).processBulkRequest( + any(), eq(1), bulkDocsItr.capture(), any(), @@ -371,7 +373,8 @@ public void testIngestSystemLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).processBulkRequest( + any(), eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), @@ -408,7 +411,7 @@ public void testIngestForward() throws Exception { ActionTestUtils.execute(action, null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).processBulkRequest(any(), anyInt(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -448,7 +451,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).processBulkRequest(any(), anyInt(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -528,7 +531,8 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).processBulkRequest( + any(), eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), @@ -576,7 +580,8 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { assertFalse(action.indexCreated); // no index yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).processBulkRequest( + any(), eq(1), bulkDocsItr.capture(), any(), @@ -670,7 +675,8 @@ public void testFindDefaultPipelineFromTemplateMatch() { ); assertEquals("pipeline2", indexRequest.getPipeline()); - verify(ingestService).executeBulkRequest( + verify(ingestService).processBulkRequest( + any(), eq(1), bulkDocsItr.capture(), any(), @@ -714,7 +720,8 @@ public void testFindDefaultPipelineFromV2TemplateMatch() { ); assertEquals("pipeline2", indexRequest.getPipeline()); - verify(ingestService).executeBulkRequest( + verify(ingestService).processBulkRequest( + any(), eq(1), bulkDocsItr.capture(), any(), @@ -741,7 +748,8 @@ public void testIngestCallbackExceptionHandled() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).processBulkRequest( + any(), eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), @@ -778,7 +786,8 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).processBulkRequest( + any(), eq(1), bulkDocsItr.capture(), any(), diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 3b114cf0a618e..3514176f92fd9 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -207,7 +207,15 @@ public void testExecuteIndexPipelineDoesNotExist() { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1106,7 +1114,8 @@ public String getType() { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, @@ -1149,7 +1158,8 @@ public void testExecuteBulkPipelineDoesNotExist() { BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, @@ -1213,7 +1223,8 @@ public void close() { BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, @@ -1246,7 +1257,15 @@ public void testExecuteSuccess() { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1279,7 +1298,15 @@ public void testDynamicTemplates() throws Exception { CountDownLatch latch = new CountDownLatch(1); final BiConsumer failureHandler = (v, e) -> { throw new AssertionError("must never fail", e); }; final BiConsumer completionHandler = (t, e) -> latch.countDown(); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); latch.await(); assertThat(indexRequest.getDynamicTemplates(), equalTo(Map.of("foo", "bar", "foo.bar", "baz"))); } @@ -1300,7 +1327,15 @@ public void testExecuteEmptyPipeline() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1354,7 +1389,15 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); verify(processor).execute(any(), any()); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1403,7 +1446,15 @@ public void testExecuteFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1452,7 +1503,15 @@ public void testExecuteSuccessWithOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1495,7 +1554,15 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1549,7 +1616,8 @@ public void testBulkRequestExecutionWithFailures() throws Exception { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, numRequest, bulkRequest.requests(), indexReq -> {}, @@ -1607,7 +1675,8 @@ public void testBulkRequestExecution() throws Exception { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, numRequest, bulkRequest.requests(), indexReq -> {}, @@ -1720,7 +1789,15 @@ public String execute() { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1").setFinalPipeline("_id2"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + (integer, e) -> {}, + (thread, e) -> {}, + Names.WRITE + ); { final IngestStats ingestStats = ingestService.stats(); @@ -1791,7 +1868,15 @@ public void testStats() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1").setFinalPipeline("_none"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.pipelineStats().size(), equalTo(2)); @@ -1808,7 +1893,15 @@ public void testStats() throws Exception { assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0); indexRequest.setPipeline("_id2"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.pipelineStats().size(), equalTo(2)); // total @@ -1830,7 +1923,15 @@ public void testStats() throws Exception { clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); final IngestStats afterThirdRequestStats = ingestService.stats(); assertThat(afterThirdRequestStats.pipelineStats().size(), equalTo(2)); // total @@ -1853,7 +1954,15 @@ public void testStats() throws Exception { clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + failureHandler, + completionHandler, + Names.WRITE + ); final IngestStats afterForthRequestStats = ingestService.stats(); assertThat(afterForthRequestStats.pipelineStats().size(), equalTo(2)); // total @@ -1941,7 +2050,8 @@ public String getDescription() { final BiConsumer completionHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final IntConsumer dropHandler = mock(IntConsumer.class); - ingestService.executeBulkRequest( + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, bulkRequest.numberOfActions(), bulkRequest.requests(), dropHandler, @@ -2029,7 +2139,15 @@ public void testCBORParsing() throws Exception { .setPipeline("_id") .setFinalPipeline("_none"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 1, + List.of(indexRequest), + indexReq -> {}, + (integer, e) -> {}, + (thread, e) -> {}, + Names.WRITE + ); } assertThat(reference.get(), is(instanceOf(byte[].class))); @@ -2100,7 +2218,15 @@ public void testSetsRawTimestamp() { bulkRequest.add(indexRequest6); bulkRequest.add(indexRequest7); bulkRequest.add(indexRequest8); - ingestService.executeBulkRequest(8, bulkRequest.requests(), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + ingestService.processBulkRequest( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + 8, + bulkRequest.requests(), + indexReq -> {}, + (integer, e) -> {}, + (thread, e) -> {}, + Names.WRITE + ); assertThat(indexRequest1.getRawTimestamp(), nullValue()); assertThat(indexRequest2.getRawTimestamp(), nullValue());