Skip to content

Commit

Permalink
Update the mutate string processors to use the EventKey. #4646 (#4649)
Browse files Browse the repository at this point in the history
Change the source and keys properties for mutate string processors to use EventKey such that they are parsed by Data Prepper core. Also, use the TestEventFactory in the tests to avoid use of JacksonEvent directly. Removes an unused class.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Jul 10, 2024
1 parent ce88765 commit 0f5e10f
Show file tree
Hide file tree
Showing 16 changed files with 107 additions and 85 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/mutate-string-processors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test-event')
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.record.Record;

Expand Down Expand Up @@ -46,8 +47,8 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
private void performStringAction(final Event recordEvent)
{
try {
for(T entry : entries) {
final String key = getKey(entry);
for(final T entry : entries) {
final EventKey key = getKey(entry);

if(recordEvent.containsKey(key)) {
final Object value = recordEvent.get(key, Object.class);
Expand All @@ -64,7 +65,7 @@ private void performStringAction(final Event recordEvent)

protected abstract void performKeyAction(final Event recordEvent, final T entry, final String value);

protected abstract String getKey(final T entry);
protected abstract EventKey getKey(final T entry);

@Override
public void prepareForShutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.Locale;
Expand All @@ -18,20 +19,20 @@
* no action is performed.
*/
@DataPrepperPlugin(name = "lowercase_string", pluginType = Processor.class, pluginConfigurationType = WithKeysConfig.class)
public class LowercaseStringProcessor extends AbstractStringProcessor<String> {
public class LowercaseStringProcessor extends AbstractStringProcessor<EventKey> {
@DataPrepperPluginConstructor
public LowercaseStringProcessor(final PluginMetrics pluginMetrics, final WithKeysConfig config) {
super(pluginMetrics, config);
}

@Override
protected void performKeyAction(final Event recordEvent, final String key, final String value)
protected void performKeyAction(final Event recordEvent, final EventKey key, final String value)
{
recordEvent.put(key, value.toLowerCase(Locale.ROOT));
}

@Override
protected String getKey(final String entry) {
protected EventKey getKey(final EventKey entry) {
return entry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.HashMap;
Expand Down Expand Up @@ -64,7 +65,7 @@ protected void performKeyAction(final Event recordEvent, final SplitStringProces
}

@Override
protected String getKey(final SplitStringProcessorConfig.Entry entry) {
protected EventKey getKey(final SplitStringProcessorConfig.Entry entry) {
return entry.getSource();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.model.event.EventKey;

import java.util.List;

Expand All @@ -19,7 +20,7 @@ public static class Entry {

@NotEmpty
@NotNull
private String source;
private EventKey source;

@JsonProperty("delimiter_regex")
private String delimiterRegex;
Expand All @@ -30,7 +31,7 @@ public static class Entry {
@JsonProperty("split_when")
private String splitWhen;

public String getSource() {
public EventKey getSource() {
return source;
}

Expand All @@ -44,7 +45,7 @@ public String getDelimiter() {

public String getSplitWhen() { return splitWhen; }

public Entry(final String source, final String delimiterRegex, final String delimiter, final String splitWhen) {
public Entry(final EventKey source, final String delimiterRegex, final String delimiter, final String splitWhen) {
this.source = source;
this.delimiterRegex = delimiterRegex;
this.delimiter = delimiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.HashMap;
Expand Down Expand Up @@ -51,7 +52,7 @@ protected void performKeyAction(final Event recordEvent, final SubstituteStringP
}

@Override
protected String getKey(final SubstituteStringProcessorConfig.Entry entry) {
protected EventKey getKey(final SubstituteStringProcessorConfig.Entry entry) {
return entry.getSource();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
package org.opensearch.dataprepper.plugins.processor.mutatestring;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.model.event.EventKey;

import java.util.List;

public class SubstituteStringProcessorConfig implements StringProcessorConfig<SubstituteStringProcessorConfig.Entry> {
public static class Entry {
private String source;
private EventKey source;
private String from;
private String to;

@JsonProperty("substitute_when")
private String substituteWhen;

public String getSource() {
public EventKey getSource() {
return source;
}

Expand All @@ -32,7 +33,7 @@ public String getTo() {

public String getSubstituteWhen() { return substituteWhen; }

public Entry(final String source, final String from, final String to, final String substituteWhen) {
public Entry(final EventKey source, final String from, final String to, final String substituteWhen) {
this.source = source;
this.from = from;
this.to = to;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,28 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.processor.Processor;

/**
* This processor takes in a key and changes its value to a string with the leading and trailing spaces trimmed.
* If the value is not a string, no action is performed.
*/
@DataPrepperPlugin(name = "trim_string", pluginType = Processor.class, pluginConfigurationType = WithKeysConfig.class)
public class TrimStringProcessor extends AbstractStringProcessor<String> {
public class TrimStringProcessor extends AbstractStringProcessor<EventKey> {
@DataPrepperPluginConstructor
public TrimStringProcessor(final PluginMetrics pluginMetrics, final WithKeysConfig config) {
super(pluginMetrics, config);
}

@Override
protected void performKeyAction(final Event recordEvent, final String key, final String value)
protected void performKeyAction(final Event recordEvent, final EventKey key, final String value)
{
recordEvent.put(key, value.trim());
}

@Override
protected String getKey(final String entry) {
protected EventKey getKey(final EventKey entry) {
return entry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.Locale;
Expand All @@ -18,19 +19,19 @@
* no action is performed.
*/
@DataPrepperPlugin(name = "uppercase_string", pluginType = Processor.class, pluginConfigurationType = WithKeysConfig.class)
public class UppercaseStringProcessor extends AbstractStringProcessor<String> {
public class UppercaseStringProcessor extends AbstractStringProcessor<EventKey> {
@DataPrepperPluginConstructor
public UppercaseStringProcessor(final PluginMetrics pluginMetrics, final WithKeysConfig config) {
super(pluginMetrics, config);
}

@Override
protected String getKey(final String entry) {
protected EventKey getKey(final EventKey entry) {
return entry;
}

@Override
protected void performKeyAction(final Event recordEvent, final String entry, final String value)
protected void performKeyAction(final Event recordEvent, final EventKey entry, final String value)
{
recordEvent.put(entry, value.toUpperCase(Locale.ROOT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.event.EventKey;

import java.util.List;

public class WithKeysConfig implements StringProcessorConfig<String> {
public class WithKeysConfig implements StringProcessorConfig<EventKey> {

@NotNull
@NotEmpty
@JsonProperty("with_keys")
private List<String> withKeys;
private List<EventKey> withKeys;

@Override
public List<String> getIterativeConfig() {
public List<EventKey> getIterativeConfig() {
return withKeys;
}

public List<String> getWithKeys() {
public List<EventKey> getWithKeys() {
return withKeys;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,26 @@

package org.opensearch.dataprepper.plugins.processor.mutatestring;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.event.TestEventKeyFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
Expand All @@ -29,6 +34,9 @@

@ExtendWith(MockitoExtension.class)
public class LowercaseStringProcessorTests {
private static final EventFactory TEST_EVENT_FACTORY = TestEventFactory.getTestEventFactory();
private final EventKeyFactory eventKeyFactory = TestEventKeyFactory.getTestEventFactory();

@Mock
private PluginMetrics pluginMetrics;

Expand All @@ -37,7 +45,7 @@ public class LowercaseStringProcessorTests {

@BeforeEach
public void setup() {
lenient().when(config.getIterativeConfig()).thenReturn(Collections.singletonList("message"));
lenient().when(config.getIterativeConfig()).thenReturn(Stream.of("message").map(eventKeyFactory::createEventKey).collect(Collectors.toList()));
}

@Test
Expand All @@ -52,7 +60,7 @@ public void testHappyPathLowercaseStringProcessor() {

@Test
public void testHappyPathMultiLowercaseStringProcessor() {
when(config.getIterativeConfig()).thenReturn(Arrays.asList("message", "message2"));
when(config.getIterativeConfig()).thenReturn(Stream.of("message", "message2").map(eventKeyFactory::createEventKey).collect(Collectors.toList()));

final LowercaseStringProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("THISISAMESSAGE");
Expand All @@ -67,7 +75,7 @@ public void testHappyPathMultiLowercaseStringProcessor() {

@Test
public void testHappyPathMultiMixedLowercaseStringProcessor() {
lenient().when(config.getIterativeConfig()).thenReturn(Arrays.asList("message", "message2"));
lenient().when(config.getIterativeConfig()).thenReturn(Stream.of("message", "message2").map(eventKeyFactory::createEventKey).collect(Collectors.toList()));

final LowercaseStringProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("THISISAMESSAGE");
Expand Down Expand Up @@ -137,7 +145,7 @@ private Record<Event> getEvent(Object message) {
}

private static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
return new Record<>(TEST_EVENT_FACTORY.eventBuilder(EventBuilder.class)
.withData(data)
.withEventType("event")
.build());
Expand Down
Loading

0 comments on commit 0f5e10f

Please sign in to comment.