diff --git a/data-prepper-plugins/mutate-event-processors/README.md b/data-prepper-plugins/mutate-event-processors/README.md new file mode 100644 index 0000000000..3baa382fc3 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/README.md @@ -0,0 +1,207 @@ +# Mutate Event Processors +The following is a list of processors available to mutate an event. + +___ + +##AddEntryProcessor +A processor that adds entries to an event + +###Basic Usage +To get started, create the following `pipeline.yaml`. +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - add_entries: + entries: + - key: "newMessage" + value: 3 + overwrite_if_key_exists: true + sink: + - stdout: +``` + +Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. + +```json +{"message": "value"} +``` + +When run, the processor will parse the message into the following output: + +```json +{"message": "value", "newMessage": 3} +``` + +> If `newMessage` had already existed, its existing value would have been overwritten with `3` + +###Configuration +* `entries` - (required) - A list of entries to add to an event + * `key` - (required) - The key of the new entry to be added + * `value` - (required) - The value of the new entry to be added. Strings, booleans, numbers, null, nested objects, and arrays containing the aforementioned data types are valid to use + * `overwrite_if_key_exists` - (optional) - When set to `true`, if `key` already exists in the event, then the existing value will be overwritten. The default is `false`. + +___ + +##CopyValueProcessor +A processor that copies values within an event + +###Basic Usage +To get started, create the following `pipeline.yaml`. +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - copy_values: + entries: + - from_key: "message" + to_key: "newMessage" + overwrite_if_to_key_exists: true + sink: + - stdout: +``` + +Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. + +```json +{"message": "value"} +``` + +When run, the processor will parse the message into the following output: + +```json +{"message": "value", "newMessage": "value"} +``` + +> If `newMessage` had already existed, its existing value would have been overwritten with `value` + +###Configuration +* `entries` - (required) - A list of entries to be copied in an event + * `from_key` - (required) - The key of the entry to be copied + * `to_key` - (required) - The key of the new entry to be added + * `overwrite_if_to_key_exists` - (optional) - When set to `true`, if `to_key` already exists in the event, then the existing value will be overwritten. The default is `false`. + +___ + +##DeleteEntryProcessor +A processor that deletes entries in an event + +###Basic Usage +To get started, create the following `pipeline.yaml`. +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - delete_entries: + with_keys: ["message"] + sink: + - stdout: +``` + +Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. + +```json +{"message": "value", "message2": "value2"} +``` + +When run, the processor will parse the message into the following output: + +```json +{"message2": "value2"} +``` + +> If `message` had not existed in the event, then nothing would have happened + +###Configuration +* `with_keys` - (required) - An array of keys of the entries to be deleted + +___ + +##RenameKeyProcessor +A processor that renames keys in an event + +###Basic Usage +To get started, create the following `pipeline.yaml`. +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - rename_keys: + entries: + - from_key: "message" + to_key: "newMessage" + overwrite_if_to_key_exists: true + sink: + - stdout: +``` + +Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. + +```json +{"message": "value"} +``` + +When run, the processor will parse the message into the following output: + +```json +{"newMessage": "value"} +``` + +> If `newMessage` had already existed, its existing value would have been overwritten with `value` + +###Configuration +* `entries` - (required) - A list of entries to rename in an event + * `from_key` - (required) - The key of the entry to be renamed + * `to_key` - (required) - The new key of the entry + * `overwrite_if_to_key_exists` - (optional) - When set to `true`, if `to_key` already exists in the event, then the existing value will be overwritten. The default is `false`. + +###Special Consideration +The renaming operation occurs in the order defined. This means that chaining is implicit with the RenameKeyProcessor. Take the following `piplines.yaml` for example: +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - rename_key: + entries: + - from_key: "message" + to_key: "message2" + - from_key: "message2" + to_key: "message3" + sink: + - stdout: +``` + +Let the contents of `logs_json.log` be the following: +```json +{"message": "value"} +``` + +After the processor runs, this will be the output +```json +{"message3": "value"} +``` + +## Developer Guide +This plugin is compatible with Java 14. See +- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) +- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/readme/monitoring.md) diff --git a/data-prepper-plugins/mutate-event-processors/build.gradle b/data-prepper-plugins/mutate-event-processors/build.gradle new file mode 100644 index 0000000000..3fbbc37254 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/build.gradle @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { + limit { + minimum = 1.0 + } + } + } +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:common') + implementation 'com.fasterxml.jackson.core:jackson-databind' +} \ No newline at end of file diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java new file mode 100644 index 0000000000..5eb7af3967 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; +import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.processor.AbstractProcessor; +import com.amazon.dataprepper.model.processor.Processor; +import com.amazon.dataprepper.model.record.Record; + +import java.util.Collection; +import java.util.List; + +@DataPrepperPlugin(name = "add_entries", pluginType = Processor.class, pluginConfigurationType = AddEntryProcessorConfig.class) +public class AddEntryProcessor extends AbstractProcessor, Record> { + private final List entries; + + @DataPrepperPluginConstructor + public AddEntryProcessor(final PluginMetrics pluginMetrics, final AddEntryProcessorConfig config) { + super(pluginMetrics); + this.entries = config.getEntries(); + } + + @Override + public Collection> doExecute(final Collection> records) { + for(final Record record : records) { + final Event recordEvent = record.getData(); + + for(AddEntryProcessorConfig.Entry entry : entries) { + if (!recordEvent.containsKey(entry.getKey()) || entry.getOverwriteIfKeyExists()) { + recordEvent.put(entry.getKey(), entry.getValue()); + } + } + } + + return records; + } + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java new file mode 100644 index 0000000000..e167039ca6 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +import java.util.List; + +public class AddEntryProcessorConfig { + public static class Entry { + @NotEmpty + @NotNull + private String key; + + @NotEmpty + @NotNull + private Object value; + + @JsonProperty("overwrite_if_key_exists") + private boolean overwriteIfKeyExists = false; + + public String getKey() { + return key; + } + + public Object getValue() { + return value; + } + + public boolean getOverwriteIfKeyExists() { + return overwriteIfKeyExists; + } + + public Entry(final String key, final Object value, final boolean overwriteIfKeyExists) + { + this.key = key; + this.value = value; + this.overwriteIfKeyExists = overwriteIfKeyExists; + } + + public Entry() { + + } + } + + @NotEmpty + @NotNull + private List entries; + + public List getEntries() { + return entries; + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java new file mode 100644 index 0000000000..5693372931 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; +import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.processor.AbstractProcessor; +import com.amazon.dataprepper.model.processor.Processor; +import com.amazon.dataprepper.model.record.Record; + +import java.util.Collection; +import java.util.List; + +@DataPrepperPlugin(name = "copy_values", pluginType = Processor.class, pluginConfigurationType = CopyValueProcessorConfig.class) +public class CopyValueProcessor extends AbstractProcessor, Record> { + private final List entries; + + @DataPrepperPluginConstructor + public CopyValueProcessor(final PluginMetrics pluginMetrics, final CopyValueProcessorConfig config) { + super(pluginMetrics); + this.entries = config.getEntries(); + } + + @Override + public Collection> doExecute(final Collection> records) { + for(final Record record : records) { + final Event recordEvent = record.getData(); + for(CopyValueProcessorConfig.Entry entry : entries) { + if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) { + continue; + } + + if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) { + final Object source = recordEvent.get(entry.getFromKey(), Object.class); + recordEvent.put(entry.getToKey(), source); + } + } + } + + return records; + } + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/CopyValueProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/CopyValueProcessorConfig.java new file mode 100644 index 0000000000..3279a8071d --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/CopyValueProcessorConfig.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +import java.util.List; + +public class CopyValueProcessorConfig { + public static class Entry { + @NotEmpty + @NotNull + @JsonProperty("from_key") + private String fromKey; + + @NotEmpty + @NotNull + @JsonProperty("to_key") + private String toKey; + + @JsonProperty("overwrite_if_to_key_exists") + private boolean overwriteIfToKeyExists = false; + + public String getFromKey() { + return fromKey; + } + + public String getToKey() { + return toKey; + } + + public boolean getOverwriteIfToKeyExists() { + return overwriteIfToKeyExists; + } + + public Entry(final String fromKey, final String toKey, final boolean overwriteIfToKeyExists) { + this.fromKey = fromKey; + this.toKey = toKey; + this.overwriteIfToKeyExists = overwriteIfToKeyExists; + } + + public Entry() { + + } + } + + @NotEmpty + @NotNull + private List entries; + + public List getEntries() { + return entries; + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java new file mode 100644 index 0000000000..a124bf0449 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; +import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.processor.AbstractProcessor; +import com.amazon.dataprepper.model.processor.Processor; +import com.amazon.dataprepper.model.record.Record; + +import java.util.Collection; + +@DataPrepperPlugin(name = "delete_entries", pluginType = Processor.class, pluginConfigurationType = DeleteEntryProcessorConfig.class) +public class DeleteEntryProcessor extends AbstractProcessor, Record> { + private final String[] entries; + + @DataPrepperPluginConstructor + public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntryProcessorConfig config) { + super(pluginMetrics); + this.entries = config.getWithKeys(); + } + + @Override + public Collection> doExecute(final Collection> records) { + for(final Record record : records) { + final Event recordEvent = record.getData(); + + for(String entry : entries) { + recordEvent.delete(entry); + } + } + + return records; + } + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorConfig.java new file mode 100644 index 0000000000..fc1f16a333 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorConfig.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +public class DeleteEntryProcessorConfig { + @NotEmpty + @NotNull + @JsonProperty("with_keys") + private String[] withKeys; + + public String[] getWithKeys() { + return withKeys; + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java new file mode 100644 index 0000000000..d2e874e377 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; +import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.processor.AbstractProcessor; +import com.amazon.dataprepper.model.processor.Processor; +import com.amazon.dataprepper.model.record.Record; + +import java.util.Collection; +import java.util.List; + +@DataPrepperPlugin(name = "rename_keys", pluginType = Processor.class, pluginConfigurationType = RenameKeyProcessorConfig.class) +public class RenameKeyProcessor extends AbstractProcessor, Record> { + private final List entries; + + @DataPrepperPluginConstructor + public RenameKeyProcessor(final PluginMetrics pluginMetrics, final RenameKeyProcessorConfig config) { + super(pluginMetrics); + this.entries = config.getEntries(); + } + + @Override + public Collection> doExecute(final Collection> records) { + for(final Record record : records) { + final Event recordEvent = record.getData(); + + for(RenameKeyProcessorConfig.Entry entry : entries) { + if(entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) { + continue; + } + + if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) { + final Object source = recordEvent.get(entry.getFromKey(), Object.class); + recordEvent.put(entry.getToKey(), source); + recordEvent.delete(entry.getFromKey()); + } + } + } + + return records; + } + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java new file mode 100644 index 0000000000..4224ee81cc --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +import java.util.List; + +public class RenameKeyProcessorConfig { + public static class Entry { + @NotEmpty + @NotNull + @JsonProperty("from_key") + private String fromKey; + + @NotEmpty + @NotNull + @JsonProperty("to_key") + private String toKey; + + @JsonProperty("overwrite_if_to_key_exists") + private boolean overwriteIfToKeyExists = false; + + public String getFromKey() { + return fromKey; + } + + public String getToKey() { + return toKey; + } + + public boolean getOverwriteIfToKeyExists() { + return overwriteIfToKeyExists; + } + + public Entry(final String fromKey, final String toKey, final boolean overwriteIfKeyExists) { + this.fromKey = fromKey; + this.toKey = toKey; + this.overwriteIfToKeyExists = overwriteIfKeyExists; + } + + public Entry() { + + } + } + + @NotEmpty + @NotNull + private List entries; + + public List getEntries() { + return entries; + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java new file mode 100644 index 0000000000..5f67ae9a76 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java @@ -0,0 +1,249 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.JacksonEvent; +import com.amazon.dataprepper.model.record.Record; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class AddEntryProcessorTests { + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private AddEntryProcessorConfig mockConfig; + + @Test + public void testSingleAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", 3, false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo(3)); + } + + @Test + public void testMultiAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", 3, false), + createEntry("message2", 4, false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo(3)); + assertThat(editedRecords.get(0).getData().containsKey("message2"), is(true)); + assertThat(editedRecords.get(0).getData().get("message2", Object.class), equalTo(4)); + } + + @Test + public void testSingleNoOverwriteAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", 3, false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("test")); + } + + @Test + public void testSingleOverwriteAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", 3, true))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo(3)); + } + + @Test + public void testMultiOverwriteMixedAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", 3, true), + (createEntry("message", 4, false)))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo(3)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + } + + @Test + public void testIntAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", 3, false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo(3)); + } + + @Test + public void testBoolAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", true, false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo(true)); + } + + @Test + public void testStringAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", "string", false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("string")); + } + + @Test + public void testNullAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo(null)); + } + + private static class TestObject { + public String a; + + @Override + public boolean equals(Object o) { + TestObject testObject = (TestObject) o; + return this.a == testObject.a; + } + } + + @Test + public void testNestedAddProcessorTests() { + TestObject obj = new TestObject(); + obj.a = "test"; + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", obj, false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", TestObject.class), equalTo(obj)); + } + + @Test + public void testArrayAddProcessorTests() { + Object[] array = new Object[] { 1, 1.2, "string", true, null }; + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", array, false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object[].class), equalTo(array)); + } + + @Test + public void testFloatAddProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", 1.2, false))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo(1.2)); + } + + private AddEntryProcessor createObjectUnderTest() { + return new AddEntryProcessor(pluginMetrics, mockConfig); + } + + private AddEntryProcessorConfig.Entry createEntry(final String key, final Object value, final boolean overwriteIfKeyExists) { + return new AddEntryProcessorConfig.Entry(key, value, overwriteIfKeyExists); + } + + private List createListOfEntries(final AddEntryProcessorConfig.Entry... entries) { + return new LinkedList<>(Arrays.asList(entries)); + } + + private Record getEvent(String message) { + final Map testData = new HashMap(); + testData.put("message", message); + return buildRecordWithEvent(testData); + } + + private static Record buildRecordWithEvent(final Map data) { + return new Record<>(JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build()); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/CopyValueProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/CopyValueProcessorTests.java new file mode 100644 index 0000000000..c83e9981ca --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/CopyValueProcessorTests.java @@ -0,0 +1,191 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.JacksonEvent; +import com.amazon.dataprepper.model.record.Record; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class CopyValueProcessorTests { + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private CopyValueProcessorConfig mockConfig; + + @Test + public void testSingleCopyProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false))); + + final CopyValueProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + } + + @Test + public void testMultiCopyProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false), + createEntry("message2", "entry", false))); + + final CopyValueProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("message2", "test"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().containsKey("entry"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message2"), is(true)); + assertThat(editedRecords.get(0).getData().get("entry", Object.class), equalTo("test")); + assertThat(editedRecords.get(0).getData().get("message2", Object.class), equalTo("test")); + } + + @Test + public void testNoOverwriteSingleCopyProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false))); + + final CopyValueProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("test")); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + } + + private static class TestObject { + public String a; + + @Override + public boolean equals(Object o) { + if(o instanceof TestObject) { + TestObject testObject = (TestObject) o; + return this.a == testObject.a; + } + + return false; + } + } + + @Test + public void testNestedObjectCopyProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true))); + + final CopyValueProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + TestObject data = new TestObject(); + data.a = "test"; + record.getData().put("message", data); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", TestObject.class), equalTo(data)); + assertThat(editedRecords.get(0).getData().get("message", TestObject.class), equalTo(data)); + } + + @Test + public void testFromKeyDneCopyProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message2", "newMessage", false))); + + final CopyValueProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(false)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + } + + @Test + public void testOverwriteSingleCopyProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true))); + + final CopyValueProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + } + + @Test + public void testOverwriteMixedSingleCopyProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false), + createEntry("message2", "entry", true))); + + final CopyValueProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test"); + record.getData().put("message2", "test2"); + record.getData().put("entry", "test3"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message2"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("entry"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("test")); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().get("message2", Object.class), equalTo("test2")); + assertThat(editedRecords.get(0).getData().get("entry", Object.class), equalTo("test2")); + } + + private CopyValueProcessor createObjectUnderTest() { + return new CopyValueProcessor(pluginMetrics, mockConfig); + } + + private CopyValueProcessorConfig.Entry createEntry(final String fromKey, final String toKey, final boolean overwriteIfToKeyExists) { + return new CopyValueProcessorConfig.Entry(fromKey, toKey, overwriteIfToKeyExists); + } + + private List createListOfEntries(final CopyValueProcessorConfig.Entry... entries) { + return new LinkedList<>(Arrays.asList(entries)); + } + + private Record getEvent(String message) { + final Map testData = new HashMap(); + testData.put("message", message); + return buildRecordWithEvent(testData); + } + + private static Record buildRecordWithEvent(final Map data) { + return new Record<>(JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build()); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java new file mode 100644 index 0000000000..a32694465d --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.JacksonEvent; +import com.amazon.dataprepper.model.record.Record; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class DeleteEntryProcessorTests { + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private DeleteEntryProcessorConfig mockConfig; + + @Test + public void testSingleDeleteProcessorTest() { + when(mockConfig.getWithKeys()).thenReturn(new String[] { "message" }); + + final DeleteEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(false)); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + } + + @Test + public void testWithKeyDneDeleteProcessorTest() { + when(mockConfig.getWithKeys()).thenReturn(new String[] { "message2" }); + + final DeleteEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message2"), is(false)); + } + + @Test + public void testMultiDeleteProcessorTest() { + when(mockConfig.getWithKeys()).thenReturn(new String[] { "message", "message2" }); + + final DeleteEntryProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("message2", "test"); + record.getData().put("newMessage", "test"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(false)); + assertThat(editedRecords.get(0).getData().containsKey("message2"), is(false)); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + } + + private DeleteEntryProcessor createObjectUnderTest() { + return new DeleteEntryProcessor(pluginMetrics, mockConfig); + } + + private Record getEvent(String message) { + final Map testData = new HashMap(); + testData.put("message", message); + return buildRecordWithEvent(testData); + } + + private static Record buildRecordWithEvent(final Map data) { + return new Record<>(JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build()); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java new file mode 100644 index 0000000000..29c5b135f9 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/com/amazon/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java @@ -0,0 +1,135 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.mutateevent; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.JacksonEvent; +import com.amazon.dataprepper.model.record.Record; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class RenameKeyProcessorTests { + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private RenameKeyProcessorConfig mockConfig; + + @Test + public void testSingleOverwriteRenameProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true))); + + final RenameKeyProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test2"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(false)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("thisisamessage")); + } + + @Test + public void testSingleNoOverwriteRenameProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false))); + + final RenameKeyProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test2"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("test2")); + } + + @Test + public void testFromKeyDneRenameProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message2", "newMessage", false))); + + final RenameKeyProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(false)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); + } + + @Test + public void testMultiMixedOverwriteRenameProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true), + createEntry("message2", "existingMessage", false))); + + final RenameKeyProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test2"); + record.getData().put("existingMessage", "test3"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("existingMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(false)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("thisisamessage")); + assertThat(editedRecords.get(0).getData().get("existingMessage", Object.class), equalTo("test3")); + } + + @Test + public void testChainRenamingRenameProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true), + createEntry("newMessage", "message3", true))); + + final RenameKeyProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message3"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(false)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(false)); + assertThat(editedRecords.get(0).getData().get("message3", Object.class), equalTo("thisisamessage")); + } + + private RenameKeyProcessor createObjectUnderTest() { + return new RenameKeyProcessor(pluginMetrics, mockConfig); + } + + private RenameKeyProcessorConfig.Entry createEntry(final String fromKey, final String toKey, final boolean overwriteIfToKeyExists) { + return new RenameKeyProcessorConfig.Entry(fromKey, toKey, overwriteIfToKeyExists); + } + + private List createListOfEntries(final RenameKeyProcessorConfig.Entry... entries) { + return new LinkedList<>(Arrays.asList(entries)); + } + + private Record getEvent(String message) { + final Map testData = new HashMap(); + testData.put("message", message); + return buildRecordWithEvent(testData); + } + + private static Record buildRecordWithEvent(final Map data) { + return new Record<>(JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build()); + } +} diff --git a/settings.gradle b/settings.gradle index 74f9dbf73d..9bc0f618c5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -40,6 +40,7 @@ include 'data-prepper-plugins:blocking-buffer' include 'data-prepper-plugins:http-source' include 'data-prepper-plugins:drop-events-processor' include 'data-prepper-plugins:key-value-processor' +include 'data-prepper-plugins:mutate-event-processors' include 'data-prepper-plugins:grok-prepper' include 'data-prepper-plugins:aggregate-processor' include 'data-prepper-logstash-configuration'