Skip to content

Commit

Permalink
Introduce on_failure_pipeline ingest metadata inside on_failure block (
Browse files Browse the repository at this point in the history
…elastic#49076)

In case an exception occurs inside a pipeline processor,
the pipeline stack is kept around as a header in the exception.
Then, in the on_failure processor, the id of the pipeline in which the
exception occurred is made accessible via the `on_failure_pipeline`
ingest metadata field.

Closes elastic#44920
  • Loading branch information
martijnvg committed Nov 26, 2019
1 parent 2243743 commit 6bbeaa5
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 15 deletions.
7 changes: 4 additions & 3 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ The `if` condition can be more than a simple equality check.
The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
running in the {painless}/painless-ingest-processor-context.html[ingest processor context].

IMPORTANT: The value of ctx is read-only in `if` conditions.
IMPORTANT: The value of ctx is read-only in `if` conditions.

A more complex `if` condition that drops the document (i.e. not index it)
unless it has a multi-valued tag field with at least one value that contains the characters
Expand Down Expand Up @@ -722,8 +722,9 @@ The `ignore_failure` can be set on any processor and defaults to `false`.

You may want to retrieve the actual error message that was thrown
by a failed processor. To do so you can access metadata fields called
`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible
from within the context of an `on_failure` block.
`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and
`on_failure_pipeline` (in case an error occurred inside a pipeline processor).
These fields are only accessible from within the context of an `on_failure` block.

Here is an updated version of the example that you
saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ teardown:
pipeline: "outer"
body: {}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
});
} else {
pipeline.execute(ingestDocument, (result, e) -> {
ingestDocument.executePipeline(pipeline, (result, e) -> {
if (e == null) {
handler.accept(new SimulateDocumentBaseResult(result), null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CompoundProcessor implements Processor {
public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";

private final boolean ignoreFailure;
private final List<Processor> processors;
Expand Down Expand Up @@ -144,7 +145,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
innerExecute(currentProcessor + 1, ingestDocument, handler);
} else {
IngestProcessorException compoundProcessorException =
newCompoundProcessorException(e, processor.getType(), processor.getTag());
newCompoundProcessorException(e, processor, ingestDocument);
if (onFailureProcessors.isEmpty()) {
handler.accept(null, compoundProcessorException);
} else {
Expand Down Expand Up @@ -177,7 +178,7 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
onFailureProcessor.execute(ingestDocument, (result, e) -> {
if (e != null) {
removeFailureMetadata(ingestDocument);
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag()));
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor, ingestDocument));
return;
}
if (result == null) {
Expand All @@ -192,34 +193,46 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
/**
 * Copies failure details from {@code cause} into the document's ingest metadata so that
 * on_failure processors can read them: the root-cause message, the failing processor's
 * type and tag, and — only when present — the id of the pipeline in which the failure
 * occurred (first entry of the {@code pipeline_origin} exception header).
 */
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
    Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
    ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
    ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, firstHeaderValue(cause, "processor_type"));
    ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, firstHeaderValue(cause, "processor_tag"));
    String failedPipelineId = firstHeaderValue(cause, "pipeline_origin");
    if (failedPipelineId != null) {
        ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId);
    }
}

// Returns the first value of the named exception header, or null when the header is absent.
private static String firstHeaderValue(ElasticsearchException cause, String headerName) {
    List<String> values = cause.getHeader(headerName);
    return values != null ? values.get(0) : null;
}

/**
 * Strips all on_failure_* fields from the document's ingest metadata, undoing
 * {@code putFailureMetadata} once the on_failure handling is finished.
 */
private void removeFailureMetadata(IngestDocument ingestDocument) {
    Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
    for (String field : new String[] { ON_FAILURE_MESSAGE_FIELD, ON_FAILURE_PROCESSOR_TYPE_FIELD,
            ON_FAILURE_PROCESSOR_TAG_FIELD, ON_FAILURE_PIPELINE_FIELD }) {
        ingestMetadata.remove(field);
    }
}

private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
/**
 * Wraps {@code e} in an {@link IngestProcessorException} annotated with headers describing
 * where the failure happened: the failing processor's type and tag and, when the document
 * is executing more than one (i.e. nested) pipeline, the pipeline origin stack. An
 * exception that already carries a {@code processor_type} header is returned as-is.
 */
static IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
    if (e instanceof IngestProcessorException) {
        IngestProcessorException alreadyAnnotated = (IngestProcessorException) e;
        if (alreadyAnnotated.getHeader("processor_type") != null) {
            return alreadyAnnotated;
        }
    }

    IngestProcessorException wrapped = new IngestProcessorException(e);

    String type = processor.getType();
    if (type != null) {
        wrapped.addHeader("processor_type", type);
    }
    String tag = processor.getTag();
    if (tag != null) {
        wrapped.addHeader("processor_tag", tag);
    }
    // Only meaningful for nested execution: a single-element stack is just the current pipeline.
    List<String> pipelineStack = document.getPipelineStack();
    if (pipelineStack.size() > 1) {
        wrapped.addHeader("pipeline_origin", pipelineStack);
    }

    return wrapped;
}
Expand Down
17 changes: 13 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -60,7 +60,7 @@ public final class IngestDocument {
private final Map<String, Object> ingestMetadata;

// Contains all pipelines that have been executed for this document
private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());
private final Set<String> executedPipelines = new LinkedHashSet<>();

public IngestDocument(String index, String type, String id, String routing,
Long version, VersionType versionType, Map<String, Object> source) {
Expand Down Expand Up @@ -647,16 +647,25 @@ private static Object deepCopy(Object value) {
* @param handler handles the result or failure
*/
// NOTE(review): this span is a diff view — both the removed lines (tracking Pipeline
// instances) and the added lines (tracking pipeline ids) appear below.
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
// Cycle guard: the pipeline id is only added when it is not already on the stack of
// pipelines currently executing this document.
if (executedPipelines.add(pipeline)) {
if (executedPipelines.add(pipeline.getId())) {
pipeline.execute(this, (result, e) -> {
// Pop this pipeline off the executed set once it finishes, success or failure.
executedPipelines.remove(pipeline);
executedPipelines.remove(pipeline.getId());
handler.accept(result, e);
});
} else {
// Re-entering a pipeline already in progress would recurse forever.
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
}
}

/**
 * Returns the ids of all pipelines currently executing this document, in reverse of the
 * order in which their execution started (innermost pipeline first).
 */
List<String> getPipelineStack() {
    String[] ids = executedPipelines.toArray(new String[0]);
    List<String> stack = new ArrayList<>(ids.length);
    for (int i = ids.length - 1; i >= 0; i--) {
        stack.add(ids[i]);
    }
    return stack;
}

@Override
public boolean equals(Object obj) {
if (obj == this) { return true; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
pipeline.execute(ingestDocument, (result, e) -> {
ingestDocument.executePipeline(pipeline, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
totalMetrics.postIngest(ingestTimeInMillis);
if (e != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -279,6 +282,82 @@ public void testBreakOnFailure() throws Exception {
assertStats(pipeline, 1, 1, 0);
}

// Verifies that when a processor inside a nested pipeline ("2") fails, the outer
// pipeline's ("1") on_failure processor runs and observes the failure metadata,
// including the on_failure_pipeline field carrying the id of the failing pipeline.
public void testFailureProcessorIsInvokedOnFailure() {
TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
// Exactly four entries: message, processor type, processor tag and pipeline id.
assertThat(ingestMetadata.entrySet(), hasSize(4));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
// The failure happened while pipeline "2" was executing via the nested call below.
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
});

Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false, List.of(new AbstractProcessor(null) {
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
// Delegates to the inner pipeline, mimicking the pipeline processor.
ingestDocument.executePipeline(pipeline2, handler);
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
// Only the async variant above is expected to be invoked in this test.
throw new AssertionError();
}

@Override
public String getType() {
return "pipeline";
}
}), List.of(onFailureProcessor)));

ingestDocument.executePipeline(pipeline1, (document, e) -> {
// The on_failure processor handled the error, so the outer execution succeeds.
assertThat(document, notNullValue());
assertThat(e, nullValue());
});
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
}

// Wrapping a plain exception attaches processor_tag/processor_type headers but no
// pipeline_origin (no nested pipelines are executing); wrapping an exception that is
// already annotated returns the very same instance unchanged.
public void testNewCompoundProcessorException() {
TestProcessor processor = new TestProcessor("my_tag", "my_type", new RuntimeException());

IngestProcessorException first =
CompoundProcessor.newCompoundProcessorException(new RuntimeException(), processor, ingestDocument);
assertThat(first.getHeader("processor_tag"), equalTo(List.of("my_tag")));
assertThat(first.getHeader("processor_type"), equalTo(List.of("my_type")));
assertThat(first.getHeader("pipeline_origin"), nullValue());

IngestProcessorException second =
CompoundProcessor.newCompoundProcessorException(first, processor, ingestDocument);
assertThat(second, sameInstance(first));
}

// Verifies that a failure inside a nested pipeline produces an exception whose
// pipeline_origin header lists the executing pipelines innermost-first ("2", then "1").
public void testNewCompoundProcessorExceptionPipelineOrigin() {
Pipeline pipeline2 = new Pipeline("2", null, null,
new CompoundProcessor(new TestProcessor("my_tag", "my_type", new RuntimeException())));
Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(new AbstractProcessor(null) {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
// Only the async variant below is expected to be invoked in this test.
throw new UnsupportedOperationException();
}

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
// Delegates to the inner pipeline, mimicking the pipeline processor.
ingestDocument.executePipeline(pipeline2, handler);
}

@Override
public String getType() {
return "my_type2";
}
}));

// Capture the failure handed to the completion handler; no on_failure processors exist,
// so the exception propagates out of the outer pipeline.
Exception[] holder = new Exception[1];
ingestDocument.executePipeline(pipeline1, (document, e) -> holder[0] = e);
IngestProcessorException ingestProcessorException = (IngestProcessorException) holder[0];
assertThat(ingestProcessorException.getHeader("processor_tag"), equalTo(List.of("my_tag")));
assertThat(ingestProcessorException.getHeader("processor_type"), equalTo(List.of("my_type")));
assertThat(ingestProcessorException.getHeader("pipeline_origin"), equalTo(List.of("2", "1")));
}

// Convenience overload checking the processor at index 0; the 0L argument is presumably
// an expected base value — confirm against the six-argument assertStats overload.
private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) {
assertStats(0, compoundProcessor, 0L, count, failed, time);
}
Expand Down
Loading

0 comments on commit 6bbeaa5

Please sign in to comment.