Skip to content

Commit

Permalink
Enhance copy_values processor to selectively copy entries from lists (#…
Browse files Browse the repository at this point in the history
…4085)

* initial experiment

Signed-off-by: Hai Yan <[email protected]>

* Add list copy options

Signed-off-by: Hai Yan <[email protected]>

* Remove unused imports

Signed-off-by: Hai Yan <[email protected]>

* Extract shouldCopyEntry method

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Feb 9, 2024
1 parent a1d7091 commit bd9ae27
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,94 @@
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@DataPrepperPlugin(name = "copy_values", pluginType = Processor.class, pluginConfigurationType = CopyValueProcessorConfig.class)
public class CopyValueProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(CopyValueProcessor.class);
private final CopyValueProcessorConfig config;
private final List<CopyValueProcessorConfig.Entry> entries;

private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public CopyValueProcessor(final PluginMetrics pluginMetrics, final CopyValueProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.config = config;
this.entries = config.getEntries();
this.expressionEvaluator = expressionEvaluator;
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();
for(CopyValueProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getCopyWhen()) && !expressionEvaluator.evaluateConditional(entry.getCopyWhen(), recordEvent)) {
continue;
}
try {
final Event recordEvent = record.getData();
if (config.getFromList() != null || config.getToList() != null) {
// Copying entries between lists
if (recordEvent.containsKey(config.getToList()) && !config.getOverwriteIfToListExists()) {
continue;
}

if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) {
continue;
}
final List<Map<String, Object>> sourceList = recordEvent.get(config.getFromList(), List.class);
final List<Map<String, Object>> targetList = new ArrayList<>();

if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) {
final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
final Map<CopyValueProcessorConfig.Entry, Boolean> whenConditions = new HashMap<>();
for (final CopyValueProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getCopyWhen()) && !expressionEvaluator.evaluateConditional(entry.getCopyWhen(), recordEvent)) {
whenConditions.put(entry, Boolean.FALSE);
} else {
whenConditions.put(entry, Boolean.TRUE);
}
}
for (final Map<String, Object> sourceField : sourceList) {
final Map<String, Object> targetItem = new HashMap<>();
for (final CopyValueProcessorConfig.Entry entry : entries) {
if (!whenConditions.get(entry) || !sourceField.containsKey(entry.getFromKey())) {
continue;
}
targetItem.put(entry.getToKey(), sourceField.get(entry.getFromKey()));
}
targetList.add(targetItem);
}
recordEvent.put(config.getToList(), targetList);
} else {
// Copying individual entries
for (final CopyValueProcessorConfig.Entry entry : entries) {
if (shouldCopyEntry(entry, recordEvent)) {
final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
}
}
}
} catch (Exception e) {
LOG.error("Fail to perform copy values operation", e);
//TODO: add tagging on failure
}
}

return records;
}

private boolean shouldCopyEntry(final CopyValueProcessorConfig.Entry entry, final Event recordEvent) {
if (Objects.nonNull(entry.getCopyWhen()) && !expressionEvaluator.evaluateConditional(entry.getCopyWhen(), recordEvent)) {
return false;
}

if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) {
return false;
}

return !recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists();
}

@Override
public void prepareForShutdown() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.List;

public class CopyValueProcessorConfig {
public static class Entry {
public static class Entry {
@NotEmpty
@NotNull
@JsonProperty("from_key")
Expand Down Expand Up @@ -61,7 +62,33 @@ public Entry() {
@Valid
private List<Entry> entries;

@JsonProperty("from_list")
private String fromList;

@JsonProperty("to_list")
private String toList;

@JsonProperty("overwrite_if_to_list_exists")
private boolean overwriteIfToListExists = false;

@AssertTrue(message = "Both from_list and to_list should be specified when copying entries between lists.")
boolean isBothFromListAndToListProvided() {
return (fromList == null && toList == null) || (fromList != null && toList != null);
}

public List<Entry> getEntries() {
return entries;
}

public String getFromList() {
return fromList;
}

public String getToList() {
return toList;
}

public boolean getOverwriteIfToListExists() {
return overwriteIfToListExists;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -26,6 +27,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -39,6 +41,13 @@ public class CopyValueProcessorTests {
@Mock
private ExpressionEvaluator expressionEvaluator;

@BeforeEach
void setUp() {
lenient().when(mockConfig.getFromList()).thenReturn(null);
lenient().when(mockConfig.getToList()).thenReturn(null);
lenient().when(mockConfig.getOverwriteIfToListExists()).thenReturn(false);
}

@Test
public void testSingleCopyProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, null)));
Expand Down Expand Up @@ -186,6 +195,106 @@ public void testKey_is_not_copied_when_copyWhen_returns_false() {
assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage"));
}

@Test
public void testCopyEntriesFromList() {
when(mockConfig.getFromList()).thenReturn("mylist");
when(mockConfig.getToList()).thenReturn("newlist");
when(mockConfig.getEntries()).thenReturn(createListOfEntries(
createEntry("name", "fruit_name", true, null),
createEntry("color", "fruit_color", true, null)
));

final CopyValueProcessor processor = createObjectUnderTest();
final Record<Event> record = getEventWithLists(List.of(
Map.of("name", "apple", "color", "red", "shape", "round"),
Map.of("name", "orange", "color", "orange", "shape", "round"),
Map.of("name", "banana", "color", "yellow", "shape", "curved")
));
final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
final Record<Event> resultRecord = resultRecords.get(0);

assertThat(resultRecord.getData().containsKey("newlist"), is(true));
assertThat(resultRecord.getData().get("newlist", List.class), is(List.of(
Map.of("fruit_name", "apple", "fruit_color", "red"),
Map.of("fruit_name", "orange", "fruit_color", "orange"),
Map.of("fruit_name", "banana", "fruit_color", "yellow")
)));
}

@Test
public void testCopyEntriesFromListNotOverwriteByDefault() {
when(mockConfig.getFromList()).thenReturn("mylist");
when(mockConfig.getToList()).thenReturn("mylist");

final CopyValueProcessor processor = createObjectUnderTest();
final Record<Event> record = getEventWithLists(List.of(
Map.of("name", "apple", "color", "red", "shape", "round"),
Map.of("name", "orange", "color", "orange", "shape", "round"),
Map.of("name", "banana", "color", "yellow", "shape", "curved")
));
final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
final Record<Event> resultRecord = resultRecords.get(0);

assertThat(resultRecord.getData().containsKey("newlist"), is(false));
}

@Test
public void testCopyEntriesFromListOverwritesExistingList() {
when(mockConfig.getFromList()).thenReturn("mylist");
when(mockConfig.getToList()).thenReturn("mylist");
when(mockConfig.getOverwriteIfToListExists()).thenReturn(true);
when(mockConfig.getEntries()).thenReturn(createListOfEntries(
createEntry("name", "fruit_name", true, null),
createEntry("color", "fruit_color", true, null)
));

final CopyValueProcessor processor = createObjectUnderTest();
final Record<Event> record = getEventWithLists(List.of(
Map.of("name", "apple", "color", "red", "shape", "round"),
Map.of("name", "orange", "color", "orange", "shape", "round"),
Map.of("name", "banana", "color", "yellow", "shape", "curved")
));
final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
final Record<Event> resultRecord = resultRecords.get(0);

assertThat(resultRecord.getData().containsKey("mylist"), is(true));
assertThat(resultRecord.getData().get("mylist", List.class), is(List.of(
Map.of("fruit_name", "apple", "fruit_color", "red"),
Map.of("fruit_name", "orange", "fruit_color", "orange"),
Map.of("fruit_name", "banana", "fruit_color", "yellow")
)));
}

@Test
public void testCopyEntriesFromListWithWhenConditions() {
when(mockConfig.getFromList()).thenReturn("mylist");
when(mockConfig.getToList()).thenReturn("mylist");
when(mockConfig.getOverwriteIfToListExists()).thenReturn(true);
final String copyWhen = UUID.randomUUID().toString();

when(mockConfig.getEntries()).thenReturn(createListOfEntries(
createEntry("name", "fruit_name", true, null),
createEntry("color", "fruit_color", true, copyWhen)
));

final CopyValueProcessor processor = createObjectUnderTest();
final Record<Event> record = getEventWithLists(List.of(
Map.of("name", "apple", "color", "red", "shape", "round"),
Map.of("name", "orange", "color", "orange", "shape", "round"),
Map.of("name", "banana", "color", "yellow", "shape", "curved")
));
when(expressionEvaluator.evaluateConditional(copyWhen, record.getData())).thenReturn(false);
final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
final Record<Event> resultRecord = resultRecords.get(0);

assertThat(resultRecord.getData().containsKey("mylist"), is(true));
assertThat(resultRecord.getData().get("mylist", List.class), is(List.of(
Map.of("fruit_name", "apple"),
Map.of("fruit_name", "orange"),
Map.of("fruit_name", "banana")
)));
}

private CopyValueProcessor createObjectUnderTest() {
return new CopyValueProcessor(pluginMetrics, mockConfig, expressionEvaluator);
}
Expand All @@ -204,6 +313,11 @@ private Record<Event> getEvent(String message) {
return buildRecordWithEvent(testData);
}

private Record<Event> getEventWithLists(List<Map<String, Object>> testList) {
final Map<String, Object> testData = Map.of("mylist", testList);
return buildRecordWithEvent(testData);
}

private static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
Expand Down

0 comments on commit bd9ae27

Please sign in to comment.