From 8fd01554bc0056388bf68a0f306a5fac43b7bb52 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 13 Jul 2016 13:13:00 -0700 Subject: [PATCH] update foreach processor to only support one applied processor. (#19402) Closes #19345. --- .../ingest/ConfigurationUtils.java | 2 +- docs/reference/ingest/ingest-node.asciidoc | 52 ++++++------- .../ingest/common/ForEachProcessor.java | 29 +++++--- .../common/ForEachProcessorFactoryTests.java | 73 +++++++++++++------ .../ingest/common/ForEachProcessorTests.java | 39 +++------- .../rest-api-spec/test/ingest/80_foreach.yaml | 6 +- .../10_pipeline_with_mustache_templates.yaml | 6 +- .../test/ingest/20_combine_processors.yaml | 12 +-- 8 files changed, 108 insertions(+), 111 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index 3f2c294c12c70..8b6077c667d00 100644 --- a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -251,7 +251,7 @@ public static List readProcessorConfigs(List processorFactories, + public static Processor readProcessor(Map processorFactories, String type, Map config) throws Exception { Processor.Factory factory = processorFactories.get(type); if (factory != null) { diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index ec4f9c30e66cf..40537f4d29c77 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -856,10 +856,10 @@ Processes elements in an array of unknown length. All processors can operate on elements inside an array, but if all elements of an array need to be processed in the same way, defining a processor for each element becomes cumbersome and tricky because it is likely that the number of elements in an array is unknown. For this reason the `foreach` -processor exists. By specifying the field holding array elements and a list of processors that -define what should happen to each element, array fields can easily be preprocessed. +processor exists. By specifying the field holding array elements and a processor that +defines what should happen to each element, array fields can easily be preprocessed. -Processors inside the foreach processor work in a different context, and the only valid top-level +A processor inside the foreach processor works in a different context, and the only valid top-level field is `_value`, which holds the array element value. Under this field other fields may exist. If the `foreach` processor fails to process an element inside the array, and no `on_failure` processor has been specified, @@ -871,7 +871,7 @@ then it aborts the execution and leaves the array unmodified. |====== | Name | Required | Default | Description | `field` | yes | - | The array field -| `processors` | yes | - | The processors +| `processor` | yes | - | The processor to execute against each field |====== Assume the following document: @@ -890,13 +890,11 @@ When this `foreach` processor operates on this sample document: { "foreach" : { "field" : "values", - "processors" : [ - { - "uppercase" : { - "field" : "_value" - } + "processor" : { + "uppercase" : { + "field" : "_value" } - ] + } } } -------------------------------------------------- @@ -936,13 +934,11 @@ so the following `foreach` processor is used: { "foreach" : { "field" : "persons", - "processors" : [ - { - "remove" : { - "field" : "_value.id" - } + "processor" : { + "remove" : { + "field" : "_value.id" } - ] + } } } -------------------------------------------------- @@ -975,21 +971,19 @@ block to send the document to the 'failure_index' index for later inspection: { "foreach" : { "field" : "persons", - "processors" : [ - { - "remove" : { - "field" : "_value.id", - "on_failure" : [ - { - "set" : { - "field", "_index", - "value", "failure_index" - } + "processor" : { + "remove" : { + "field" : "_value.id", + "on_failure" : [ + { + "set" : { + "field", "_index", + "value", "failure_index" } - ] - } + } + ] } - ] + } } } -------------------------------------------------- diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index f4cbdf06e7da9..c4640733d0619 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -28,10 +28,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readList; +import static org.elasticsearch.ingest.ConfigurationUtils.readMap; import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty; /** @@ -45,12 +49,12 @@ public final class ForEachProcessor extends AbstractProcessor { public static final String TYPE = "foreach"; private final String field; - private final List processors; + private final Processor processor; - ForEachProcessor(String tag, String field, List processors) { + ForEachProcessor(String tag, String field, Processor processor) { super(tag); this.field = field; - this.processors = processors; + this.processor = processor; } @Override @@ -61,9 +65,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { Map innerSource = new HashMap<>(ingestDocument.getSourceAndMetadata()); innerSource.put("_value", value); // scalar value to access the list item being evaluated IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata()); - for (Processor processor : processors) { - processor.execute(innerIngestDocument); - } + processor.execute(innerIngestDocument); newValues.add(innerSource.get("_value")); } ingestDocument.setFieldValue(field, newValues); @@ -78,8 +80,8 @@ String getField() { return field; } - List getProcessors() { - return processors; + Processor getProcessor() { + return processor; } public static final class Factory implements Processor.Factory { @@ -87,9 +89,14 @@ public static final class Factory implements Processor.Factory { public ForEachProcessor create(Map factories, String tag, Map config) throws Exception { String field = readStringProperty(TYPE, tag, config, "field"); - List>> processorConfigs = readList(TYPE, tag, config, "processors"); - List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, factories); - return new ForEachProcessor(tag, field, Collections.unmodifiableList(processors)); + Map> processorConfig = readMap(TYPE, tag, config, "processor"); + Set>> entries = processorConfig.entrySet(); + if (entries.size() != 1) { + throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type"); + } + Map.Entry> entry = entries.iterator().next(); + Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue()); + return new ForEachProcessor(tag, field, processor); } } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 617a2f27d14d5..49611d76f4081 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -19,11 +19,9 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -31,42 +29,69 @@ import java.util.HashMap; import java.util.Map; -import static org.mockito.Mockito.mock; +import static org.hamcrest.Matchers.equalTo; public class ForEachProcessorFactoryTests extends ESTestCase { public void testCreate() throws Exception { - Processor processor = new TestProcessor(ingestDocument -> {}); + Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "_field"); - config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); + config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config); assertThat(forEachProcessor, Matchers.notNullValue()); - assertThat(forEachProcessor.getField(), Matchers.equalTo("_field")); - assertThat(forEachProcessor.getProcessors().size(), Matchers.equalTo(1)); - assertThat(forEachProcessor.getProcessors().get(0), Matchers.sameInstance(processor)); + assertThat(forEachProcessor.getField(), equalTo("_field")); + assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor)); + } - config = new HashMap<>(); - config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); - try { - forEachFactory.create(registry, null, config); - fail("exception expected"); - } catch (Exception e) { - assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing")); - } + public void testCreateWithTooManyProcessorTypes() throws Exception { + Processor processor = new TestProcessor(ingestDocument -> { }); + Map registry = new HashMap<>(); + registry.put("_first", (r, t, c) -> processor); + registry.put("_second", (r, t, c) -> processor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); - config = new HashMap<>(); + Map config = new HashMap<>(); + config.put("field", "_field"); + Map processorTypes = new HashMap<>(); + processorTypes.put("_first", Collections.emptyMap()); + processorTypes.put("_second", Collections.emptyMap()); + config.put("processor", processorTypes); + Exception exception = expectThrows(ElasticsearchParseException.class, () -> forEachFactory.create(registry, null, config)); + assertThat(exception.getMessage(), equalTo("[processor] Must specify exactly one processor type")); + } + + public void testCreateWithNonExistingProcessorType() throws Exception { + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + Map config = new HashMap<>(); + config.put("field", "_field"); + config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); + Exception expectedException = expectThrows(ElasticsearchParseException.class, + () -> forEachFactory.create(Collections.emptyMap(), null, config)); + assertThat(expectedException.getMessage(), equalTo("No processor type exists with name [_name]")); + } + + public void testCreateWithMissingField() throws Exception { + Processor processor = new TestProcessor(ingestDocument -> { }); + Map registry = new HashMap<>(); + registry.put("_name", (r, t, c) -> processor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + Map config = new HashMap<>(); + config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); + Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config)); + assertThat(exception.getMessage(), equalTo("[field] required property is missing")); + } + + public void testCreateWithMissingProcessor() { + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + Map config = new HashMap<>(); config.put("field", "_field"); - try { - forEachFactory.create(registry, null, config); - fail("exception expected"); - } catch (Exception e) { - assertThat(e.getMessage(), Matchers.equalTo("[processors] required property is missing")); - } + Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config)); + assertThat(exception.getMessage(), equalTo("[processor] required property is missing")); } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 05287935b495a..714722418e795 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.TestTemplateService; -import org.elasticsearch.ingest.ValueSource; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; @@ -50,7 +49,7 @@ public void testExecute() throws Exception { ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values", Collections.singletonList(new UppercaseProcessor("_tag", "_value")) + "_tag", "values", new UppercaseProcessor("_tag", "_value") ); processor.execute(ingestDocument); @@ -70,7 +69,7 @@ public void testExecuteWithFailure() throws Exception { throw new RuntimeException("failure"); } }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(testProcessor)); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor); try { processor.execute(ingestDocument); fail("exception expected"); @@ -90,8 +89,7 @@ public void testExecuteWithFailure() throws Exception { }); Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( - "_tag", "values", - Collections.singletonList(new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))) + "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)) ); processor.execute(ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); @@ -111,7 +109,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_value.type", id.getSourceAndMetadata().get("_type")); id.setFieldValue("_value.id", id.getSourceAndMetadata().get("_id")); }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(innerProcessor)); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor); processor.execute(ingestDocument); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); @@ -138,9 +136,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { TemplateService ts = TestTemplateService.instance(); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values", Arrays.asList( - new AppendProcessor("_tag", ts.compile("flat_values"), ValueSource.wrap("value", ts)), - new SetProcessor("_tag", ts.compile("_value.new_field"), (model) -> model.get("other"))) + "_tag", "values", new SetProcessor("_tag", ts.compile("_value.new_field"), (model) -> model.get("other")) ); processor.execute(ingestDocument); @@ -149,21 +145,10 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { assertThat(ingestDocument.getFieldValue("values.2.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.3.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value")); - - List flatValues = ingestDocument.getFieldValue("flat_values", List.class); - assertThat(flatValues.size(), equalTo(5)); - assertThat(flatValues.get(0), equalTo("value")); - assertThat(flatValues.get(1), equalTo("value")); - assertThat(flatValues.get(2), equalTo("value")); - assertThat(flatValues.get(3), equalTo("value")); - assertThat(flatValues.get(4), equalTo("value")); } public void testRandom() throws Exception { - int numProcessors = randomInt(8); - List processors = new ArrayList<>(numProcessors); - for (int i = 0; i < numProcessors; i++) { - processors.add(new Processor() { + Processor innerProcessor = new Processor() { @Override public void execute(IngestDocument ingestDocument) throws Exception { String existingValue = ingestDocument.getFieldValue("_value", String.class); @@ -179,8 +164,7 @@ public String getType() { public String getTag() { return null; } - }); - } + }; int numValues = randomIntBetween(1, 32); List values = new ArrayList<>(numValues); for (int i = 0; i < numValues; i++) { @@ -190,18 +174,13 @@ public String getTag() { "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values) ); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", processors); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor); processor.execute(ingestDocument); List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); - String expectedString = ""; - for (int i = 0; i < numProcessors; i++) { - expectedString = expectedString + "."; - } - for (String r : result) { - assertThat(r, equalTo(expectedString)); + assertThat(r, equalTo(".")); } } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yaml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yaml index fc0fca81b097d..2ebfc08939679 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yaml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yaml @@ -10,13 +10,11 @@ { "foreach" : { "field" : "values", - "processors" : [ - { + "processor" : { "uppercase" : { "field" : "_value" } - } - ] + } } } ] diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/10_pipeline_with_mustache_templates.yaml b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/10_pipeline_with_mustache_templates.yaml index ba07943d3d9d9..cb914b9afce8e 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/10_pipeline_with_mustache_templates.yaml +++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/10_pipeline_with_mustache_templates.yaml @@ -231,14 +231,12 @@ { "foreach": { "field": "values", - "processors": [ - { + "processor": { "append": { "field": "values_flat", "value": "{{_value.key}}_{{_value.value}}" } - } - ] + } } } ] diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yaml b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yaml index 7ccaa64b9f40d..9a52979b93019 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yaml +++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yaml @@ -82,13 +82,11 @@ { "foreach" : { "field" : "friends", - "processors" : [ - { + "processor" : { "remove" : { "field" : "_value.id" } - } - ] + } } }, { @@ -106,13 +104,11 @@ { "foreach" : { "field" : "address", - "processors" : [ - { + "processor" : { "trim" : { "field" : "_value" } - } - ] + } } }, {