Skip to content

Commit

Permalink
ingest: fix on_failure with Drop processor
Browse files Browse the repository at this point in the history
This commit allows a document to be dropped when a Drop processor
is used in the on_failure fork of the processor chain.

Fixes elastic#36151
  • Loading branch information
jakelandis committed Dec 16, 2018
1 parent 97107e9 commit c1d1601
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,44 @@ teardown:
type: test
id: 2
- match: { _source.foo: "blub" }

---
"Test Drop Processor On Failure":
- do:
ingest.put_pipeline:
id: "my_pipeline_with_failure"
body: >
{
"description" : "pipeline with on failure drop",
"processors": [
{
"fail": {
"message": "failed",
"on_failure": [
{
"drop": {}
}
]
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
type: test
id: 3
pipeline: "my_pipeline_with_failure"
body: {
foo: "bar"
}

- do:
catch: missing
get:
index: test
type: test
id: 3
- match: { found: false }
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
if (onFailureProcessors.isEmpty()) {
throw compoundProcessorException;
} else {
executeOnFailure(ingestDocument, compoundProcessorException);
if (executeOnFailure(ingestDocument, compoundProcessorException) == false) {
return null;
}
break;
}
} finally {
Expand All @@ -145,20 +147,25 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return ingestDocument;
}


void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
/**
* @return true if execution should continue, false if document is dropped.
*/
boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try {
putFailureMetadata(ingestDocument, exception);
for (Processor processor : onFailureProcessors) {
try {
processor.execute(ingestDocument);
if (processor.execute(ingestDocument) == null) {
return false;
}
} catch (Exception e) {
throw newCompoundProcessorException(e, processor.getType(), processor.getTag());
}
}
} finally {
removeFailureMetadata(ingestDocument);
}
return true;
}

private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,35 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception {
assertThat(processor2.getInvokedCounter(), equalTo(1));
}

public void testSingleProcessorWithOnFailureDropProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
Processor processor2 = new Processor() {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
//Simulates the drop processor
return null;
}

@Override
public String getType() {
return "drop";
}

@Override
public String getTag() {
return null;
}
};

LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2), relativeTimeProvider);
assertNull(compoundProcessor.execute(ingestDocument));
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
}

public void testSingleProcessorWithNestedFailures() throws Exception {
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {
Expand Down

0 comments on commit c1d1601

Please sign in to comment.