Skip to content

Commit

Permalink
Changing methods that form the interface of IngestService - Transport…
Browse files Browse the repository at this point in the history
…BulkAction
  • Loading branch information
carlosdelest committed Dec 14, 2023
1 parent 3a96bf9 commit 6e55b65
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,11 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
final long startTime = relativeTime();

boolean hasIndexRequestsWithPipelines = false;
final Metadata metadata = clusterService.state().getMetadata();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
ingestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= ingestService.hasPipeline(indexRequest);
ingestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest);
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
}

if (actionRequest instanceof IndexRequest ir) {
Expand Down
21 changes: 10 additions & 11 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,20 +252,17 @@ private static Map<String, Processor.Factory> processorFactories(List<IngestPlug
*
* @param originalRequest Original write request received.
* @param indexRequest The {@link org.elasticsearch.action.index.IndexRequest} object to update.
* @param metadata Cluster metadata from where the pipeline information could be derived.
*/
public void resolvePipelinesAndUpdateIndexRequest(
final DocWriteRequest<?> originalRequest,
final IndexRequest indexRequest,
final Metadata metadata
final IndexRequest indexRequest
) {
resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, metadata, System.currentTimeMillis());
resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, System.currentTimeMillis());
}

void resolvePipelinesAndUpdateIndexRequest(
final DocWriteRequest<?> originalRequest,
final IndexRequest indexRequest,
final Metadata metadata,
final long epochMillis
) {
if (indexRequest.isPipelineResolved()) {
Expand All @@ -274,7 +271,8 @@ void resolvePipelinesAndUpdateIndexRequest(

String requestPipeline = indexRequest.getPipeline();

Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis) //
Metadata metadata = state.metadata();
Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, epochMillis) //
.or(() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata))
.orElse(Pipelines.NO_PIPELINES_DEFINED);

Expand Down Expand Up @@ -954,7 +952,7 @@ private void executePipelines(
// clear the current pipeline, then re-resolve the pipelines for this request
indexRequest.setPipeline(null);
indexRequest.isPipelineResolved(false);
resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata());
resolvePipelinesAndUpdateIndexRequest(null, indexRequest);
newPipelines = getAndResetPipelines(indexRequest);

// for backwards compatibility, when a pipeline changes the target index for a document without using the reroute
Expand Down Expand Up @@ -1360,11 +1358,11 @@ record PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
private Optional<Pipelines> resolvePipelinesFromMetadata(
DocWriteRequest<?> originalRequest,
IndexRequest indexRequest,
Metadata metadata,
long epochMillis
) {
IndexMetadata indexMetadata = null;
// start to look for default or final pipelines via settings found in the cluster metadata
Metadata metadata = state.metadata();
if (originalRequest != null) {
indexMetadata = metadata.indices()
.get(IndexNameExpressionResolver.resolveDateMathExpression(originalRequest.index(), epochMillis));
Expand All @@ -1390,12 +1388,13 @@ private Optional<Pipelines> resolvePipelinesFromMetadata(
}

final Settings settings = indexMetadata.getSettings();
List<Pipeline> pluginsPipelines = getPluginsPipelines(indexName);
String writeIndexName = indexMetadata.getIndex().getName();
List<Pipeline> pluginsPipelines = getPluginsPipelines(writeIndexName);
return Optional.of(
new Pipelines(
IndexSettings.DEFAULT_PIPELINE.get(settings),
IndexSettings.FINAL_PIPELINE.get(settings),
pluginsPipelines == null ? NOOP_PIPELINE_NAME : indexName
pluginsPipelines == null ? NOOP_PIPELINE_NAME : writeIndexName
)
);
}
Expand Down Expand Up @@ -1452,7 +1451,7 @@ private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(IndexReque
* <p>
* This method assumes that the pipelines are beforehand resolved.
*/
public boolean hasPipeline(IndexRequest indexRequest) {
public static boolean hasPipeline(IndexRequest indexRequest) {
assert indexRequest.isPipelineResolved();
assert indexRequest.getPipeline() != null;
assert indexRequest.getFinalPipeline() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.indices.EmptySystemIndices;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.IngestServiceTests;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockUtils;
Expand All @@ -61,16 +62,20 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.IntConsumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -225,7 +230,15 @@ public void setupAction() {
return null;
}).when(clusterService).addStateApplier(any(ClusterStateApplier.class));
// setup the mocked ingest service for capturing calls
ingestService = mock(IngestService.class);
ingestService = spy(IngestServiceTests.createIngestService());
doNothing().when(ingestService).executeBulkRequest(
anyInt(),
any(),
any(),
any(),
any(),
any()
);
action = new TestTransportBulkAction();
singleItemBulkWriteAction = new TestSingleItemBulkWriteAction(action);
reset(transportService); // call on construction of action
Expand All @@ -238,15 +251,15 @@ public void testIngestSkipped() throws Exception {
bulkRequest.add(indexRequest);
ActionTestUtils.execute(action, null, bulkRequest, ActionTestUtils.assertNoFailureListener(response -> {}));
assertTrue(action.isExecuted);
verifyNoMoreInteractions(ingestService);
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
}

public void testSingleItemBulkActionIngestSkipped() throws Exception {
IndexRequest indexRequest = new IndexRequest("index").id("id");
indexRequest.source(Collections.emptyMap());
ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, ActionTestUtils.assertNoFailureListener(response -> {}));
assertTrue(action.isExecuted);
verifyNoMoreInteractions(ingestService);
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
}

public void testIngestLocal() throws Exception {
Expand Down Expand Up @@ -294,7 +307,7 @@ public void testIngestLocal() throws Exception {
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyNoMoreInteractions(transportService);
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
}

public void testSingleItemBulkActionIngestLocal() throws Exception {
Expand Down Expand Up @@ -606,8 +619,7 @@ public void testNotFindDefaultPipelineFromTemplateMatches() {
})
);
assertEquals(IngestService.NOOP_PIPELINE_NAME, indexRequest.getPipeline());
verifyNoMoreInteractions(ingestService);

verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
}

public void testFindDefaultPipelineFromTemplateMatch() {
Expand Down

0 comments on commit 6e55b65

Please sign in to comment.