From 358fd4eec96e4febbd32d2652fa601f0f176501a Mon Sep 17 00:00:00 2001
From: Jake Landis <jake.landis@elastic.co>
Date: Mon, 17 Dec 2018 14:10:13 -0600
Subject: [PATCH] ingest: fix on_failure with Drop processor (#36686)

This commit allows a document to be dropped when a Drop processor
is used in the on_failure fork of the processor chain.

Fixes #36151
---
 .../test/ingest/220_drop_processor.yml        | 41 +++++++++++++++++++
 .../ingest/CompoundProcessor.java             | 15 +++++--
 .../ingest/CompoundProcessorTests.java        | 29 +++++++++++++
 3 files changed, 81 insertions(+), 4 deletions(-)

diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml
index 3be038aca2499..accc30faa21e7 100644
--- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml
+++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml
@@ -57,3 +57,44 @@ teardown:
       type: test
       id: 2
 - match: { _source.foo: "blub" }
+
+---
+"Test Drop Processor On Failure":
+- do:
+    ingest.put_pipeline:
+      id: "my_pipeline_with_failure"
+      body:  >
+        {
+          "description" : "pipeline with on failure drop",
+          "processors": [
+              {
+                "fail": {
+                  "message": "failed",
+                  "on_failure": [
+                    {
+                      "drop": {}
+                    }
+                  ]
+                }
+              }
+            ]
+        }
+- match: { acknowledged: true }
+
+- do:
+    index:
+      index: test
+      type: test
+      id: 3
+      pipeline: "my_pipeline_with_failure"
+      body: {
+        foo: "bar"
+      }
+
+- do:
+    catch: missing
+    get:
+      index: test
+      type: test
+      id: 3
+- match: { found: false }
diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
index 364e19d1f2e8e..5286c6ea7880a 100644
--- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
+++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
@@ -135,7 +135,9 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
                 if (onFailureProcessors.isEmpty()) {
                     throw compoundProcessorException;
                 } else {
-                    executeOnFailure(ingestDocument, compoundProcessorException);
+                    if (executeOnFailure(ingestDocument, compoundProcessorException) == false) {
+                        return null;
+                    }
                     break;
                 }
             } finally {
@@ -146,13 +148,17 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
         return ingestDocument;
     }
 
-
-    void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
+    /**
+     * @return true if execution should continue, false if document is dropped.
+     */
+    boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
         try {
             putFailureMetadata(ingestDocument, exception);
             for (Processor processor : onFailureProcessors) {
                 try {
-                    processor.execute(ingestDocument);
+                    if (processor.execute(ingestDocument) == null) {
+                        return false;
+                    }
                 } catch (Exception e) {
                     throw newCompoundProcessorException(e, processor.getType(), processor.getTag());
                 }
@@ -160,6 +166,7 @@ void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exce
         } finally {
             removeFailureMetadata(ingestDocument);
         }
+        return true;
     }
 
     private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java
index dabcae533a0bf..24e3dcd76774b 100644
--- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java
@@ -129,6 +129,35 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception {
         assertThat(processor2.getInvokedCounter(), equalTo(1));
     }
 
+    public void testSingleProcessorWithOnFailureDropProcessor() throws Exception {
+        TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
+        Processor processor2 = new Processor() {
+            @Override
+            public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+                //Simulates the drop processor
+                return null;
+            }
+
+            @Override
+            public String getType() {
+                return "drop";
+            }
+
+            @Override
+            public String getTag() {
+                return null;
+            }
+        };
+
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
+        CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
+            Collections.singletonList(processor2), relativeTimeProvider);
+        assertNull(compoundProcessor.execute(ingestDocument));
+        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertStats(compoundProcessor, 1, 1, 0);
+    }
+
     public void testSingleProcessorWithNestedFailures() throws Exception {
         TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
         TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {