From 7f4537c3d4068a117576e6e2214eaf924db6748f Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Wed, 22 Nov 2023 03:18:23 +0800 Subject: [PATCH] Add template snippets support for field and target_field in KV ingest processor (#10040) * Add template snippets support for field and target_field in KV ingest processor Signed-off-by: Gao Binlong * modify change log Signed-off-by: Gao Binlong * revert replace assertThat by assertEquals Signed-off-by: Gao Binlong * Revert some code Signed-off-by: Gao Binlong * Revert some code Signed-off-by: Gao Binlong * Fix typo and skip some yml test by version Signed-off-by: Gao Binlong --------- Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../common/IngestCommonModulePlugin.java | 2 +- .../ingest/common/KeyValueProcessor.java | 108 ++++++++----- .../common/KeyValueProcessorFactoryTests.java | 20 ++- .../ingest/common/KeyValueProcessorTests.java | 10 +- .../rest-api-spec/test/ingest/150_kv.yml | 148 ++++++++++++++++++ 6 files changed, 242 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 61900a40bb2a4..6fff945518bd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -107,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) - Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069)) - [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672)) +- Add template snippets support for field and target_field in KV ingest processor ([#10040](https://github.com/opensearch-project/OpenSearch/pull/10040)) - Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307)) ### Dependencies diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java index 0f8422ea474d2..a2a51d968e078 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java @@ -98,7 +98,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)); processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory()); processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory()); - processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory()); + processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory(parameters.scriptService)); processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()); processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory()); processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)); diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java index ff3cca4ce111f..73f03b3cb2e0f 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java @@ -33,10 +33,13 @@ package org.opensearch.ingest.common; import org.opensearch.common.util.set.Sets; +import org.opensearch.core.common.Strings; import org.opensearch.ingest.AbstractProcessor; import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.ingest.IngestDocument; import org.opensearch.ingest.Processor; +import org.opensearch.script.ScriptService; +import org.opensearch.script.TemplateScript; import java.util.Collections; import java.util.List; @@ -56,24 +59,24 @@ public final class KeyValueProcessor extends AbstractProcessor { private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)"); - private final String field; + private final TemplateScript.Factory field; private final String fieldSplit; private final String valueSplit; private final Set includeKeys; private final Set excludeKeys; - private final String targetField; + private final TemplateScript.Factory targetField; private final boolean ignoreMissing; private final Consumer execution; KeyValueProcessor( String tag, String description, - String field, + TemplateScript.Factory field, String fieldSplit, String valueSplit, Set includeKeys, Set excludeKeys, - String targetField, + TemplateScript.Factory targetField, boolean ignoreMissing, String trimKey, String trimValue, @@ -106,10 +109,10 @@ public final class KeyValueProcessor extends AbstractProcessor { private static Consumer buildExecution( String fieldSplit, String valueSplit, - String field, + TemplateScript.Factory field, Set includeKeys, Set excludeKeys, - String targetField, + TemplateScript.Factory targetField, boolean ignoreMissing, String trimKey, String trimValue, @@ -130,41 +133,62 @@ private static Consumer buildExecution( keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false; } } - final String fieldPathPrefix; - String keyPrefix = prefix == null ? "" : prefix; - if (targetField == null) { - fieldPathPrefix = keyPrefix; - } else { - fieldPathPrefix = targetField + "." + keyPrefix; - } - final Function keyPrefixer; - if (fieldPathPrefix.isEmpty()) { - keyPrefixer = val -> val; - } else { - keyPrefixer = val -> fieldPathPrefix + val; - } - final Function fieldSplitter = buildSplitter(fieldSplit, true); - Function valueSplitter = buildSplitter(valueSplit, false); - final Function keyTrimmer = buildTrimmer(trimKey); - final Function bracketStrip; - if (stripBrackets) { - bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll(""); - } else { - bracketStrip = val -> val; - } - final Function valueTrimmer = buildTrimmer(trimValue); + return document -> { - String value = document.getFieldValue(field, String.class, ignoreMissing); + final String fieldPathPrefix; + String keyPrefix = prefix == null ? "" : prefix; + if (targetField != null) { + String targetFieldPath = document.renderTemplate(targetField); + if (!Strings.isNullOrEmpty((targetFieldPath))) { + fieldPathPrefix = targetFieldPath + "." + keyPrefix; + } else { + fieldPathPrefix = keyPrefix; + } + } else { + fieldPathPrefix = keyPrefix; + } + + final Function keyPrefixer; + if (fieldPathPrefix.isEmpty()) { + keyPrefixer = val -> val; + } else { + keyPrefixer = val -> fieldPathPrefix + val; + } + final Function fieldSplitter = buildSplitter(fieldSplit, true); + Function valueSplitter = buildSplitter(valueSplit, false); + final Function keyTrimmer = buildTrimmer(trimKey); + final Function bracketStrip; + if (stripBrackets) { + bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll(""); + } else { + bracketStrip = val -> val; + } + final Function valueTrimmer = buildTrimmer(trimValue); + + String path = document.renderTemplate(field); + final boolean fieldPathNullOrEmpty = Strings.isNullOrEmpty(path); + if (fieldPathNullOrEmpty || document.hasField(path, true) == false) { + if (ignoreMissing) { + return; + } else if (fieldPathNullOrEmpty) { + throw new IllegalArgumentException("field path cannot be null nor empty"); + } else { + throw new IllegalArgumentException("field [" + path + "] doesn't exist"); + } + } + + String value = document.getFieldValue(path, String.class, ignoreMissing); if (value == null) { if (ignoreMissing) { return; } - throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs."); + throw new IllegalArgumentException("field [" + path + "] is null, cannot extract key-value pairs. "); } + for (String part : fieldSplitter.apply(value)) { String[] kv = valueSplitter.apply(part); if (kv.length != 2) { - throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]"); + throw new IllegalArgumentException("field [" + path + "] does not contain value_split [" + valueSplit + "]"); } String key = keyTrimmer.apply(kv[0]); if (keyFilter.test(key)) { @@ -193,7 +217,7 @@ private static Function buildSplitter(String split, boolean fi } } - String getField() { + TemplateScript.Factory getField() { return field; } @@ -213,7 +237,7 @@ Set getExcludeKeys() { return excludeKeys; } - String getTargetField() { + TemplateScript.Factory getTargetField() { return targetField; } @@ -241,6 +265,12 @@ public String getType() { } public static class Factory implements Processor.Factory { + private final ScriptService scriptService; + + public Factory(ScriptService scriptService) { + this.scriptService = scriptService; + } + @Override public KeyValueProcessor create( Map registry, @@ -249,7 +279,13 @@ public KeyValueProcessor create( Map config ) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + TemplateScript.Factory fieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService); String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field"); + TemplateScript.Factory targetFieldTemplate = null; + if (!Strings.isNullOrEmpty(targetField)) { + targetFieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "target_field", targetField, scriptService); + } + String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split"); String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split"); String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key"); @@ -270,12 +306,12 @@ public KeyValueProcessor create( return new KeyValueProcessor( processorTag, description, - field, + fieldTemplate, fieldSplit, valueSplit, includeKeys, excludeKeys, - targetField, + targetFieldTemplate, ignoreMissing, trimKey, trimValue, diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorFactoryTests.java index 62060a682c0cb..78972ff8d5dea 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorFactoryTests.java @@ -35,7 +35,9 @@ import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchParseException; import org.opensearch.common.util.set.Sets; +import org.opensearch.ingest.TestTemplateService; import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; import java.util.Arrays; import java.util.Collections; @@ -48,8 +50,14 @@ public class KeyValueProcessorFactoryTests extends OpenSearchTestCase { + private KeyValueProcessor.Factory factory; + + @Before + public void init() { + factory = new KeyValueProcessor.Factory(TestTemplateService.instance()); + } + public void testCreateWithDefaults() throws Exception { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); config.put("field_split", "&"); @@ -57,7 +65,7 @@ public void testCreateWithDefaults() throws Exception { String processorTag = randomAlphaOfLength(10); KeyValueProcessor processor = factory.create(null, processorTag, null, config); assertThat(processor.getTag(), equalTo(processorTag)); - assertThat(processor.getField(), equalTo("field1")); + assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1")); assertThat(processor.getFieldSplit(), equalTo("&")); assertThat(processor.getValueSplit(), equalTo("=")); assertThat(processor.getIncludeKeys(), is(nullValue())); @@ -66,7 +74,6 @@ public void testCreateWithDefaults() throws Exception { } public void testCreateWithAllFieldsSet() throws Exception { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); config.put("field_split", "&"); @@ -78,17 +85,16 @@ public void testCreateWithAllFieldsSet() throws Exception { String processorTag = randomAlphaOfLength(10); KeyValueProcessor processor = factory.create(null, processorTag, null, config); assertThat(processor.getTag(), equalTo(processorTag)); - assertThat(processor.getField(), equalTo("field1")); + assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1")); assertThat(processor.getFieldSplit(), equalTo("&")); assertThat(processor.getValueSplit(), equalTo("=")); assertThat(processor.getIncludeKeys(), equalTo(Sets.newHashSet("a", "b"))); assertThat(processor.getExcludeKeys(), equalTo(Collections.emptySet())); - assertThat(processor.getTargetField(), equalTo("target")); + assertThat(processor.getTargetField().newInstance(Collections.emptyMap()).execute(), equalTo("target")); assertTrue(processor.isIgnoreMissing()); } public void testCreateWithMissingField() { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); String processorTag = randomAlphaOfLength(10); OpenSearchException exception = expectThrows( @@ -99,7 +105,6 @@ public void testCreateWithMissingField() { } public void testCreateWithMissingFieldSplit() { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); String processorTag = randomAlphaOfLength(10); @@ -111,7 +116,6 @@ public void testCreateWithMissingFieldSplit() { } public void testCreateWithMissingValueSplit() { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); config.put("field_split", "&"); diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorTests.java index 685a78e2e769b..5f71ea6f16a4f 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorTests.java @@ -36,6 +36,7 @@ import org.opensearch.ingest.IngestDocument; import org.opensearch.ingest.Processor; import org.opensearch.ingest.RandomDocumentPicks; +import org.opensearch.ingest.TestTemplateService; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; @@ -51,7 +52,7 @@ public class KeyValueProcessorTests extends OpenSearchTestCase { - private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(); + private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(TestTemplateService.instance()); public void test() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); @@ -123,7 +124,12 @@ public void testMissingField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); Processor processor = createKvProcessor("unknown", "&", "=", null, null, "target", false); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); - assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]")); + assertThat(exception.getMessage(), equalTo("field [unknown] doesn't exist")); + + // when using template snippet, the resolved field path maybe empty + Processor processorWithEmptyFieldPath = createKvProcessor("", "&", "=", null, null, "target", false); + exception = expectThrows(IllegalArgumentException.class, () -> processorWithEmptyFieldPath.execute(ingestDocument)); + assertThat(exception.getMessage(), equalTo("field path cannot be null nor empty")); } public void testNullValueWithIgnoreMissing() throws Exception { diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/150_kv.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/150_kv.yml index 836243652b2e0..30a0a520b5c40 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/150_kv.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/150_kv.yml @@ -39,3 +39,151 @@ teardown: id: 1 - match: { _source.goodbye: "everybody" } - match: { _source.hello: "world" } + +--- +"Test KV Processor with template snippets": + - skip: + version: " - 2.11.99" + reason: "KV Processor with template snippets is only supported since 2.12.0" + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "kv" : { + "field" : "{{source}}", + "target_field" : "{{target}}", + "field_split": " ", + "value_split": "=" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "foo", + target: "zoo", + foo: "goodbye=everybody hello=world" + } + + - do: + get: + index: test + id: 1 + - match: { _source.zoo.goodbye: "everybody" } + - match: { _source.zoo.hello: "world" } + +--- +"Test KV Processor with non-existing field and without ignore_missing": + - skip: + version: " - 2.11.99" + reason: "KV Processor with template snippets is only supported since 2.12.0" + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "kv" : { + "field" : "{{source}}", + "target_field" : "{{target}}", + "field_split": " ", + "value_split": "=" + } + } + ] + } + - match: { acknowledged: true } + + - do: + catch: /field path cannot be null nor empty/ + index: + index: test + id: 1 + pipeline: "1" + body: { + target: "zoo", + foo: "goodbye=everybody hello=world" + } + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "kv" : { + "field" : "{{source}}", + "target_field" : "{{target}}", + "field_split": " ", + "value_split": "=" + } + } + ] + } + - match: { acknowledged: true } + + - do: + catch: /field \[unknown\] doesn\'t exist/ + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "unknown", + target: "zoo", + foo: "goodbye=everybody hello=world" + } + +--- +"Test KV Processor with non-existing field and ignore_missing": + - skip: + version: " - 2.11.99" + reason: "KV Processor with template snippets is only supported since 2.12.0" + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "kv" : { + "field" : "{{source}}", + "target_field" : "{{target}}", + "field_split": " ", + "value_split": "=", + "ignore_missing": true + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + target: "zoo", + foo: "goodbye=everybody hello=world" + } + + - do: + get: + index: test + id: 1 + - match: { _source: { target: "zoo", foo: "goodbye=everybody hello=world"}}