Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into os-10060
Browse files Browse the repository at this point in the history
  • Loading branch information
niyatiagg committed Nov 22, 2023
2 parents 1affddf + b974dfb commit 966ae6e
Show file tree
Hide file tree
Showing 97 changed files with 300 additions and 96 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/add-untriaged.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
if: github.repository == 'opensearch-project/OpenSearch'
runs-on: ubuntu-latest
steps:
- uses: actions/github-script@v6
- uses: actions/github-script@v7
with:
script: |
github.rest.issues.addLabels({
Expand Down
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069))
- [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672))
- Add template snippets support for field and target_field in KV ingest processor ([#10040](https://github.com/opensearch-project/OpenSearch/pull/10040))
- Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307))

### Dependencies
Expand All @@ -123,7 +124,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `org.apache.logging.log4j:log4j-core` from 2.20.0 to 2.22.0 ([#10858](https://github.com/opensearch-project/OpenSearch/pull/10858), [#11000](https://github.com/opensearch-project/OpenSearch/pull/11000), [#11270](https://github.com/opensearch-project/OpenSearch/pull/11270))
- Bump `aws-actions/configure-aws-credentials` from 2 to 4 ([#10504](https://github.com/opensearch-project/OpenSearch/pull/10504))
- Bump `stefanzweifel/git-auto-commit-action` from 4 to 5 ([#11171](https://github.com/opensearch-project/OpenSearch/pull/11171))
- Bump `actions/github-script` from 6 to 7 ([#11271](https://github.com/opensearch-project/OpenSearch/pull/11271))
- Bump `jackson` and `jackson_databind` from 2.15.2 to 2.16.0 ([#11273](https://github.com/opensearch-project/OpenSearch/pull/11273))
- Bump `netty` from 4.1.100.Final to 4.1.101.Final ([#11294](https://github.com/opensearch-project/OpenSearch/pull/11294))

### Changed
- Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840))
Expand All @@ -134,12 +137,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642))
- Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395))
- Add instrumentation for indexing in transport bulk action and transport shard bulk action. ([#10273](https://github.com/opensearch-project/OpenSearch/pull/10273))
- [BUG] Disable sort optimization for HALF_FLOAT ([#10999](https://github.com/opensearch-project/OpenSearch/pull/10999))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Use iterative approach to evaluate Regex.simpleMatch ([#11060](https://github.com/opensearch-project/OpenSearch/pull/11060))

### Deprecated

Expand All @@ -153,7 +154,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix some test methods in SimulatePipelineRequestParsingTests never run and fix test failure ([#10496](https://github.com/opensearch-project/OpenSearch/pull/10496))
- Fix passing wrong parameter when calling newConfigurationException() in DotExpanderProcessor ([#10737](https://github.com/opensearch-project/OpenSearch/pull/10737))
- Fix SuggestSearch.testSkipDuplicates by forceing refresh when indexing its test documents ([#11068](https://github.com/opensearch-project/OpenSearch/pull/11068))
- Adding version condition while adding geoshape doc values to the index, to ensure backward compatibility.([#11095](https://github.com/opensearch-project/OpenSearch/pull/11095))
- Delegating CachingWeightWrapper#count to internal weight object ([#10543](https://github.com/opensearch-project/OpenSearch/pull/10543))
- Fix per request latency last phase not tracked ([#10934](https://github.com/opensearch-project/OpenSearch/pull/10934))

Expand Down
2 changes: 1 addition & 1 deletion buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jakarta_annotation = 1.3.5
# when updating the JNA version, also update the version in buildSrc/build.gradle
jna = 5.13.0

netty = 4.1.100.Final
netty = 4.1.101.Final
joda = 2.12.2

# project reactor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory());
processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory(parameters.scriptService));
processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory());
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
package org.opensearch.ingest.common;

import org.opensearch.common.util.set.Sets;
import org.opensearch.core.common.Strings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
import org.opensearch.script.ScriptService;
import org.opensearch.script.TemplateScript;

import java.util.Collections;
import java.util.List;
Expand All @@ -56,24 +59,24 @@ public final class KeyValueProcessor extends AbstractProcessor {

private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)");

private final String field;
private final TemplateScript.Factory field;
private final String fieldSplit;
private final String valueSplit;
private final Set<String> includeKeys;
private final Set<String> excludeKeys;
private final String targetField;
private final TemplateScript.Factory targetField;
private final boolean ignoreMissing;
private final Consumer<IngestDocument> execution;

KeyValueProcessor(
String tag,
String description,
String field,
TemplateScript.Factory field,
String fieldSplit,
String valueSplit,
Set<String> includeKeys,
Set<String> excludeKeys,
String targetField,
TemplateScript.Factory targetField,
boolean ignoreMissing,
String trimKey,
String trimValue,
Expand Down Expand Up @@ -106,10 +109,10 @@ public final class KeyValueProcessor extends AbstractProcessor {
private static Consumer<IngestDocument> buildExecution(
String fieldSplit,
String valueSplit,
String field,
TemplateScript.Factory field,
Set<String> includeKeys,
Set<String> excludeKeys,
String targetField,
TemplateScript.Factory targetField,
boolean ignoreMissing,
String trimKey,
String trimValue,
Expand All @@ -130,41 +133,62 @@ private static Consumer<IngestDocument> buildExecution(
keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false;
}
}
final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (targetField == null) {
fieldPathPrefix = keyPrefix;
} else {
fieldPathPrefix = targetField + "." + keyPrefix;
}
final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}
final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
final Function<String, String> bracketStrip;
if (stripBrackets) {
bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
} else {
bracketStrip = val -> val;
}
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);

return document -> {
String value = document.getFieldValue(field, String.class, ignoreMissing);
final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (targetField != null) {
String targetFieldPath = document.renderTemplate(targetField);
if (!Strings.isNullOrEmpty((targetFieldPath))) {
fieldPathPrefix = targetFieldPath + "." + keyPrefix;
} else {
fieldPathPrefix = keyPrefix;
}
} else {
fieldPathPrefix = keyPrefix;
}

final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}
final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
final Function<String, String> bracketStrip;
if (stripBrackets) {
bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
} else {
bracketStrip = val -> val;
}
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);

String path = document.renderTemplate(field);
final boolean fieldPathNullOrEmpty = Strings.isNullOrEmpty(path);
if (fieldPathNullOrEmpty || document.hasField(path, true) == false) {
if (ignoreMissing) {
return;
} else if (fieldPathNullOrEmpty) {
throw new IllegalArgumentException("field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
}
}

String value = document.getFieldValue(path, String.class, ignoreMissing);
if (value == null) {
if (ignoreMissing) {
return;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
throw new IllegalArgumentException("field [" + path + "] is null, cannot extract key-value pairs. ");
}

for (String part : fieldSplitter.apply(value)) {
String[] kv = valueSplitter.apply(part);
if (kv.length != 2) {
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
throw new IllegalArgumentException("field [" + path + "] does not contain value_split [" + valueSplit + "]");
}
String key = keyTrimmer.apply(kv[0]);
if (keyFilter.test(key)) {
Expand Down Expand Up @@ -193,7 +217,7 @@ private static Function<String, String[]> buildSplitter(String split, boolean fi
}
}

String getField() {
TemplateScript.Factory getField() {
return field;
}

Expand All @@ -213,7 +237,7 @@ Set<String> getExcludeKeys() {
return excludeKeys;
}

String getTargetField() {
TemplateScript.Factory getTargetField() {
return targetField;
}

Expand Down Expand Up @@ -241,6 +265,12 @@ public String getType() {
}

public static class Factory implements Processor.Factory {
private final ScriptService scriptService;

public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

@Override
public KeyValueProcessor create(
Map<String, Processor.Factory> registry,
Expand All @@ -249,7 +279,13 @@ public KeyValueProcessor create(
Map<String, Object> config
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
TemplateScript.Factory fieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService);
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
TemplateScript.Factory targetFieldTemplate = null;
if (!Strings.isNullOrEmpty(targetField)) {
targetFieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "target_field", targetField, scriptService);
}

String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key");
Expand All @@ -270,12 +306,12 @@ public KeyValueProcessor create(
return new KeyValueProcessor(
processorTag,
description,
field,
fieldTemplate,
fieldSplit,
valueSplit,
includeKeys,
excludeKeys,
targetField,
targetFieldTemplate,
ignoreMissing,
trimKey,
trimValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.util.set.Sets;
import org.opensearch.ingest.TestTemplateService;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -48,16 +50,22 @@

public class KeyValueProcessorFactoryTests extends OpenSearchTestCase {

private KeyValueProcessor.Factory factory;

@Before
public void init() {
factory = new KeyValueProcessor.Factory(TestTemplateService.instance());
}

public void testCreateWithDefaults() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
config.put("value_split", "=");
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), is(nullValue()));
Expand All @@ -66,7 +74,6 @@ public void testCreateWithDefaults() throws Exception {
}

public void testCreateWithAllFieldsSet() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
Expand All @@ -78,17 +85,16 @@ public void testCreateWithAllFieldsSet() throws Exception {
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), equalTo(Sets.newHashSet("a", "b")));
assertThat(processor.getExcludeKeys(), equalTo(Collections.emptySet()));
assertThat(processor.getTargetField(), equalTo("target"));
assertThat(processor.getTargetField().newInstance(Collections.emptyMap()).execute(), equalTo("target"));
assertTrue(processor.isIgnoreMissing());
}

public void testCreateWithMissingField() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String processorTag = randomAlphaOfLength(10);
OpenSearchException exception = expectThrows(
Expand All @@ -99,7 +105,6 @@ public void testCreateWithMissingField() {
}

public void testCreateWithMissingFieldSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAlphaOfLength(10);
Expand All @@ -111,7 +116,6 @@ public void testCreateWithMissingFieldSplit() {
}

public void testCreateWithMissingValueSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.RandomDocumentPicks;
import org.opensearch.ingest.TestTemplateService;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
Expand All @@ -51,7 +52,7 @@

public class KeyValueProcessorTests extends OpenSearchTestCase {

private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory();
private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(TestTemplateService.instance());

public void test() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Expand Down Expand Up @@ -123,7 +124,12 @@ public void testMissingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
Processor processor = createKvProcessor("unknown", "&", "=", null, null, "target", false);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
assertThat(exception.getMessage(), equalTo("field [unknown] doesn't exist"));

// when using template snippet, the resolved field path maybe empty
Processor processorWithEmptyFieldPath = createKvProcessor("", "&", "=", null, null, "target", false);
exception = expectThrows(IllegalArgumentException.class, () -> processorWithEmptyFieldPath.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field path cannot be null nor empty"));
}

public void testNullValueWithIgnoreMissing() throws Exception {
Expand Down
Loading

0 comments on commit 966ae6e

Please sign in to comment.