Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce on_failure_pipeline ingest metadata inside on_failure block #49076

Merged
merged 7 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ The `if` condition can be more then a simple equality check.
The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
running in the {painless}/painless-ingest-processor-context.html[ingest processor context].

IMPORTANT: The value of ctx is read-only in `if` conditions.
IMPORTANT: The value of ctx is read-only in `if` conditions.

A more complex `if` condition that drops the document (i.e. not index it)
unless it has a multi-valued tag field with at least one value that contains the characters
Expand Down Expand Up @@ -718,8 +718,9 @@ The `ignore_failure` can be set on any processor and defaults to `false`.

You may want to retrieve the actual error message that was thrown
by a failed processor. To do so you can access metadata fields called
`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible
from within the context of an `on_failure` block.
`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and
`on_failure_pipeline` (in case an error occurred inside a pipeline processor).
These fields are only accessible from within the context of an `on_failure` block.

Here is an updated version of the example that you
saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ teardown:
pipeline: "outer"
body: {}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
});
} else {
pipeline.execute(ingestDocument, (result, e) -> {
ingestDocument.executePipeline(pipeline, (result, e) -> {
if (e == null) {
handler.accept(new SimulateDocumentBaseResult(result), null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CompoundProcessor implements Processor {
public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";

private final boolean ignoreFailure;
private final List<Processor> processors;
Expand Down Expand Up @@ -144,7 +145,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
innerExecute(currentProcessor + 1, ingestDocument, handler);
} else {
IngestProcessorException compoundProcessorException =
newCompoundProcessorException(e, processor.getType(), processor.getTag());
newCompoundProcessorException(e, processor, ingestDocument);
if (onFailureProcessors.isEmpty()) {
handler.accept(null, compoundProcessorException);
} else {
Expand Down Expand Up @@ -177,7 +178,7 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
onFailureProcessor.execute(ingestDocument, (result, e) -> {
if (e != null) {
removeFailureMetadata(ingestDocument);
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag()));
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor, ingestDocument));
return;
}
if (result == null) {
Expand All @@ -192,34 +193,46 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
List<String> processorTypeHeader = cause.getHeader("processor_type");
List<String> processorTagHeader = cause.getHeader("processor_tag");
List<String> processorOriginHeader = cause.getHeader("pipeline_origin");
String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
String failedPipelineId = (processorOriginHeader != null) ? processorOriginHeader.get(0) : null;
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
if (failedPipelineId != null) {
ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId);
}
}

private void removeFailureMetadata(IngestDocument ingestDocument) {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
ingestMetadata.remove(ON_FAILURE_PIPELINE_FIELD);
}

private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
static IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
return (IngestProcessorException) e;
}

IngestProcessorException exception = new IngestProcessorException(e);

String processorType = processor.getType();
if (processorType != null) {
exception.addHeader("processor_type", processorType);
}
String processorTag = processor.getTag();
if (processorTag != null) {
exception.addHeader("processor_tag", processorTag);
}
List<String> pipelineStack = document.getPipelineStack();
if (pipelineStack.size() > 1) {
exception.addHeader("pipeline_origin", pipelineStack);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the reason we are providing a stack here because we can't pin point which pipeline in the stack failed? I might be off here but would it make sense to pass the pipeline id to the handlers and report the exact pipeline that failed? https://github.com/elastic/elasticsearch/pull/49076/files#diff-197c33567939d6a2a4f637dda45b72dcR652

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is because there may be several levels of nested pipeline processors and then it is not clear if we just report the pipeline name that has failed. A pipeline may be used multiple times from the same initial pipeline. Having a stack here, really pin points to the pipeline and how it executed to that point.

}

return exception;
}
Expand Down
17 changes: 13 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -60,7 +60,7 @@ public final class IngestDocument {
private final Map<String, Object> ingestMetadata;

// Contains all pipelines that have been executed for this document
private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());
private final Set<String> executedPipelines = new LinkedHashSet<>();

public IngestDocument(String index, String id, String routing,
Long version, VersionType versionType, Map<String, Object> source) {
Expand Down Expand Up @@ -646,16 +646,25 @@ private static Object deepCopy(Object value) {
* @param handler handles the result or failure
*/
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.add(pipeline)) {
if (executedPipelines.add(pipeline.getId())) {
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline);
executedPipelines.remove(pipeline.getId());
handler.accept(result, e);
});
} else {
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
}
}

/**
* @return a pipeline stack; all pipelines that are in execution by this document in reverse order
*/
List<String> getPipelineStack() {
List<String> pipelineStack = new ArrayList<>(executedPipelines);
Collections.reverse(pipelineStack);
return pipelineStack;
}

@Override
public boolean equals(Object obj) {
if (obj == this) { return true; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, id, routing, version, versionType, sourceAsMap);
pipeline.execute(ingestDocument, (result, e) -> {
ingestDocument.executePipeline(pipeline, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
totalMetrics.postIngest(ingestTimeInMillis);
if (e != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -279,6 +282,82 @@ public void testBreakOnFailure() throws Exception {
assertStats(pipeline, 1, 1, 0);
}

public void testFailureProcessorIsInvokedOnFailure() {
TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(4));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
});

Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false, List.of(new AbstractProcessor(null) {
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
ingestDocument.executePipeline(pipeline2, handler);
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new AssertionError();
}

@Override
public String getType() {
return "pipeline";
}
}), List.of(onFailureProcessor)));

ingestDocument.executePipeline(pipeline1, (document, e) -> {
assertThat(document, notNullValue());
assertThat(e, nullValue());
});
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
}

public void testNewCompoundProcessorException() {
TestProcessor processor = new TestProcessor("my_tag", "my_type", new RuntimeException());
IngestProcessorException ingestProcessorException1 =
CompoundProcessor.newCompoundProcessorException(new RuntimeException(), processor, ingestDocument);
assertThat(ingestProcessorException1.getHeader("processor_tag"), equalTo(List.of("my_tag")));
assertThat(ingestProcessorException1.getHeader("processor_type"), equalTo(List.of("my_type")));
assertThat(ingestProcessorException1.getHeader("pipeline_origin"), nullValue());

IngestProcessorException ingestProcessorException2 =
CompoundProcessor.newCompoundProcessorException(ingestProcessorException1, processor, ingestDocument);
assertThat(ingestProcessorException2, sameInstance(ingestProcessorException1));
}

public void testNewCompoundProcessorExceptionPipelineOrigin() {
Pipeline pipeline2 = new Pipeline("2", null, null,
new CompoundProcessor(new TestProcessor("my_tag", "my_type", new RuntimeException())));
Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(new AbstractProcessor(null) {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException();
}

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
ingestDocument.executePipeline(pipeline2, handler);
}

@Override
public String getType() {
return "my_type2";
}
}));

Exception[] holder = new Exception[1];
ingestDocument.executePipeline(pipeline1, (document, e) -> holder[0] = e);
IngestProcessorException ingestProcessorException = (IngestProcessorException) holder[0];
assertThat(ingestProcessorException.getHeader("processor_tag"), equalTo(List.of("my_tag")));
assertThat(ingestProcessorException.getHeader("processor_type"), equalTo(List.of("my_type")));
assertThat(ingestProcessorException.getHeader("pipeline_origin"), equalTo(List.of("2", "1")));
}

private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) {
assertStats(0, compoundProcessor, 0L, count, failed, time);
}
Expand Down
Loading