Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update foreach processor to only support one applied processor. #19402

Merged
merged 1 commit into from
Jul 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public static List<Processor> readProcessorConfigs(List<Map<String, Map<String,
return processors;
}

private static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
public static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorFactories.get(type);
if (factory != null) {
Expand Down
52 changes: 23 additions & 29 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -856,10 +856,10 @@ Processes elements in an array of unknown length.
All processors can operate on elements inside an array, but if all elements of an array need to
be processed in the same way, defining a processor for each element becomes cumbersome and tricky
because it is likely that the number of elements in an array is unknown. For this reason the `foreach`
processor exists. By specifying the field holding array elements and a list of processors that
define what should happen to each element, array fields can easily be preprocessed.
processor exists. By specifying the field holding array elements and a processor that
defines what should happen to each element, array fields can easily be preprocessed.

Processors inside the foreach processor work in a different context, and the only valid top-level
A processor inside the foreach processor works in a different context, and the only valid top-level
field is `_value`, which holds the array element value. Under this field other fields may exist.

If the `foreach` processor fails to process an element inside the array, and no `on_failure` processor has been specified,
Expand All @@ -871,7 +871,7 @@ then it aborts the execution and leaves the array unmodified.
|======
| Name | Required | Default | Description
| `field` | yes | - | The array field
| `processors` | yes | - | The processors
| `processor` | yes | - | The processor to execute against each field
|======

Assume the following document:
Expand All @@ -890,13 +890,11 @@ When this `foreach` processor operates on this sample document:
{
"foreach" : {
"field" : "values",
"processors" : [
{
"uppercase" : {
"field" : "_value"
}
"processor" : {
"uppercase" : {
"field" : "_value"
}
]
}
}
}
--------------------------------------------------
Expand Down Expand Up @@ -936,13 +934,11 @@ so the following `foreach` processor is used:
{
"foreach" : {
"field" : "persons",
"processors" : [
{
"remove" : {
"field" : "_value.id"
}
"processor" : {
"remove" : {
"field" : "_value.id"
}
]
}
}
}
--------------------------------------------------
Expand Down Expand Up @@ -975,21 +971,19 @@ block to send the document to the 'failure_index' index for later inspection:
{
"foreach" : {
"field" : "persons",
"processors" : [
{
"remove" : {
"field" : "_value.id",
"on_failure" : [
{
"set" : {
"field", "_index",
"value", "failure_index"
}
"processor" : {
"remove" : {
"field" : "_value.id",
"on_failure" : [
{
"set" : {
"field", "_index",
"value", "failure_index"
}
]
}
}
]
}
]
}
}
}
--------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readList;
import static org.elasticsearch.ingest.ConfigurationUtils.readMap;
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;

/**
Expand All @@ -45,12 +49,12 @@ public final class ForEachProcessor extends AbstractProcessor {
public static final String TYPE = "foreach";

private final String field;
private final List<Processor> processors;
private final Processor processor;

ForEachProcessor(String tag, String field, List<Processor> processors) {
ForEachProcessor(String tag, String field, Processor processor) {
super(tag);
this.field = field;
this.processors = processors;
this.processor = processor;
}

@Override
Expand All @@ -61,9 +65,7 @@ public void execute(IngestDocument ingestDocument) throws Exception {
Map<String, Object> innerSource = new HashMap<>(ingestDocument.getSourceAndMetadata());
innerSource.put("_value", value); // scalar value to access the list item being evaluated
IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata());
for (Processor processor : processors) {
processor.execute(innerIngestDocument);
}
processor.execute(innerIngestDocument);
newValues.add(innerSource.get("_value"));
}
ingestDocument.setFieldValue(field, newValues);
Expand All @@ -78,18 +80,23 @@ String getField() {
return field;
}

List<Processor> getProcessors() {
return processors;
Processor getProcessor() {
return processor;
}

public static final class Factory implements Processor.Factory {
@Override
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field");
List<Map<String, Map<String, Object>>> processorConfigs = readList(TYPE, tag, config, "processors");
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, factories);
return new ForEachProcessor(tag, field, Collections.unmodifiableList(processors));
Map<String, Map<String, Object>> processorConfig = readMap(TYPE, tag, config, "processor");
Set<Map.Entry<String, Map<String, Object>>> entries = processorConfig.entrySet();
if (entries.size() != 1) {
throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type");
}
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,79 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.hamcrest.Matchers.equalTo;

public class ForEachProcessorFactoryTests extends ESTestCase {

public void testCreate() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> {});
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), Matchers.equalTo("_field"));
assertThat(forEachProcessor.getProcessors().size(), Matchers.equalTo(1));
assertThat(forEachProcessor.getProcessors().get(0), Matchers.sameInstance(processor));
assertThat(forEachProcessor.getField(), equalTo("_field"));
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
}

config = new HashMap<>();
config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
try {
forEachFactory.create(registry, null, config);
fail("exception expected");
} catch (Exception e) {
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
}
public void testCreateWithTooManyProcessorTypes() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_first", (r, t, c) -> processor);
registry.put("_second", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();

config = new HashMap<>();
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Map<String, Object> processorTypes = new HashMap<>();
processorTypes.put("_first", Collections.emptyMap());
processorTypes.put("_second", Collections.emptyMap());
config.put("processor", processorTypes);
Exception exception = expectThrows(ElasticsearchParseException.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[processor] Must specify exactly one processor type"));
}

public void testCreateWithNonExistingProcessorType() throws Exception {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
Exception expectedException = expectThrows(ElasticsearchParseException.class,
() -> forEachFactory.create(Collections.emptyMap(), null, config));
assertThat(expectedException.getMessage(), equalTo("No processor type exists with name [_name]"));
}

public void testCreateWithMissingField() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
}

public void testCreateWithMissingProcessor() {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
try {
forEachFactory.create(registry, null, config);
fail("exception expected");
} catch (Exception e) {
assertThat(e.getMessage(), Matchers.equalTo("[processors] required property is missing"));
}
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));
assertThat(exception.getMessage(), equalTo("[processor] required property is missing"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
Expand All @@ -50,7 +49,7 @@ public void testExecute() throws Exception {
);

ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", Collections.singletonList(new UppercaseProcessor("_tag", "_value"))
"_tag", "values", new UppercaseProcessor("_tag", "_value")
);
processor.execute(ingestDocument);

Expand All @@ -70,7 +69,7 @@ public void testExecuteWithFailure() throws Exception {
throw new RuntimeException("failure");
}
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(testProcessor));
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor);
try {
processor.execute(ingestDocument);
fail("exception expected");
Expand All @@ -90,8 +89,7 @@ public void testExecuteWithFailure() throws Exception {
});
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
processor = new ForEachProcessor(
"_tag", "values",
Collections.singletonList(new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)))
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))
);
processor.execute(ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
Expand All @@ -111,7 +109,7 @@ public void testMetaDataAvailable() throws Exception {
id.setFieldValue("_value.type", id.getSourceAndMetadata().get("_type"));
id.setFieldValue("_value.id", id.getSourceAndMetadata().get("_id"));
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(innerProcessor));
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
processor.execute(ingestDocument);

assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
Expand All @@ -138,9 +136,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {

TemplateService ts = TestTemplateService.instance();
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", Arrays.asList(
new AppendProcessor("_tag", ts.compile("flat_values"), ValueSource.wrap("value", ts)),
new SetProcessor("_tag", ts.compile("_value.new_field"), (model) -> model.get("other")))
"_tag", "values", new SetProcessor("_tag", ts.compile("_value.new_field"), (model) -> model.get("other"))
);
processor.execute(ingestDocument);

Expand All @@ -149,21 +145,10 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
assertThat(ingestDocument.getFieldValue("values.2.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.3.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value"));

List<String> flatValues = ingestDocument.getFieldValue("flat_values", List.class);
assertThat(flatValues.size(), equalTo(5));
assertThat(flatValues.get(0), equalTo("value"));
assertThat(flatValues.get(1), equalTo("value"));
assertThat(flatValues.get(2), equalTo("value"));
assertThat(flatValues.get(3), equalTo("value"));
assertThat(flatValues.get(4), equalTo("value"));
}

public void testRandom() throws Exception {
int numProcessors = randomInt(8);
List<Processor> processors = new ArrayList<>(numProcessors);
for (int i = 0; i < numProcessors; i++) {
processors.add(new Processor() {
Processor innerProcessor = new Processor() {
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
String existingValue = ingestDocument.getFieldValue("_value", String.class);
Expand All @@ -179,8 +164,7 @@ public String getType() {
public String getTag() {
return null;
}
});
}
};
int numValues = randomIntBetween(1, 32);
List<String> values = new ArrayList<>(numValues);
for (int i = 0; i < numValues; i++) {
Expand All @@ -190,18 +174,13 @@ public String getTag() {
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
);

ForEachProcessor processor = new ForEachProcessor("_tag", "values", processors);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
processor.execute(ingestDocument);
List<String> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.size(), equalTo(numValues));

String expectedString = "";
for (int i = 0; i < numProcessors; i++) {
expectedString = expectedString + ".";
}

for (String r : result) {
assertThat(r, equalTo(expectedString));
assertThat(r, equalTo("."));
}
}

Expand Down
Loading