From 6e55b65f713e6bcf3a7c7426b2c03825485b4f9f Mon Sep 17 00:00:00 2001 From: carlosdelest Date: Thu, 14 Dec 2023 18:05:29 +0100 Subject: [PATCH] Changing methods that form the interface of IngestService - TransportBulkAction --- .../action/bulk/TransportBulkAction.java | 5 ++-- .../elasticsearch/ingest/IngestService.java | 21 ++++++++-------- .../bulk/TransportBulkActionIngestTests.java | 24 ++++++++++++++----- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 565b0de7096c8..2913a7dd8944d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -303,12 +303,11 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec final long startTime = relativeTime(); boolean hasIndexRequestsWithPipelines = false; - final Metadata metadata = clusterService.state().getMetadata(); for (DocWriteRequest actionRequest : bulkRequest.requests) { IndexRequest indexRequest = getIndexWriteRequest(actionRequest); if (indexRequest != null) { - ingestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata); - hasIndexRequestsWithPipelines |= ingestService.hasPipeline(indexRequest); + ingestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest); + hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest); } if (actionRequest instanceof IndexRequest ir) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b6e767872912e..6e40670195b32 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -252,20 +252,17 @@ private static Map processorFactories(List originalRequest, - final IndexRequest indexRequest, - final Metadata metadata + final IndexRequest indexRequest ) { - resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, metadata, System.currentTimeMillis()); + resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, System.currentTimeMillis()); } void resolvePipelinesAndUpdateIndexRequest( final DocWriteRequest originalRequest, final IndexRequest indexRequest, - final Metadata metadata, final long epochMillis ) { if (indexRequest.isPipelineResolved()) { @@ -274,7 +271,8 @@ void resolvePipelinesAndUpdateIndexRequest( String requestPipeline = indexRequest.getPipeline(); - Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis) // + Metadata metadata = state.metadata(); + Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, epochMillis) // .or(() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)) .orElse(Pipelines.NO_PIPELINES_DEFINED); @@ -954,7 +952,7 @@ private void executePipelines( // clear the current pipeline, then re-resolve the pipelines for this request indexRequest.setPipeline(null); indexRequest.isPipelineResolved(false); - resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata()); + resolvePipelinesAndUpdateIndexRequest(null, indexRequest); newPipelines = getAndResetPipelines(indexRequest); // for backwards compatibility, when a pipeline changes the target index for a document without using the reroute @@ -1360,11 +1358,11 @@ record PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) { private Optional resolvePipelinesFromMetadata( DocWriteRequest originalRequest, IndexRequest indexRequest, - Metadata metadata, long epochMillis ) { IndexMetadata indexMetadata = null; // start to look for default or final pipelines via settings found in the cluster metadata + Metadata metadata = state.metadata(); if (originalRequest != null) { indexMetadata = metadata.indices() .get(IndexNameExpressionResolver.resolveDateMathExpression(originalRequest.index(), epochMillis)); @@ -1390,12 +1388,13 @@ private Optional resolvePipelinesFromMetadata( } final Settings settings = indexMetadata.getSettings(); - List pluginsPipelines = getPluginsPipelines(indexName); + String writeIndexName = indexMetadata.getIndex().getName(); + List pluginsPipelines = getPluginsPipelines(writeIndexName); return Optional.of( new Pipelines( IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings), - pluginsPipelines == null ? NOOP_PIPELINE_NAME : indexName + pluginsPipelines == null ? NOOP_PIPELINE_NAME : writeIndexName ) ); } @@ -1452,7 +1451,7 @@ private static Optional resolvePipelinesFromIndexTemplates(IndexReque *

* This method assumes that the pipelines are beforehand resolved. */ - public boolean hasPipeline(IndexRequest indexRequest) { + public static boolean hasPipeline(IndexRequest indexRequest) { assert indexRequest.isPipelineResolved(); assert indexRequest.getPipeline() != null; assert indexRequest.getFinalPipeline() != null; diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index f30bceada65d9..d7fa88e8d7828 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.IngestServiceTests; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockUtils; @@ -61,6 +62,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.IntConsumer; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; @@ -68,9 +70,12 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -225,7 +230,15 @@ public void setupAction() { return null; }).when(clusterService).addStateApplier(any(ClusterStateApplier.class)); // setup the mocked ingest service for capturing calls - ingestService = mock(IngestService.class); + ingestService = spy(IngestServiceTests.createIngestService()); + doNothing().when(ingestService).executeBulkRequest( + anyInt(), + any(), + any(), + any(), + any(), + any() + ); action = new TestTransportBulkAction(); singleItemBulkWriteAction = new TestSingleItemBulkWriteAction(action); reset(transportService); // call on construction of action @@ -238,7 +251,7 @@ public void testIngestSkipped() throws Exception { bulkRequest.add(indexRequest); ActionTestUtils.execute(action, null, bulkRequest, ActionTestUtils.assertNoFailureListener(response -> {})); assertTrue(action.isExecuted); - verifyNoMoreInteractions(ingestService); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); } public void testSingleItemBulkActionIngestSkipped() throws Exception { @@ -246,7 +259,7 @@ public void testSingleItemBulkActionIngestSkipped() throws Exception { indexRequest.source(Collections.emptyMap()); ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, ActionTestUtils.assertNoFailureListener(response -> {})); assertTrue(action.isExecuted); - verifyNoMoreInteractions(ingestService); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); } public void testIngestLocal() throws Exception { @@ -294,7 +307,7 @@ public void testIngestLocal() throws Exception { completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one - verifyNoMoreInteractions(transportService); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); } public void testSingleItemBulkActionIngestLocal() throws Exception { @@ -606,8 +619,7 @@ public void testNotFindDefaultPipelineFromTemplateMatches() { }) ); assertEquals(IngestService.NOOP_PIPELINE_NAME, indexRequest.getPipeline()); - verifyNoMoreInteractions(ingestService); - + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); } public void testFindDefaultPipelineFromTemplateMatch() {