From a84b462a58e103decde7b4e2409ce16f15c410f7 Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Fri, 16 Feb 2024 15:42:42 -0600 Subject: [PATCH 1/3] Add append option Signed-off-by: Hai Yan --- .../mutateevent/AddEntryProcessor.java | 32 ++- .../mutateevent/AddEntryProcessorConfig.java | 14 ++ .../mutateevent/AddEntryProcessorTests.java | 182 +++++++++++++----- 3 files changed, 181 insertions(+), 47 deletions(-) diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java index daaa739e84..3d538a62f8 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java @@ -17,6 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -67,12 +68,15 @@ public Collection> doExecute(final Collection> recor if (!Objects.isNull(key)) { if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) { recordEvent.put(key, value); + } else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) { + mergeValueToEvent(recordEvent, key, value); } } else { Map attributes = recordEvent.getMetadata().getAttributes(); if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) { recordEvent.getMetadata().setAttribute(metadataKey, value); - + } else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) { + mergeValueToEventMetadata(recordEvent, metadataKey, value); } } } catch (Exception e) { @@ -97,4 +101,30 @@ public boolean isReadyForShutdown() { @Override public void shutdown() { } + + private void mergeValueToEvent(final Event recordEvent, final String key, final Object value) { + final Object currentValue = recordEvent.get(key, Object.class); + final List mergedValue = new ArrayList<>(); + if (currentValue instanceof List) { + mergedValue.addAll((List) currentValue); + } else { + mergedValue.add(currentValue); + } + + mergedValue.add(value); + recordEvent.put(key, mergedValue); + } + + private void mergeValueToEventMetadata(final Event recordEvent, final String key, final Object value) { + final Object currentValue = recordEvent.getMetadata().getAttribute(key); + final List mergedValue = new ArrayList<>(); + if (currentValue instanceof List) { + mergedValue.addAll((List) currentValue); + } else { + mergedValue.add(currentValue); + } + + mergedValue.add(value); + recordEvent.getMetadata().setAttribute(key, mergedValue); + } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java index 90b114d62f..5ee75eddf5 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java @@ -34,6 +34,9 @@ public static class Entry { @JsonProperty("overwrite_if_key_exists") private boolean overwriteIfKeyExists = false; + @JsonProperty("append_if_key_exists") + private boolean appendIfKeyExists = false; + public String getKey() { return key; } @@ -58,6 +61,10 @@ public boolean getOverwriteIfKeyExists() { return overwriteIfKeyExists; } + public boolean getAppendIfKeyExists() { + return appendIfKeyExists; + } + public String getAddWhen() { return addWhen; } @AssertTrue(message = "Either value or format or expression must be specified, and only one of them can be specified") @@ -65,12 +72,18 @@ public boolean hasValueOrFormatOrExpression() { return Stream.of(value, format, valueExpression).filter(n -> n!=null).count() == 1; } + @AssertTrue(message = "overwrite_if_key_exists and append_if_key_exists can not be set at the same time.") + boolean overwriteAndAppendNotBothSet() { + return !(overwriteIfKeyExists && appendIfKeyExists); + } + public Entry(final String key, final String metadataKey, final Object value, final String format, final String valueExpression, final boolean overwriteIfKeyExists, + final boolean appendIfKeyExists, final String addWhen) { if (key != null && metadataKey != null) { @@ -85,6 +98,7 @@ public Entry(final String key, this.format = format; this.valueExpression = valueExpression; this.overwriteIfKeyExists = overwriteIfKeyExists; + this.appendIfKeyExists = appendIfKeyExists; this.addWhen = addWhen; } diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java index b3363aca53..341bad79de 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java @@ -15,14 +15,15 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.Random; +import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -47,7 +48,7 @@ public class AddEntryProcessorTests { @Test public void testSingleAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -61,8 +62,8 @@ public void testSingleAddProcessorTests() { @Test public void testMultiAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, null), - createEntry("message2", null, 4, null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null), + createEntry("message2", null, 4, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -78,7 +79,7 @@ public void testMultiAddProcessorTests() { @Test public void testSingleNoOverwriteAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -93,7 +94,7 @@ public void testSingleNoOverwriteAddProcessorTests() { @Test public void testSingleOverwriteAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, true, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, true, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -108,8 +109,8 @@ public void testSingleOverwriteAddProcessorTests() { @Test public void testMultiOverwriteMixedAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, true, null), - createEntry("message", null, 4, null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, true, false,null), + createEntry("message", null, 4, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -122,9 +123,37 @@ public void testMultiOverwriteMixedAddProcessorTests() { assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); } + @Test + public void testAppendValueToExistingSimpleField() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null, 3, null, null, false, true,null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final String currentValue = "old_message"; + final Record record = getEvent(currentValue); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo(List.of(currentValue, 3))); + } + + @Test + public void testAppendValueToExistingListField() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null, 3, null, null, false, true,null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List listValue = new ArrayList<>(); + final String currentItem = "old_message"; + listValue.add(currentItem); + final Record record = getEvent(listValue); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo(List.of(currentItem, 3))); + } + @Test public void testIntAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -138,7 +167,7 @@ public void testIntAddProcessorTests() { @Test public void testBoolAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, true, null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, true, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -152,7 +181,7 @@ public void testBoolAddProcessorTests() { @Test public void testStringAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, "string", null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, "string", null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -166,7 +195,7 @@ public void testStringAddProcessorTests() { @Test public void testNullAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, null, null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, null, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -192,7 +221,7 @@ public boolean equals(Object o) { public void testNestedAddProcessorTests() { TestObject obj = new TestObject(); obj.a = "test"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, obj, null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, obj, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -207,7 +236,7 @@ public void testNestedAddProcessorTests() { @Test public void testArrayAddProcessorTests() { Object[] array = new Object[] { 1, 1.2, "string", true, null }; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, array, null, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, array, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -222,7 +251,7 @@ public void testArrayAddProcessorTests() { @Test public void testFloatAddProcessorTests() { when(mockConfig.getEntries()) - .thenReturn(createListOfEntries(createEntry("newMessage", null, 1.2, null, null, false, null))); + .thenReturn(createListOfEntries(createEntry("newMessage", null, 1.2, null, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -237,7 +266,7 @@ public void testFloatAddProcessorTests() { @Test public void testAddSingleFormatEntry() { when(mockConfig.getEntries()) - .thenReturn(createListOfEntries(createEntry("date-time", null, null, TEST_FORMAT, null, false, null))); + .thenReturn(createListOfEntries(createEntry("date-time", null, null, TEST_FORMAT, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -252,8 +281,8 @@ public void testAddSingleFormatEntry() { @Test public void testAddMultipleFormatEntries() { when(mockConfig.getEntries()) - .thenReturn(createListOfEntries(createEntry("date-time", null, null, TEST_FORMAT, null, false, null), - createEntry("date-time2", null, null, ANOTHER_TEST_FORMAT, null, false, null))); + .thenReturn(createListOfEntries(createEntry("date-time", null, null, TEST_FORMAT, null, false, false,null), + createEntry("date-time2", null, null, ANOTHER_TEST_FORMAT, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -270,7 +299,7 @@ public void testAddMultipleFormatEntries() { public void testFormatOverwritesExistingEntry() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry("time", null, null, TEST_FORMAT, null, true, null))); + createListOfEntries(createEntry("time", null, null, TEST_FORMAT, null, true, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -285,7 +314,7 @@ public void testFormatOverwritesExistingEntry() { public void testFormatNotOverwriteExistingEntry() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry("time", null, null, TEST_FORMAT, null, false, null))); + createListOfEntries(createEntry("time", null, null, TEST_FORMAT, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -297,11 +326,40 @@ public void testFormatNotOverwriteExistingEntry() { assertThat(event.containsKey("date-time"), equalTo(false)); } + @Test + public void testAppendFormatValueToExistingSimpleField() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("time", null, 3, TEST_FORMAT, null, false, true,null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getTestEventWithMultipleFields(); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + Event event = editedRecords.get(0).getData(); + assertThat(event.get("date", Object.class), equalTo("date-value")); + assertThat(event.get("time", Object.class), equalTo(List.of("time-value", "date-value time-value"))); + } + + @Test + public void testAppendFormatValueToExistingListField() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("date-time", null, 3, TEST_FORMAT, null, false, true,null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getTestEventWithMultipleFields(); + final List listValue = new ArrayList<>(); + final String currentItem = "date-time-value-1"; + listValue.add(currentItem); + record.getData().put("date-time", listValue); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + Event event = editedRecords.get(0).getData(); + assertThat(event.get("date-time", Object.class), equalTo(List.of(currentItem, "date-value time-value"))); + } + @Test public void testFormatPrecedesValue() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry("date-time", null, "date-time-value", TEST_FORMAT, null, false, null))); + createListOfEntries(createEntry("date-time", null, "date-time-value", TEST_FORMAT, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -316,7 +374,7 @@ public void testFormatPrecedesValue() { @Test public void testFormatVariousDataTypes() { when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry( - "newField", null, null, "${number-key}-${boolean-key}-${string-key}", null, false, null))); + "newField", null, null, "${number-key}-${boolean-key}-${string-key}", null, false, false, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); @@ -328,7 +386,7 @@ public void testFormatVariousDataTypes() { @Test public void testBadFormatThenEntryNotAdded() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("data-time", null, null, BAD_TEST_FORMAT, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("data-time", null, null, BAD_TEST_FORMAT, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -342,7 +400,7 @@ public void testBadFormatThenEntryNotAdded() { @Test public void testMetadataKeySetWithBadFormatThenEntryNotAdded() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,"data-time", null, BAD_TEST_FORMAT, null, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,"data-time", null, BAD_TEST_FORMAT, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("date", "date-value", "time", "time-value")); @@ -358,7 +416,7 @@ public void testMetadataKeySetWithBadFormatThenEntryNotAdded() { public void testKeyIsNotAdded_when_addWhen_condition_is_false() { final String addWhen = UUID.randomUUID().toString(); - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, addWhen))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,addWhen))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -375,7 +433,7 @@ public void testKeyIsNotAdded_when_addWhen_condition_is_false() { public void testMetadataKeyIsNotAdded_when_addWhen_condition_is_false() { final String addWhen = UUID.randomUUID().toString(); - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "newMessage", 3, null, null, false, addWhen))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "newMessage", 3, null, null, false, false,addWhen))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("thisisamessage", Map.of("key", "value")); @@ -393,9 +451,9 @@ public void testMetadataKeyIsNotAdded_when_addWhen_condition_is_false() { @Test public void testMetadataKeySetWithDifferentDataTypes() { when(mockConfig.getEntries()).thenReturn(createListOfEntries( - createEntry(null, "newField", "newValue", null, null, false, null), - createEntry(null, "newIntField", 123, null, null, false, null), - createEntry(null, "newBooleanField", true, null, null, false, null) + createEntry(null, "newField", "newValue", null, null, false, false,null), + createEntry(null, "newIntField", 123, null, null, false, false,null), + createEntry(null, "newBooleanField", true, null, null, false, false,null) )); final AddEntryProcessor processor = createObjectUnderTest(); @@ -412,7 +470,7 @@ public void testMetadataKeySetWithDifferentDataTypes() { public void testMetadataKeySetWithFormatNotOverwriteExistingEntry() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry(null, "time", null, TEST_FORMAT, null, false, null))); + createListOfEntries(createEntry(null, "time", null, TEST_FORMAT, null, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("date", "date-value", "time", "time-value")); @@ -428,7 +486,7 @@ public void testMetadataKeySetWithFormatNotOverwriteExistingEntry() { public void testMetadataKeySetWithFormatOverwriteExistingEntry() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry(null, "time", null, TEST_FORMAT, null, true, null))); + createListOfEntries(createEntry(null, "time", null, TEST_FORMAT, null, true, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("date", "date-value", "time", "time-value")); @@ -440,40 +498,72 @@ public void testMetadataKeySetWithFormatOverwriteExistingEntry() { assertThat(attributes.containsKey("date-time"), equalTo(false)); } + @Test + public void testMetadataKeySetAppendToExistingSimpleValue() { + when(mockConfig.getEntries()) + .thenReturn( + createListOfEntries(createEntry(null, "time", "time-value2", null, null, false, true,null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final String currentValue = "time-value1"; + final Record record = getEventWithMetadata("message", Map.of("time", currentValue)); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + Map attributes = editedRecords.get(0).getData().getMetadata().getAttributes(); + assertThat(attributes.get("time"), equalTo(List.of(currentValue, "time-value2"))); + } + + @Test + public void testMetadataKeySetAppendToExistingListValue() { + when(mockConfig.getEntries()) + .thenReturn( + createListOfEntries(createEntry(null, "time", "time-value2", null, null, false, true,null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List listValue = new ArrayList<>(); + final String currentItem = "time-value1"; + listValue.add(currentItem); + final Record record = getEventWithMetadata("message", Map.of("time", listValue)); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + Map attributes = editedRecords.get(0).getData().getMetadata().getAttributes(); + assertThat(attributes.get("time"), equalTo(List.of(currentItem, "time-value2"))); + } + @Test public void testMetadataKeyAndKeyBothNotSetThrows() { - assertThrows(IllegalArgumentException.class, () -> createEntry(null, null, "newValue", null, null, false, null)); + assertThrows(IllegalArgumentException.class, () -> createEntry(null, null, "newValue", null, null, false, false,null)); } @Test public void testMetadataKeyAndKeyBothSetThrows() { - assertThrows(IllegalArgumentException.class, () -> createEntry("newKey", "newMetadataKey", "newValue", null, null, false, null)); + assertThrows(IllegalArgumentException.class, () -> createEntry("newKey", "newMetadataKey", "newValue", null, null, false, false,null)); } @Test public void testOnlyOneTypeOfValueIsSupported() { - assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", "newValue", "/newFormat", null, false, null)); + assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", "newValue", "/newFormat", null, false, false,null)); } @Test public void testOnlyOneTypeOfValueIsSupportedWithExpressionAndFormat() { - assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", null, "/newFormat", "length(/message)", false, null)); + assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", null, "/newFormat", "length(/message)", false, false,null)); } @Test public void testOnlyOneTypeOfValueIsSupportedWithValueAndExpressionAndFormat() { - assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", "value", "/newFormat", "length(/message)", false, null)); + assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", "value", "/newFormat", "length(/message)", false, false,null)); } @Test public void testWithAllValuesNull() { - assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", null, null, null, false, null)); + assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", null, null, null, false, false,null)); } @Test public void testValueExpressionWithArithmeticExpression() { String valueExpression = "/number-key"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); Random random = new Random(); @@ -487,7 +577,7 @@ public void testValueExpressionWithArithmeticExpression() { @Test public void testValueExpressionWithStringExpression() { String valueExpression = "/string-key"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); String randomString = UUID.randomUUID().toString(); @@ -500,7 +590,7 @@ public void testValueExpressionWithStringExpression() { @Test public void testValueExpressionWithBooleanExpression() { String valueExpression = "/number-key > 5"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); when(expressionEvaluator.evaluate(valueExpression, record.getData())).thenReturn(false); @@ -512,7 +602,7 @@ public void testValueExpressionWithBooleanExpression() { @Test public void testValueExpressionWithIntegerFunctions() { String valueExpression = "length(/string-key)"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("length_key", null, null, null, valueExpression, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("length_key", null, null, null, valueExpression, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); String randomString = UUID.randomUUID().toString(); @@ -525,7 +615,7 @@ public void testValueExpressionWithIntegerFunctions() { @Test public void testValueExpressionWithIntegerFunctionsAndMetadataKey() { String valueExpression = "length(/date)"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "length_key", null, null, valueExpression, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "length_key", null, null, valueExpression, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("key", "value")); String randomString = UUID.randomUUID().toString(); @@ -538,7 +628,7 @@ public void testValueExpressionWithIntegerFunctionsAndMetadataKey() { @Test public void testValueExpressionWithStringExpressionWithMetadataKey() { String valueExpression = "/date"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "newkey", null, null, valueExpression, false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "newkey", null, null, valueExpression, false, false,null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("key", "value")); String randomString = UUID.randomUUID().toString(); @@ -553,15 +643,15 @@ private AddEntryProcessor createObjectUnderTest() { } private AddEntryProcessorConfig.Entry createEntry( - final String key, final String metadataKey, final Object value, final String format, final String valueExpression, final boolean overwriteIfKeyExists, final String addWhen) { - return new AddEntryProcessorConfig.Entry(key, metadataKey, value, format, valueExpression, overwriteIfKeyExists, addWhen); + final String key, final String metadataKey, final Object value, final String format, final String valueExpression, final boolean overwriteIfKeyExists, final boolean appendIfKeyExists, final String addWhen) { + return new AddEntryProcessorConfig.Entry(key, metadataKey, value, format, valueExpression, overwriteIfKeyExists, appendIfKeyExists, addWhen); } private List createListOfEntries(final AddEntryProcessorConfig.Entry... entries) { return new LinkedList<>(Arrays.asList(entries)); } - private Record getEvent(String message) { + private Record getEvent(Object message) { final Map testData = new HashMap<>(); testData.put("message", message); return buildRecordWithEvent(testData); From bd33c89099bd6ddb1a97634a73c22c9d970ca260 Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Mon, 19 Feb 2024 11:58:06 -0600 Subject: [PATCH 2/3] Address comments: combine two mergeValue methods Signed-off-by: Hai Yan --- .../mutateevent/AddEntryProcessor.java | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java index 3d538a62f8..4096a2f58a 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; @@ -103,20 +105,15 @@ public void shutdown() { } private void mergeValueToEvent(final Event recordEvent, final String key, final Object value) { - final Object currentValue = recordEvent.get(key, Object.class); - final List mergedValue = new ArrayList<>(); - if (currentValue instanceof List) { - mergedValue.addAll((List) currentValue); - } else { - mergedValue.add(currentValue); - } - - mergedValue.add(value); - recordEvent.put(key, mergedValue); + mergeValue(value, () -> recordEvent.get(key, Object.class), newValue -> recordEvent.put(key, newValue)); } private void mergeValueToEventMetadata(final Event recordEvent, final String key, final Object value) { - final Object currentValue = recordEvent.getMetadata().getAttribute(key); + mergeValue(value, () -> recordEvent.getMetadata().getAttribute(key), newValue -> recordEvent.getMetadata().setAttribute(key, newValue)); + } + + private void mergeValue(final Object value, Supplier getter, Consumer setter) { + final Object currentValue = getter.get(); final List mergedValue = new ArrayList<>(); if (currentValue instanceof List) { mergedValue.addAll((List) currentValue); @@ -125,6 +122,6 @@ private void mergeValueToEventMetadata(final Event recordEvent, final String key } mergedValue.add(value); - recordEvent.getMetadata().setAttribute(key, mergedValue); + setter.accept(mergedValue); } } From e89000cd4b104ac3bc0988e94aeec86ccf5f5adc Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Tue, 20 Feb 2024 11:13:46 -0600 Subject: [PATCH 3/3] Address comments: update assertion message Signed-off-by: Hai Yan --- .../plugins/processor/mutateevent/AddEntryProcessorConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java index 5ee75eddf5..81f6bbab34 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java @@ -72,7 +72,7 @@ public boolean hasValueOrFormatOrExpression() { return Stream.of(value, format, valueExpression).filter(n -> n!=null).count() == 1; } - @AssertTrue(message = "overwrite_if_key_exists and append_if_key_exists can not be set at the same time.") + @AssertTrue(message = "overwrite_if_key_exists and append_if_key_exists can not be set to true at the same time.") boolean overwriteAndAppendNotBothSet() { return !(overwriteIfKeyExists && appendIfKeyExists); }