From 7929ec14de92a1b641f6412890a8354ef4e6326c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 14 Nov 2019 11:28:16 +0100 Subject: [PATCH] Introduce on_failure_pipeline ingest metadata In case an exception occurs inside a pipeline processor, the pipeline stack is kept around as header in the exception. Then in the on_failure processor the id of the pipeline the exception occurred is made accessible via the `on_failure_pipeline` ingest metadata. Closes #44920 --- docs/reference/ingest/ingest-node.asciidoc | 7 +- .../test/ingest/210_pipeline_processor.yml | 2 +- .../ingest/SimulateExecutionService.java | 2 +- .../ingest/CompoundProcessor.java | 19 ++- .../elasticsearch/ingest/IngestDocument.java | 17 +- .../elasticsearch/ingest/IngestService.java | 2 +- .../ingest/CompoundProcessorTests.java | 38 +++++ .../elasticsearch/ingest/IngestClientIT.java | 154 +++++++++++++++++- 8 files changed, 226 insertions(+), 15 deletions(-) diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 4b0016d39a837..0da0fd19e16ef 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -376,7 +376,7 @@ The `if` condition can be more then a simple equality check. The full power of the <> is available and running in the {painless}/painless-ingest-processor-context.html[ingest processor context]. -IMPORTANT: The value of ctx is read-only in `if` conditions. +IMPORTANT: The value of ctx is read-only in `if` conditions. A more complex `if` condition that drops the document (i.e. not index it) unless it has a multi-valued tag field with at least one value that contains the characters @@ -718,8 +718,9 @@ The `ignore_failure` can be set on any processor and defaults to `false`. You may want to retrieve the actual error message that was thrown by a failed processor. To do so you can access metadata fields called -`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible -from within the context of an `on_failure` block. +`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and +`on_failure_pipeline` (in case an error occurred inside a pipeline processor). +These fields are only accessible from within the context of an `on_failure` block. Here is an updated version of the example that you saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message` diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index e375d195bfbc9..2b79890fa6daf 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -107,4 +107,4 @@ teardown: pipeline: "outer" body: {} - match: { error.root_cause.0.type: "exception" } -- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" } +- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 070e99cc5c775..79de0d0c2a7fd 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -54,7 +54,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v handler.accept(new SimulateDocumentVerboseResult(processorResultList), e); }); } else { - pipeline.execute(ingestDocument, (result, e) -> { + ingestDocument.executePipeline(pipeline, (result, e) -> { if (e == null) { handler.accept(new SimulateDocumentBaseResult(result), null); } else { diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index b8c9eb07f4b62..7633763b01bab 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -40,6 +40,7 @@ public class CompoundProcessor implements Processor { public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message"; public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type"; public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag"; + public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline"; private final boolean ignoreFailure; private final List processors; @@ -144,7 +145,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume innerExecute(currentProcessor + 1, ingestDocument, handler); } else { ElasticsearchException compoundProcessorException = - newCompoundProcessorException(e, processor.getType(), processor.getTag()); + newCompoundProcessorException(e, processor, ingestDocument); if (onFailureProcessors.isEmpty()) { handler.accept(null, compoundProcessorException); } else { @@ -177,7 +178,7 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD onFailureProcessor.execute(ingestDocument, (result, e) -> { if (e != null) { removeFailureMetadata(ingestDocument); - handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag())); + handler.accept(null, newCompoundProcessorException(e, onFailureProcessor, ingestDocument)); return; } if (result == null) { @@ -192,12 +193,17 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) { List processorTypeHeader = cause.getHeader("processor_type"); List processorTagHeader = cause.getHeader("processor_tag"); + List processorOriginHeader = cause.getHeader("pipeline_origin"); String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null; String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null; + String failedPipelineId = (processorOriginHeader != null) ? processorOriginHeader.get(0) : null; Map ingestMetadata = ingestDocument.getIngestMetadata(); ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage()); ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType); ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag); + if (failedPipelineId != null) { + ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId); + } } private void removeFailureMetadata(IngestDocument ingestDocument) { @@ -205,21 +211,28 @@ private void removeFailureMetadata(IngestDocument ingestDocument) { ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD); ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD); ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD); + ingestMetadata.remove(ON_FAILURE_PIPELINE_FIELD); } - private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) { + private ElasticsearchException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) { if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) { return (ElasticsearchException) e; } ElasticsearchException exception = new ElasticsearchException(e); + String processorType = processor.getType(); if (processorType != null) { exception.addHeader("processor_type", processorType); } + String processorTag = processor.getTag(); if (processorTag != null) { exception.addHeader("processor_tag", processorTag); } + List pipelineStack = document.getPipelineStack(); + if (pipelineStack.size() > 1) { + exception.addHeader("pipeline_origin", pipelineStack); + } return exception; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 6c8cacf14cdf5..aabb6890a7b29 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -38,7 +38,7 @@ import java.util.Date; import java.util.EnumMap; import java.util.HashMap; -import java.util.IdentityHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -60,7 +60,7 @@ public final class IngestDocument { private final Map ingestMetadata; // Contains all pipelines that have been executed for this document - private final Set executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>()); + private final Set executedPipelines = new LinkedHashSet<>(); public IngestDocument(String index, String id, String routing, Long version, VersionType versionType, Map source) { @@ -646,9 +646,9 @@ private static Object deepCopy(Object value) { * @param handler handles the result or failure */ public void executePipeline(Pipeline pipeline, BiConsumer handler) { - if (executedPipelines.add(pipeline)) { + if (executedPipelines.add(pipeline.getId())) { pipeline.execute(this, (result, e) -> { - executedPipelines.remove(pipeline); + executedPipelines.remove(pipeline.getId()); handler.accept(result, e); }); } else { @@ -656,6 +656,15 @@ public void executePipeline(Pipeline pipeline, BiConsumer getPipelineStack() { + List pipelineStack = new ArrayList<>(executedPipelines); + Collections.reverse(pipelineStack); + return pipelineStack; + } + @Override public boolean equals(Object obj) { if (obj == this) { return true; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b17b530aca9f0..69e1cd576de46 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -470,7 +470,7 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline VersionType versionType = indexRequest.versionType(); Map sourceAsMap = indexRequest.sourceAsMap(); IngestDocument ingestDocument = new IngestDocument(index, id, routing, version, versionType, sourceAsMap); - pipeline.execute(ingestDocument, (result, e) -> { + ingestDocument.executePipeline(pipeline, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); totalMetrics.postIngest(ingestTimeInMillis); if (e != null) { diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index b3b8ee9762dc1..3c11e4837d98e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -26,14 +26,17 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; @@ -279,6 +282,41 @@ public void testBreakOnFailure() throws Exception { assertStats(pipeline, 1, 1, 0); } + public void testFailurePipelineField() { + TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> { + Map ingestMetadata = ingestDocument.getIngestMetadata(); + assertThat(ingestMetadata.entrySet(), hasSize(4)); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue()); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2")); + }); + + Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!")))); + Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false, List.of(new AbstractProcessor(null) { + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + ingestDocument.executePipeline(pipeline2, handler); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new AssertionError(); + } + + @Override + public String getType() { + return "pipeline"; + } + }), List.of(onFailureProcessor))); + + ingestDocument.executePipeline(pipeline1, (document, e) -> { + assertThat(document, notNullValue()); + assertThat(e, nullValue()); + }); + assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); + } + private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) { assertStats(0, compoundProcessor, 0L, count, failed, time); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 762f2ba5eb937..f8bed8d9815a7 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -39,13 +39,14 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -68,7 +69,7 @@ protected Settings nodeSettings(int nodeOrdinal) { @Override protected Collection> nodePlugins() { - return Arrays.asList(IngestTestPlugin.class); + return List.of(ExtendedIngestTestPlugin.class); } public void testSimulate() throws Exception { @@ -293,4 +294,153 @@ public void testWithDedicatedMaster() throws Exception { assertFalse(item.isFailed()); assertEquals("auto-generated", item.getResponse().getId()); } + + public void testPipelineOriginHeader() throws Exception { + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("pipeline"); + source.field("name", "2"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("pipeline"); + source.field("name", "3"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("fail"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> { + client().prepareIndex("test").setSource("{}", XContentType.JSON).setPipeline("1").get(); + }); + assertThat(e.getHeader("processor_type"), equalTo(List.of("fail"))); + assertThat(e.getHeader("pipeline_origin"), equalTo(List.of("3", "2", "1"))); + } + + public void testPipelineProcessorOnFailure() throws Exception { + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("pipeline"); + source.field("name", "2"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + { + source.startArray("on_failure"); + source.startObject(); + { + source.startObject("onfailure_processor"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("pipeline"); + source.field("name", "3"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + { + XContentBuilder source = jsonBuilder().startObject(); + { + source.startArray("processors"); + source.startObject(); + { + source.startObject("fail"); + source.endObject(); + } + source.endObject(); + source.endArray(); + } + source.endObject(); + PutPipelineRequest putPipelineRequest = + new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + } + + client().prepareIndex("test").setId("1").setSource("{}", XContentType.JSON).setPipeline("1").get(); + Map inserted = client().prepareGet("test", "1") + .get().getSourceAsMap(); + assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline")); + } + + public static class ExtendedIngestTestPlugin extends IngestTestPlugin { + + @Override + public Map getProcessors(Processor.Parameters parameters) { + Map factories = new HashMap<>(super.getProcessors(parameters)); + factories.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)); + factories.put("fail", (processorFactories, tag, config) -> new TestProcessor(tag, "fail", new RuntimeException())); + factories.put("onfailure_processor", (processorFactories, tag, config) -> new TestProcessor(tag, "fail", document -> { + String onFailurePipeline = document.getFieldValue("_ingest.on_failure_pipeline", String.class); + document.setFieldValue("readme", "pipeline with id [" + onFailurePipeline + "] is a bad pipeline"); + })); + return factories; + } + } + }