Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Add copy_from parameter for set ingest processor (#63540) #66202

Merged
merged 2 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion docs/reference/ingest/processors/set.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ its value will be replaced with the provided one.
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to insert, upsert, or update. Supports <<accessing-template-fields,template snippets>>.
| `value` | yes | - | The value to be set for the field. Supports <<accessing-template-fields,template snippets>>.
| `value` | yes* | - | The value to be set for the field. Supports <<accessing-template-fields,template snippets>>. May specify only one of `value` or `copy_from`.
| `copy_from` | no | - | The origin field which will be copied to `field`, cannot set `value` simultaneously. Supported data types are `boolean`, `number`, `array`, `object`, `string`, `date`, etc.
| `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
| `ignore_empty_value` | no | `false` | If `true` and `value` is a <<accessing-template-fields,template snippet>> that evaluates to `null` or the empty string, the processor quietly exits without modifying the document
include::common-options.asciidoc[]
Expand Down Expand Up @@ -88,3 +89,55 @@ Result:
}
--------------------------------------------------
// TESTRESPONSE[s/2019-03-11T21:54:37.909224Z/$body.docs.0.doc._ingest.timestamp/]
The contents of a field including complex values such as arrays and objects can be copied to another field using `copy_from`:
[source,console]
--------------------------------------------------
PUT _ingest/pipeline/set_bar
{
"description": "sets the value of bar from the field foo",
"processors": [
{
"set": {
"field": "bar",
"copy_from": "foo"
}
}
]
}

POST _ingest/pipeline/set_bar/_simulate
{
"docs": [
{
"_source": {
"foo": ["foo1", "foo2"]
}
}
]
}
--------------------------------------------------

Result:

[source,console-result]
--------------------------------------------------
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"bar": ["foo1", "foo2"],
"foo": ["foo1", "foo2"]
},
"_ingest" : {
"timestamp" : "2020-09-30T12:55:17.742795Z"
}
}
}
]
}
--------------------------------------------------
// TESTRESPONSE[s/2020-09-30T12:55:17.742795Z/$body.docs.0.doc._ingest.timestamp/]
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import java.util.Map;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that adds new fields with their corresponding values. If the field is already present, its value
* will be replaced with the provided one.
Expand All @@ -40,18 +42,20 @@ public final class SetProcessor extends AbstractProcessor {
private final boolean overrideEnabled;
private final TemplateScript.Factory field;
private final ValueSource value;
private final String copyFrom;
private final boolean ignoreEmptyValue;

SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) {
this(tag, description, field, value, true, false);
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, String copyFrom) {
this(tag, description, field, value, copyFrom, true, false);
}

SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean overrideEnabled,
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, String copyFrom, boolean overrideEnabled,
boolean ignoreEmptyValue) {
super(tag, description);
this.overrideEnabled = overrideEnabled;
this.field = field;
this.value = value;
this.copyFrom = copyFrom;
this.ignoreEmptyValue = ignoreEmptyValue;
}

Expand All @@ -67,14 +71,23 @@ public ValueSource getValue() {
return value;
}

public String getCopyFrom() {
return copyFrom;
}

public boolean isIgnoreEmptyValue() {
return ignoreEmptyValue;
}

@Override
public IngestDocument execute(IngestDocument document) {
if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) {
document.setFieldValue(field, value, ignoreEmptyValue);
if (copyFrom != null) {
Object fieldValue = document.getFieldValue(copyFrom, Object.class, ignoreEmptyValue);
document.setFieldValue(field, fieldValue, ignoreEmptyValue);
} else {
document.setFieldValue(field, value, ignoreEmptyValue);
}
}
return document;
}
Expand All @@ -96,16 +109,30 @@ public Factory(ScriptService scriptService) {
public SetProcessor create(Map<String, Processor.Factory> registry, String processorTag,
String description, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from");
ValueSource valueSource = null;
if (copyFrom == null) {
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
valueSource = ValueSource.wrap(value, scriptService);
} else {
Object value = config.remove("value");
if (value != null) {
throw newConfigurationException(TYPE, processorTag, "copy_from",
"cannot set both `copy_from` and `value` in the same processor");
}
}

boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override", true);
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
"field", field, scriptService);
boolean ignoreEmptyValue = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_empty_value", false);

return new SetProcessor(
processorTag,
description,
compiledTemplate,
ValueSource.wrap(value, scriptService),
valueSource,
copyFrom,
overrideEnabled,
ignoreEmptyValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
ForEachProcessor processor = new ForEachProcessor(
"_tag", null, "values", new SetProcessor("_tag",
null, new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
(model) -> model.get("other")), false);
(model) -> model.get("other"), null), false);
processor.execute(ingestDocument, (result, e) -> {});

assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,25 @@ public void testInvalidMustacheTemplate() throws Exception {
assertThat(exception.getMetadata("es.processor_tag").get(0), equalTo(processorTag));
}

public void testCreateWithCopyFrom() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("copy_from", "field2");
String processorTag = randomAlphaOfLength(10);
SetProcessor setProcessor = factory.create(null, processorTag, null, config);
assertThat(setProcessor.getTag(), equalTo(processorTag));
assertThat(setProcessor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(setProcessor.getCopyFrom(), equalTo("field2"));
}

public void testCreateWithCopyFromAndValue() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("copy_from", "field2");
config.put("value", "value1");
String processorTag = randomAlphaOfLength(10);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> factory.create(null, processorTag, null, config));
assertThat(exception.getMessage(), equalTo("[copy_from] cannot set both `copy_from` and `value` in the same processor"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hamcrest.Matchers;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

Expand All @@ -38,7 +39,7 @@ public void testSetExistingFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
Processor processor = createSetProcessor(fieldName, fieldValue, null, true, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -50,7 +51,7 @@ public void testSetNewFields() throws Exception {
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), testIngestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
Processor processor = createSetProcessor(fieldName, fieldValue, null, true, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -59,7 +60,7 @@ public void testSetNewFields() throws Exception {
public void testSetFieldsTypeMismatch() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
ingestDocument.setFieldValue("field", "value");
Processor processor = createSetProcessor("field.inner", "value", true, false);
Processor processor = createSetProcessor("field.inner", "value", null, true, false);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
Expand All @@ -73,7 +74,7 @@ public void testSetNewFieldWithOverrideDisabled() throws Exception {
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Processor processor = createSetProcessor(fieldName, fieldValue, false, false);
Processor processor = createSetProcessor(fieldName, fieldValue, null, false, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -83,7 +84,7 @@ public void testSetExistingFieldWithOverrideDisabled() throws Exception {
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
Object fieldValue = "foo";
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, "bar", false, false);
Processor processor = createSetProcessor(fieldName, "bar", null, false, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -94,54 +95,69 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception {
Object fieldValue = null;
Object newValue = "bar";
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, newValue, false, false);
Processor processor = createSetProcessor(fieldName, newValue, null, false, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue));
}

public void testSetMetadataExceptVersion() throws Exception {
Metadata randomMetadata = randomFrom(Metadata.INDEX, Metadata.TYPE, Metadata.ID, Metadata.ROUTING);
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", true, false);
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(randomMetadata.getFieldName(), String.class), Matchers.equalTo("_value"));
}

public void testSetMetadataVersion() throws Exception {
long version = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, true, false);
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.VERSION.getFieldName(), Long.class), Matchers.equalTo(version));
}

public void testSetMetadataVersionType() throws Exception {
String versionType = randomFrom("internal", "external", "external_gte");
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, true, false);
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType));
}

public void testSetMetadataIfSeqNo() throws Exception {
long ifSeqNo = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true, false);
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_SEQ_NO.getFieldName(), Long.class), Matchers.equalTo(ifSeqNo));
}

public void testSetMetadataIfPrimaryTerm() throws Exception {
long ifPrimaryTerm = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true, false);
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm));
}

private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled, boolean ignoreEmptyValue) {
public void testCopyFromOtherField() throws Exception {
Map<String, Object> document = new HashMap<>();
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
document.put("field", fieldValue);

IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);

Processor processor = createSetProcessor(fieldName, null, "field", true, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
}

private static Processor createSetProcessor(String fieldName, Object fieldValue, String copyFrom, boolean overrideEnabled,
boolean ignoreEmptyValue) {
return new SetProcessor(randomAlphaOfLength(10), null, new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled, ignoreEmptyValue);
ValueSource.wrap(fieldValue, TestTemplateService.instance()), copyFrom, overrideEnabled, ignoreEmptyValue);
}
}
Loading