Skip to content

Commit

Permalink
INGEST: Tests for Drop Processor (#33430)
Browse files Browse the repository at this point in the history
* INGEST: Tests for Drop Processor

* UT for behavior of dropped callback
and drop processor
   * Moved drop processor to `server`
project to enable this test
* Simple IT
* Relates #32278
  • Loading branch information
original-brownbear authored and kcm committed Oct 30, 2018
1 parent 0a7f603 commit d634a24
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.grok.ThreadWatchdog;
import org.elasticsearch.ingest.DropProcessor;
import org.elasticsearch.ingest.PipelineProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.ActionPlugin;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test Drop Processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description" : "pipeline with drop",
"processors" : [
{
"drop" : {
"if": "ctx.foo == 'bar'"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {
foo: "bar"
}

- do:
index:
index: test
type: test
id: 2
pipeline: "my_pipeline"
body: {
foo: "blub"
}

- do:
catch: missing
get:
index: test
type: test
id: 1
- match: { found: false }

- do:
get:
index: test
type: test
id: 2
- match: { _source.foo: "blub" }
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
* under the License.
*/

package org.elasticsearch.ingest.common;
package org.elasticsearch.ingest;

import java.util.Map;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

/**
* Drop processor only returns {@code null} for the execution result to indicate that any document
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,45 @@ public void testUpdatingStatsWhenRemovingPipelineWorks() {
assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2")));
}

public void testExecuteWithDrop() {
Map<String, Processor.Factory> factories = new HashMap<>();
factories.put("drop", new DropProcessor.Factory());
factories.put("mock", (processorFactories, tag, config) -> new Processor() {
@Override
public IngestDocument execute(final IngestDocument ingestDocument) {
throw new AssertionError("Document should have been dropped but reached this processor");
}

@Override
public String getType() {
return null;
}

@Override
public String getTag() {
return null;
}
});
IngestService ingestService = createWithProcessors(factories);
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
new BytesArray("{\"processors\": [{\"drop\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
final Consumer<IndexRequest> dropHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
verify(dropHandler, times(1)).accept(indexRequest);
}

private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source));
}
Expand Down

0 comments on commit d634a24

Please sign in to comment.