diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index e1b349b84bd1a..32e63b9448366 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -378,7 +378,7 @@ The `if` condition can be more then a simple equality check. The full power of the <> is available and running in the {painless}/painless-ingest-processor-context.html[ingest processor context]. -IMPORTANT: The value of ctx is read-only in `if` conditions. +IMPORTANT: The value of ctx is read-only in `if` conditions. A more complex `if` condition that drops the document (i.e. not index it) unless it has a multi-valued tag field with at least one value that contains the characters @@ -722,8 +722,9 @@ The `ignore_failure` can be set on any processor and defaults to `false`. You may want to retrieve the actual error message that was thrown by a failed processor. To do so you can access metadata fields called -`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible -from within the context of an `on_failure` block. +`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and +`on_failure_pipeline` (in case an error occurred inside a pipeline processor). +These fields are only accessible from within the context of an `on_failure` block. Here is an updated version of the example that you saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message` diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index 5df08b7cf90d0..7f4af85cccf91 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -107,4 +107,4 @@ teardown: pipeline: "outer" body: {} - match: { error.root_cause.0.type: "ingest_processor_exception" } -- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" } +- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 966190ee0ad4e..3f49682183ee4 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -54,7 +54,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v handler.accept(new SimulateDocumentVerboseResult(processorResultList), e); }); } else { - pipeline.execute(ingestDocument, (result, e) -> { + ingestDocument.executePipeline(pipeline, (result, e) -> { if (e == null) { handler.accept(new SimulateDocumentBaseResult(result), null); } else { diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 504795d8d39a3..9cc414c5a15d6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -40,6 +40,7 @@ public class CompoundProcessor implements Processor { public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message"; public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type"; public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag"; + public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline"; private final boolean ignoreFailure; private final List processors; @@ -144,7 +145,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume innerExecute(currentProcessor + 1, ingestDocument, handler); } else { IngestProcessorException compoundProcessorException = - newCompoundProcessorException(e, processor.getType(), processor.getTag()); + newCompoundProcessorException(e, processor, ingestDocument); if (onFailureProcessors.isEmpty()) { handler.accept(null, compoundProcessorException); } else { @@ -177,7 +178,7 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD onFailureProcessor.execute(ingestDocument, (result, e) -> { if (e != null) { removeFailureMetadata(ingestDocument); - handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag())); + handler.accept(null, newCompoundProcessorException(e, onFailureProcessor, ingestDocument)); return; } if (result == null) { @@ -192,12 +193,17 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) { List processorTypeHeader = cause.getHeader("processor_type"); List processorTagHeader = cause.getHeader("processor_tag"); + List processorOriginHeader = cause.getHeader("pipeline_origin"); String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null; String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null; + String failedPipelineId = (processorOriginHeader != null) ? processorOriginHeader.get(0) : null; Map ingestMetadata = ingestDocument.getIngestMetadata(); ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage()); ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType); ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag); + if (failedPipelineId != null) { + ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId); + } } private void removeFailureMetadata(IngestDocument ingestDocument) { @@ -205,21 +211,28 @@ private void removeFailureMetadata(IngestDocument ingestDocument) { ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD); ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD); ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD); + ingestMetadata.remove(ON_FAILURE_PIPELINE_FIELD); } - private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) { + static IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) { if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) { return (IngestProcessorException) e; } IngestProcessorException exception = new IngestProcessorException(e); + String processorType = processor.getType(); if (processorType != null) { exception.addHeader("processor_type", processorType); } + String processorTag = processor.getTag(); if (processorTag != null) { exception.addHeader("processor_tag", processorTag); } + List pipelineStack = document.getPipelineStack(); + if (pipelineStack.size() > 1) { + exception.addHeader("pipeline_origin", pipelineStack); + } return exception; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 34299cb475a27..4183b48dc5923 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -38,7 +38,7 @@ import java.util.Date; import java.util.EnumMap; import java.util.HashMap; -import java.util.IdentityHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -60,7 +60,7 @@ public final class IngestDocument { private final Map ingestMetadata; // Contains all pipelines that have been executed for this document - private final Set executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>()); + private final Set executedPipelines = new LinkedHashSet<>(); public IngestDocument(String index, String type, String id, String routing, Long version, VersionType versionType, Map source) { @@ -647,9 +647,9 @@ private static Object deepCopy(Object value) { * @param handler handles the result or failure */ public void executePipeline(Pipeline pipeline, BiConsumer handler) { - if (executedPipelines.add(pipeline)) { + if (executedPipelines.add(pipeline.getId())) { pipeline.execute(this, (result, e) -> { - executedPipelines.remove(pipeline); + executedPipelines.remove(pipeline.getId()); handler.accept(result, e); }); } else { @@ -657,6 +657,15 @@ public void executePipeline(Pipeline pipeline, BiConsumer getPipelineStack() { + List pipelineStack = new ArrayList<>(executedPipelines); + Collections.reverse(pipelineStack); + return pipelineStack; + } + @Override public boolean equals(Object obj) { if (obj == this) { return true; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index e61ec5bad8c04..e4515f1339286 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -501,7 +501,7 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline VersionType versionType = indexRequest.versionType(); Map sourceAsMap = indexRequest.sourceAsMap(); IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); - pipeline.execute(ingestDocument, (result, e) -> { + ingestDocument.executePipeline(pipeline, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); totalMetrics.postIngest(ingestTimeInMillis); if (e != null) { diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index b3b8ee9762dc1..09d995adc6593 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -28,12 +28,15 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongSupplier; +import static java.util.Collections.singletonList; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; @@ -118,8 +121,8 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception { LongSupplier relativeTimeProvider = mock(LongSupplier.class); when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), - Collections.singletonList(processor2), relativeTimeProvider); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(processor1), + singletonList(processor2), relativeTimeProvider); compoundProcessor.execute(ingestDocument, (result, e) -> {}); verify(relativeTimeProvider, times(2)).getAsLong(); @@ -150,8 +153,8 @@ public String getTag() { LongSupplier relativeTimeProvider = mock(LongSupplier.class); when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), - Collections.singletonList(processor2), relativeTimeProvider); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(processor1), + singletonList(processor2), relativeTimeProvider); IngestDocument[] result = new IngestDocument[1]; compoundProcessor.execute(ingestDocument, (r, e) -> result[0] = r); assertThat(result[0], nullValue()); @@ -178,10 +181,10 @@ public void testSingleProcessorWithNestedFailures() throws Exception { }); LongSupplier relativeTimeProvider = mock(LongSupplier.class); when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail), - Collections.singletonList(lastProcessor), relativeTimeProvider); - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), - Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); + CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, singletonList(processorToFail), + singletonList(lastProcessor), relativeTimeProvider); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(processor), + singletonList(compoundOnFailProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(processorToFail.getInvokedCounter(), equalTo(1)); @@ -203,8 +206,8 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor); - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor), relativeTimeProvider); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(failCompoundProcessor), + singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); @@ -226,11 +229,11 @@ public void testCompoundProcessorExceptionFail() throws Exception { LongSupplier relativeTimeProvider = mock(LongSupplier.class); when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(failProcessor), relativeTimeProvider); + CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, singletonList(firstProcessor), + singletonList(failProcessor), relativeTimeProvider); - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor), relativeTimeProvider); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(failCompoundProcessor), + singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); @@ -252,11 +255,11 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { LongSupplier relativeTimeProvider = mock(LongSupplier.class); when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor))); + CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, singletonList(firstProcessor), + singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor))); - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor), relativeTimeProvider); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(failCompoundProcessor), + singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); @@ -271,7 +274,7 @@ public void testBreakOnFailure() throws Exception { LongSupplier relativeTimeProvider = mock(LongSupplier.class); when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), - Collections.singletonList(onFailureProcessor), relativeTimeProvider); + singletonList(onFailureProcessor), relativeTimeProvider); pipeline.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); @@ -279,6 +282,82 @@ public void testBreakOnFailure() throws Exception { assertStats(pipeline, 1, 1, 0); } + public void testFailureProcessorIsInvokedOnFailure() { + TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> { + Map ingestMetadata = ingestDocument.getIngestMetadata(); + assertThat(ingestMetadata.entrySet(), hasSize(4)); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue()); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2")); + }); + + Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!")))); + Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false, singletonList(new AbstractProcessor(null) { + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + ingestDocument.executePipeline(pipeline2, handler); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new AssertionError(); + } + + @Override + public String getType() { + return "pipeline"; + } + }), singletonList(onFailureProcessor))); + + ingestDocument.executePipeline(pipeline1, (document, e) -> { + assertThat(document, notNullValue()); + assertThat(e, nullValue()); + }); + assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); + } + + public void testNewCompoundProcessorException() { + TestProcessor processor = new TestProcessor("my_tag", "my_type", new RuntimeException()); + IngestProcessorException ingestProcessorException1 = + CompoundProcessor.newCompoundProcessorException(new RuntimeException(), processor, ingestDocument); + assertThat(ingestProcessorException1.getHeader("processor_tag"), equalTo(singletonList("my_tag"))); + assertThat(ingestProcessorException1.getHeader("processor_type"), equalTo(singletonList("my_type"))); + assertThat(ingestProcessorException1.getHeader("pipeline_origin"), nullValue()); + + IngestProcessorException ingestProcessorException2 = + CompoundProcessor.newCompoundProcessorException(ingestProcessorException1, processor, ingestDocument); + assertThat(ingestProcessorException2, sameInstance(ingestProcessorException1)); + } + + public void testNewCompoundProcessorExceptionPipelineOrigin() { + Pipeline pipeline2 = new Pipeline("2", null, null, + new CompoundProcessor(new TestProcessor("my_tag", "my_type", new RuntimeException()))); + Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(new AbstractProcessor(null) { + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + ingestDocument.executePipeline(pipeline2, handler); + } + + @Override + public String getType() { + return "my_type2"; + } + })); + + Exception[] holder = new Exception[1]; + ingestDocument.executePipeline(pipeline1, (document, e) -> holder[0] = e); + IngestProcessorException ingestProcessorException = (IngestProcessorException) holder[0]; + assertThat(ingestProcessorException.getHeader("processor_tag"), equalTo(singletonList("my_tag"))); + assertThat(ingestProcessorException.getHeader("processor_type"), equalTo(singletonList("my_type"))); + assertThat(ingestProcessorException.getHeader("pipeline_origin"), equalTo(Arrays.asList("2", "1"))); + } + private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) { assertStats(0, compoundProcessor, 0L, count, failed, time); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 2e3a23cd3be7e..65327f26b9b89 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -39,12 +40,14 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -68,7 +71,7 @@ protected Settings nodeSettings(int nodeOrdinal) { @Override protected Collection> nodePlugins() { - return Arrays.asList(IngestTestPlugin.class); + return Collections.singleton(ExtendedIngestTestPlugin.class); } public void testSimulate() throws Exception { @@ -293,4 +296,157 @@ public void testWithDedicatedMaster() throws Exception { assertFalse(item.isFailed()); assertEquals("auto-generated", item.getResponse().getId()); } + + public void testPipelineOriginHeader() throws Exception { + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("pipeline"); + source.field("name", "2"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("pipeline"); + source.field("name", "3"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("fail"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + + Exception e = expectThrows(Exception.class, () -> { + IndexRequest indexRequest = new IndexRequest("test"); + indexRequest.source("{}", XContentType.JSON); + indexRequest.setPipeline("1"); + client().index(indexRequest).get(); + }); + IngestProcessorException ingestException = (IngestProcessorException) ExceptionsHelper.unwrap(e, IngestProcessorException.class); + assertThat(ingestException.getHeader("processor_type"), equalTo(Collections.singletonList("fail"))); + assertThat(ingestException.getHeader("pipeline_origin"), equalTo(Arrays.asList("3", "2", "1"))); + } + + public void testPipelineProcessorOnFailure() throws Exception { + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("pipeline"); + source.field("name", "2"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + { + source.startArray("on_failure"); + source.startObject(); + { + source.startObject("onfailure_processor"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("pipeline"); + source.field("name", "3"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("fail"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + + client().prepareIndex("test", "_doc").setId("1").setSource("{}", XContentType.JSON).setPipeline("1").get(); + Map inserted = client().prepareGet("test", "_doc", "1") + .get().getSourceAsMap(); + assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline")); + } + + public static class ExtendedIngestTestPlugin extends IngestTestPlugin { + + @Override + public Map getProcessors(Processor.Parameters parameters) { + Map factories = new HashMap<>(super.getProcessors(parameters)); + factories.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)); + factories.put("fail", (processorFactories, tag, config) -> new TestProcessor(tag, "fail", new RuntimeException())); + factories.put("onfailure_processor", (processorFactories, tag, config) -> new TestProcessor(tag, "fail", document -> { + String onFailurePipeline = document.getFieldValue("_ingest.on_failure_pipeline", String.class); + document.setFieldValue("readme", "pipeline with id [" + onFailurePipeline + "] is a bad pipeline"); + })); + return factories; + } + } + }