diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index 0ccfa90baa..3891622127 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -5,10 +5,12 @@ package org.opensearch.dataprepper.plugins.processor.keyvalue; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -43,6 +45,8 @@ public class KeyValueProcessor extends AbstractProcessor, Record includeKeysSet = new HashSet(); @@ -61,7 +65,9 @@ public class KeyValueProcessor extends AbstractProcessor, Record tagsOnFailure; @DataPrepperPluginConstructor - public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) { + public KeyValueProcessor(final PluginMetrics pluginMetrics, + final KeyValueProcessorConfig keyValueProcessorConfig, + final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); this.keyValueProcessorConfig = keyValueProcessorConfig; @@ -184,6 +190,13 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces if (keyValueProcessorConfig.getRemoveBrackets() && keyValueProcessorConfig.getRecursive()) { throw new IllegalArgumentException("Cannot remove brackets needed for determining levels of recursion"); } + + this.expressionEvaluator = expressionEvaluator; + if (keyValueProcessorConfig.getKeyValueWhen() != null + && !expressionEvaluator.isValidExpressionStatement(keyValueProcessorConfig.getKeyValueWhen())) { + throw new InvalidPluginConfigurationException( + String.format("key_value_when %s is not a valid expression statement", keyValueProcessorConfig.getKeyValueWhen())); + } } private String buildRegexFromCharacters(String s) { @@ -239,6 +252,11 @@ public Collection> doExecute(final Collection> recor final Event recordEvent = record.getData(); try { + + if (keyValueProcessorConfig.getKeyValueWhen() != null && !expressionEvaluator.evaluateConditional(keyValueProcessorConfig.getKeyValueWhen(), recordEvent)) { + continue; + } + final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class); if (groupsRaw == null) { continue; diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index 4e3344483b..c92660a084 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -101,6 +101,9 @@ public class KeyValueProcessorConfig { @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; + @JsonProperty("key_value_when") + private String keyValueWhen; + public String getSource() { return source; } @@ -180,4 +183,6 @@ public List getTagsOnFailure() { public boolean getOverwriteIfDestinationExists() { return overwriteIfDestinationExists; } + + public String getKeyValueWhen() { return keyValueWhen; } } diff --git a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java index 429407877c..8b2b84eabd 100644 --- a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java +++ b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java @@ -5,10 +5,6 @@ package org.opensearch.dataprepper.plugins.processor.keyvalue; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.record.Record; import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -17,6 +13,12 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +28,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.regex.PatternSyntaxException; import static org.hamcrest.CoreMatchers.equalTo; @@ -33,6 +36,8 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; @@ -46,6 +51,9 @@ public class KeyValueProcessorTests { @Mock private KeyValueProcessorConfig mockConfig; + @Mock + private ExpressionEvaluator expressionEvaluator; + private KeyValueProcessor keyValueProcessor; static Record buildRecordWithEvent(final Map data) { @@ -78,7 +86,28 @@ void setup() { lenient().when(mockConfig.getRecursive()).thenReturn(defaultConfig.getRecursive()); lenient().when(mockConfig.getOverwriteIfDestinationExists()).thenReturn(defaultConfig.getOverwriteIfDestinationExists()); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + final String keyValueWhen = UUID.randomUUID().toString(); + when(mockConfig.getKeyValueWhen()).thenReturn(keyValueWhen); + when(expressionEvaluator.isValidExpressionStatement(keyValueWhen)).thenReturn(true); + lenient().when(expressionEvaluator.evaluateConditional(eq(keyValueWhen), any(Event.class))).thenReturn(true); + + + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig, expressionEvaluator); + } + + private KeyValueProcessor createObjectUnderTest() { + return new KeyValueProcessor(pluginMetrics, mockConfig, expressionEvaluator); + } + + @Test + void invalid_expression_statement_throws_InvalidPluginConfigurationException() { + final String keyValueWhen = UUID.randomUUID().toString(); + + when(mockConfig.getKeyValueWhen()).thenReturn(keyValueWhen); + + when(expressionEvaluator.isValidExpressionStatement(keyValueWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); } @Test @@ -91,6 +120,27 @@ void testSingleKvToObjectKeyValueProcessor() { assertThatKeyEquals(parsed_message, "key1", "value1"); } + @Test + void do_not_modify_event_when_the_expression_evaluation_returns_false() { + final String keyValueWhen = UUID.randomUUID().toString(); + when(mockConfig.getKeyValueWhen()).thenReturn(keyValueWhen); + when(expressionEvaluator.isValidExpressionStatement(keyValueWhen)).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(keyValueWhen), any(Event.class))).thenReturn(false); + + + final KeyValueProcessor objectUnderTest = createObjectUnderTest(); + + final Record record = getMessage("key1=value1"); + final Map eventMap = record.getData().toMap(); + + final List> editedRecords = (List>) objectUnderTest.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.size(), equalTo(1)); + assertThat(editedRecords.get(0), notNullValue()); + assertThat(editedRecords.get(0).getData(), notNullValue()); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(eventMap)); + } + @Test void testKeyValueProcessorWithoutMessage() { final Map testData = new HashMap(); @@ -189,7 +239,7 @@ void testSingleRegexFieldDelimiterKvToObjectKeyValueProcessor() { when(mockConfig.getFieldDelimiterRegex()).thenReturn(":_*:"); when(mockConfig.getFieldSplitCharacters()).thenReturn(null); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1:_____:key2=value2"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -205,14 +255,14 @@ void testSingleRegexFieldDelimiterKvToObjectKeyValueProcessor() { void testBothKeyValuesDefinedErrorKeyValueProcessor() { when(mockConfig.getKeyValueDelimiterRegex()).thenReturn(":\\+*:"); - assertThrows(IllegalArgumentException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); + assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); } @Test void testBothFieldsDefinedErrorKeyValueProcessor() { when(mockConfig.getFieldDelimiterRegex()).thenReturn(":\\+*:"); - assertThrows(IllegalArgumentException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); + assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); } @Test @@ -220,7 +270,7 @@ void testSingleRegexKvDelimiterKvToObjectKeyValueProcessor() { when(mockConfig.getKeyValueDelimiterRegex()).thenReturn(":\\+*:"); when(mockConfig.getValueSplitCharacters()).thenReturn(null); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1:++:value1&key2:+:value2"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -236,7 +286,7 @@ void testBadKeyValueDelimiterRegexKeyValueProcessor() { when(mockConfig.getKeyValueDelimiterRegex()).thenReturn("["); when(mockConfig.getValueSplitCharacters()).thenReturn(null); - PatternSyntaxException e = assertThrows(PatternSyntaxException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); + PatternSyntaxException e = assertThrows(PatternSyntaxException.class, this::createObjectUnderTest); assertThat(e.getMessage(), CoreMatchers.startsWith("key_value_delimiter")); } @@ -245,21 +295,21 @@ void testBadFieldDelimiterRegexKeyValueProcessor() { when(mockConfig.getFieldDelimiterRegex()).thenReturn("["); when(mockConfig.getFieldSplitCharacters()).thenReturn(null); - PatternSyntaxException e = assertThrows(PatternSyntaxException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); + PatternSyntaxException e = assertThrows(PatternSyntaxException.class, this::createObjectUnderTest); assertThat(e.getMessage(), CoreMatchers.startsWith("field_delimiter")); } @Test void testBadDeleteKeyRegexKeyValueProcessor() { when(mockConfig.getDeleteKeyRegex()).thenReturn("["); - PatternSyntaxException e = assertThrows(PatternSyntaxException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); + PatternSyntaxException e = assertThrows(PatternSyntaxException.class, this::createObjectUnderTest); assertThat(e.getMessage(), CoreMatchers.startsWith("delete_key_regex")); } @Test void testBadDeleteValueRegexKeyValueProcessor() { when(mockConfig.getDeleteValueRegex()).thenReturn("["); - PatternSyntaxException e = assertThrows(PatternSyntaxException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); + PatternSyntaxException e = assertThrows(PatternSyntaxException.class, this::createObjectUnderTest); assertThat(e.getMessage(), CoreMatchers.startsWith("delete_value_regex")); } @@ -294,7 +344,7 @@ void testDuplicateKeyToArrayWithNonMatchValueProcessor() { @Test void testFieldSplitCharactersKeyValueProcessor() { when(mockConfig.getFieldSplitCharacters()).thenReturn("&!"); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1&key1=value2!key1"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -312,7 +362,7 @@ void testFieldSplitCharactersKeyValueProcessor() { void testFieldSplitCharactersDoesntSupercedeDelimiterKeyValueProcessor() { when(mockConfig.getFieldDelimiterRegex()).thenReturn(":d+:"); when(mockConfig.getFieldSplitCharacters()).thenReturn(null); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1:d:key1=value2:d:key1"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -330,7 +380,7 @@ void testFieldSplitCharactersDoesntSupercedeDelimiterKeyValueProcessor() { void testIncludeKeysKeyValueProcessor() { final List includeKeys = List.of("key2", "key3"); when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1&key2=value2&key3=value3"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -345,7 +395,7 @@ void testIncludeKeysKeyValueProcessor() { void testIncludeKeysNoMatchKeyValueProcessor() { final List includeKeys = Collections.singletonList("noMatch"); when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1&key2=value2"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -357,7 +407,7 @@ void testIncludeKeysNoMatchKeyValueProcessor() { @Test void testIncludeKeysAsDefaultKeyValueProcessor() { when(mockConfig.getIncludeKeys()).thenReturn(List.of()); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1&key2=value2"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -372,7 +422,7 @@ void testIncludeKeysAsDefaultKeyValueProcessor() { void testExcludeKeysKeyValueProcessor() { final List excludeKeys = List.of("key2"); when(mockConfig.getExcludeKeys()).thenReturn(excludeKeys); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1&key2=value2"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -385,7 +435,7 @@ void testExcludeKeysKeyValueProcessor() { @Test void testExcludeKeysAsDefaultKeyValueProcessor() { when(mockConfig.getExcludeKeys()).thenReturn(List.of()); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1&key2=value2"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -403,14 +453,14 @@ void testIncludeExcludeKeysOverlapKeyValueProcessor() { when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); when(mockConfig.getExcludeKeys()).thenReturn(excludeKeys); - assertThrows(IllegalArgumentException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); + assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); } @Test void testDefaultKeysNoOverlapsBetweenEventKvProcessor() { final Map defaultMap = Map.of("dKey", "dValue"); when(mockConfig.getDefaultValues()).thenReturn(defaultMap); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -427,7 +477,7 @@ void testDefaultKeysAlreadyInMessageKvProcessor(boolean skipDuplicateValues) { final Map defaultMap = Map.of("dKey", "dValue"); when(mockConfig.getDefaultValues()).thenReturn(defaultMap); when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1&dKey=abc"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -446,7 +496,7 @@ void testDefaultIncludeKeysOverlapKvProcessor(boolean skipDuplicateValues) { when(mockConfig.getDefaultValues()).thenReturn(defaultMap); when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1&key2=value2"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -464,7 +514,7 @@ void testDefaultPrioritizeIncludeKeysKvProcessor(boolean skipDuplicateValues) { when(mockConfig.getDefaultValues()).thenReturn(defaultMap); when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key1=value1&key2=abc"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -483,7 +533,7 @@ void testIncludeKeysNotInRecordMessageKvProcessor(boolean skipDuplicateValues) { when(mockConfig.getDefaultValues()).thenReturn(defaultMap); when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("key2=abc"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -500,7 +550,7 @@ void testDefaultExcludeKeysOverlapKeyValueProcessor() { when(mockConfig.getDefaultValues()).thenReturn(defaultMap); when(mockConfig.getExcludeKeys()).thenReturn(excludeKeys); - assertThrows(IllegalArgumentException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); + assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); } @Test @@ -780,7 +830,7 @@ void testIncludeInnerKeyRecursiveKvProcessor() { final List includeKeys = List.of("item1-subitem1"); when(mockConfig.getRecursive()).thenReturn(true); when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=item1-subitem2-value]&item2=item2-value"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -794,7 +844,7 @@ void testExcludeInnerKeyRecursiveKvProcessor() { final List excludeKeys = List.of("item1-subitem1"); when(mockConfig.getRecursive()).thenReturn(true); when(mockConfig.getExcludeKeys()).thenReturn(excludeKeys); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=item1-subitem2-value]&item2=item2-value"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -814,7 +864,7 @@ void testDefaultInnerKeyRecursiveKvProcessor() { final Map defaultMap = Map.of("item1-subitem1", "default"); when(mockConfig.getRecursive()).thenReturn(true); when(mockConfig.getDefaultValues()).thenReturn(defaultMap); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=item1-subitem2-value]&item2=item2-value"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); @@ -834,7 +884,7 @@ void testDefaultInnerKeyRecursiveKvProcessor() { void testTagsAddedWhenParsingFails() { when(mockConfig.getRecursive()).thenReturn(true); when(mockConfig.getTagsOnFailure()).thenReturn(List.of("tag1", "tag2")); - keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + keyValueProcessor = createObjectUnderTest(); final Record record = getMessage("item1=[]"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record));