Skip to content

Commit

Permalink
Split String Processor (#1167)
Browse files Browse the repository at this point in the history
Signed-off-by: Shivani Shukla <[email protected]>
  • Loading branch information
sshivanii authored Mar 11, 2022
1 parent 22870cf commit 5ebe995
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ public SubstituteConfig(final String source, final String from, final String to)
}
}

public static class SplitConfig {
public final String source;
public final String delimiter;

public SplitConfig(final String source, final String delimiter) {
this.source = source;
this.delimiter = delimiter;

}
}

public List<PluginModel> mapAttributes(final List<LogstashAttribute> logstashAttributes, final LogstashAttributesMappings logstashAttributesMappings) {
final List<PluginModel> models = new LinkedList<>();
final List<AddEntryConfig> adds = new LinkedList<>();
Expand All @@ -59,6 +70,7 @@ public List<PluginModel> mapAttributes(final List<LogstashAttribute> logstashAtt
final List<String> lowercases = new LinkedList<>();
final List<String> trims = new LinkedList<>();
final List<SubstituteConfig> substitutes = new LinkedList<>();
final List<SplitConfig> splits = new LinkedList<>();

for(final LogstashAttribute attr : logstashAttributes) {
final String name = attr.getAttributeName();
Expand Down Expand Up @@ -95,6 +107,10 @@ public List<PluginModel> mapAttributes(final List<LogstashAttribute> logstashAtt
final SubstituteConfig newConfig = new SubstituteConfig(gsubSource, gsubPatternToReplace, gsubStringToReplaceWith);
substitutes.add(newConfig);
}
} else if(Objects.equals(name, "split_string")) {
((Map<String, String>) attr.getAttributeValue().getValue()).forEach(
(key, value) -> splits.add(new SplitConfig(NestedSyntaxConverter.convertNestedSyntaxToJsonPointer(key),
NestedSyntaxConverter.convertNestedSyntaxToJsonPointer(value))));
}
}

Expand Down Expand Up @@ -170,6 +186,15 @@ public List<PluginModel> mapAttributes(final List<LogstashAttribute> logstashAtt
models.add(substituteModel);
}

if(!splits.isEmpty()) {
final Map<String, Object> splitStringMap = new HashMap<>();
splitStringMap.put("entries", splits);

final PluginModel splitStringModel = new PluginModel("split_string", splitStringMap);

models.add(splitStringModel);
}

return models;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ filter {
lowercase => ["lowercaseField"]
trim => ["trimField"]
gsub => ["source", "from", "to", "source2", "from2", "to2"]
split_string => { "hello,world" => ","}
}
}
output {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ logstash-converted-pipeline:
- source: "source2"
from: "from2"
to: "to2"
- split_string:
entries:
- source: "hello,world"
delimiter: ","
sink:
- opensearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.mutatestring;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.processor.Processor;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

@DataPrepperPlugin(name = "split_string", pluginType = Processor.class, pluginConfigurationType = SplitStringProcessorConfig.class)
public class SplitStringProcessor extends AbstractStringProcessor<SplitStringProcessorConfig.Entry> {

private final Map<String, Pattern> patternMap;

@DataPrepperPluginConstructor
public SplitStringProcessor(final PluginMetrics pluginMetrics, final StringProcessorConfig<SplitStringProcessorConfig.Entry> config) {
super(pluginMetrics, config);

patternMap = new HashMap<>();
for (SplitStringProcessorConfig.Entry entry: config.getIterativeConfig()) {
patternMap.put(entry.getDelimiter(), Pattern.compile(entry.getDelimiter()));
}
}

@Override
protected void performKeyAction(final Event recordEvent, final SplitStringProcessorConfig.Entry entry, final String value) {

final Pattern pattern = patternMap.get(entry.getDelimiter());
final String[] splitValue = pattern.split(value);
recordEvent.put(entry.getSource(), splitValue);
}

@Override
protected String getKey(final SplitStringProcessorConfig.Entry entry) {
return entry.getSource();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.mutatestring;

import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.List;

public class SplitStringProcessorConfig {
public static class Entry {

@NotEmpty
@NotNull
private String source;

@NotEmpty
@NotNull
private String delimiter;

public String getSource() {
return source;
}

public String getDelimiter() {
return delimiter;
}

public Entry(final String source, final String delimiter) {
this.source = source;
this.delimiter = delimiter;
}

public Entry() {};

private List<Entry> entries;

public List<Entry> getEntries() {
return entries;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.mutatestring;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.event.JacksonEvent;
import com.amazon.dataprepper.model.record.Record;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class SplitStringProcessorTests {

@Mock
private PluginMetrics pluginMetrics;

@Mock
private StringProcessorConfig<SplitStringProcessorConfig.Entry> config;

private SplitStringProcessor createObjectUnderTest() {
return new SplitStringProcessor(pluginMetrics, config);
}

@ParameterizedTest
@ArgumentsSource(SplitStringArgumentsProvider.class)
void testSingleSplitProcessor(String message, List<String> splitMessage) {

when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", ",")));

final SplitStringProcessor splitStringProcessor = createObjectUnderTest();
final Record<Event> record = createEvent(message);
final List<Record<Event>> splitRecords = (List<Record<Event>>) splitStringProcessor.doExecute(Collections.singletonList(record));

assertThat(splitRecords.get(0).getData().get("message", Object.class), notNullValue());
assertThat(splitRecords.get(0).getData().get("message", Object.class), equalTo(splitMessage));
}

private SplitStringProcessorConfig.Entry createEntry(final String source, final String delimiter) {
return new SplitStringProcessorConfig.Entry(source, delimiter);
}

private Record<Event> createEvent(String message) {
final Map<String, Object> eventData = new HashMap<>();
eventData.put("message", message);
return new Record<>(JacksonEvent.builder()
.withEventType("event")
.withData(eventData)
.build());
}

static class SplitStringArgumentsProvider implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
Arguments.arguments("hello,world,no-split", Arrays.asList("hello","world","no-split")),
Arguments.arguments("hello,world", Arrays.asList("hello", "world")),
Arguments.arguments("hello,,world", Arrays.asList("hello","","world")),
Arguments.arguments("hello,", Arrays.asList("hello")),
Arguments.arguments("hello,,", Arrays.asList("hello")),
Arguments.arguments(",hello", Arrays.asList("","hello")),
Arguments.arguments(",,hello", Arrays.asList("","","hello"))
);
}
}

}

0 comments on commit 5ebe995

Please sign in to comment.