diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index ebb7c65af8fef..f9a833f6fcf1f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -48,6 +48,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Supplier; import static org.hamcrest.Matchers.containsString; @@ -359,9 +360,17 @@ public Map getProcessors(Processor.Parameters paramet new AbstractProcessor(tag, description) { @Override - public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { - ingestDocument.setFieldValue("default", true); - return ingestDocument; + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + // randomize over sync and async execution + randomFrom(parameters.genericExecutor, Runnable::run).accept(() -> { + ingestDocument.setFieldValue("default", true); + handler.accept(ingestDocument, null); + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) { + throw new AssertionError("should not be called"); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 24d5e676cb74a..f3aad86db4733 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -497,67 +497,61 @@ private void executePipelines( final BiConsumer onCompletion, final Thread originalThread ) { - while (it.hasNext()) { - final String pipelineId = it.next(); - try { - PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + assert it.hasNext(); + final String pipelineId = it.next(); + try { + PipelineHolder holder = pipelines.get(pipelineId); + if (holder == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + Pipeline pipeline = holder.pipeline; + String originalIndex = indexRequest.indices()[0]; + innerExecute(slot, indexRequest, pipeline, onDropped, e -> { + if (e != null) { + logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]", + pipelineId, indexRequest.index(), indexRequest.id()), e); + onFailure.accept(slot, e); } - Pipeline pipeline = holder.pipeline; - String originalIndex = indexRequest.indices()[0]; - innerExecute(slot, indexRequest, pipeline, onDropped, e -> { - if (e != null) { - logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]", - pipelineId, indexRequest.index(), indexRequest.id()), e); - onFailure.accept(slot, e); - } - Iterator newIt = it; - boolean newHasFinalPipeline = hasFinalPipeline; - String newIndex = indexRequest.indices()[0]; + Iterator newIt = it; + boolean newHasFinalPipeline = hasFinalPipeline; + String newIndex = indexRequest.indices()[0]; - if (Objects.equals(originalIndex, newIndex) == false) { - if (hasFinalPipeline && it.hasNext() == false) { - totalMetrics.ingestFailed(); - onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId + - "] can't change the target index")); + if (Objects.equals(originalIndex, newIndex) == false) { + if (hasFinalPipeline && it.hasNext() == false) { + totalMetrics.ingestFailed(); + onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId + + "] can't change the target index")); + } else { + indexRequest.isPipelineResolved(false); + resolvePipelines(null, indexRequest, state.metadata()); + if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { + newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); + newHasFinalPipeline = true; } else { - - //Drain old it so it's not looped over - it.forEachRemaining($ -> { - }); - indexRequest.isPipelineResolved(false); - resolvePipelines(null, indexRequest, state.metadata()); - if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { - newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); - newHasFinalPipeline = true; - } else { - newIt = Collections.emptyIterator(); - } + newIt = Collections.emptyIterator(); } } + } - if (newIt.hasNext()) { - executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion, - originalThread); - } else { - if (counter.decrementAndGet() == 0) { - onCompletion.accept(originalThread, null); - } - assert counter.get() >= 0; + if (newIt.hasNext()) { + executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion, + originalThread); + } else { + if (counter.decrementAndGet() == 0) { + onCompletion.accept(originalThread, null); } - }); - } catch (Exception e) { - logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]", - pipelineId, indexRequest.index(), indexRequest.id()), e); - onFailure.accept(slot, e); - if (counter.decrementAndGet() == 0) { - onCompletion.accept(originalThread, null); + assert counter.get() >= 0; } - assert counter.get() >= 0; - break; + }); + } catch (Exception e) { + logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]", + pipelineId, indexRequest.index(), indexRequest.id()), e); + onFailure.accept(slot, e); + if (counter.decrementAndGet() == 0) { + onCompletion.accept(originalThread, null); } + assert counter.get() >= 0; } }