Add truncate string processor (#3924)
* Add truncate string processor
* Addressed review comments
* Added check for negative numbers in the config input
* Fixed checkstyle error
* Modified to make truncate processor a top level processor, not specific to strings
* Addressed review comments
* Updated documentation with correct configuration
* Fixed typos in the documentation
* Modified to allow more than one source keys in the config
* Modified to allow multiple entries under configuration

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
Showing 7 changed files with 534 additions and 0 deletions.
@@ -0,0 +1,76 @@
# Truncate Processor

This processor truncates the value of a key at the beginning, at the end, or on both sides, as specified by the configuration. If the value is a list, each string member of the list is truncated and non-string members are left untouched. If the `truncate_when` option is provided, truncation is performed only when the specified condition evaluates to true for the event being processed.

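The list behavior described above can be sketched in a few lines of plain Java. This is only an illustration, not the plugin's code: the class name `ListTruncateSketch` and the method `truncateStrings` are hypothetical, and the sketch assumes `startAt` does not exceed the length of any string in the list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not part of Data Prepper.
final class ListTruncateSketch {

    // Returns a new list in which every String member is truncated and
    // every other member is copied through unchanged.
    static List<Object> truncateStrings(final List<?> values, final int startAt, final Integer length) {
        final List<Object> result = new ArrayList<>();
        for (final Object item : values) {
            if (item instanceof String) {
                final String s = (String) item;
                // A null length means "keep everything from startAt onward".
                final boolean toEnd = length == null || startAt + length >= s.length();
                result.add(toEnd ? s.substring(startAt) : s.substring(startAt, startAt + length));
            } else {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Mixed list: the two strings are cut to 3 characters, 42 and true pass through.
        System.out.println(truncateStrings(Arrays.asList("alpha", 42, "beta", true), 0, 3));
    }
}
```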
## Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
  source:
    file:
      path: "/full/path/to/logs_json.log"
      record_type: "event"
      format: "json"
  processor:
    - truncate:
        entries:
          - source_keys: ["message1", "message2"]
            length: 5
          - source_keys: ["info"]
            length: 6
            start_at: 4
          - source_keys: ["log"]
            start_at: 5
  sink:
    - stdout:
```
Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"message1": "hello,world", "message2": "test message", "info": "new information", "log": "test log message"}
```
When you run Data Prepper with this `pipeline.yaml`, you should see the following output:

```json
{"message1":"hello", "message2":"test ", "info":"inform", "log": "log message"}
```
Here, the values of `message1` and `message2` are truncated to length 5 starting from index 0, the value of `info` is truncated to length 6 starting from index 4, and the value of `log` has its first 5 characters removed.

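To make the index arithmetic concrete, here is a minimal standalone sketch that applies the same `start_at`/`length` rule to the four sample values. `TruncateSketch` is a hypothetical name used only for this illustration; the rule itself mirrors the `getTruncatedValue` helper in this commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; not part of Data Prepper.
final class TruncateSketch {

    // Keeps at most `length` characters of `value`, beginning at index `startAt`.
    // A null `length` keeps everything from `startAt` to the end of the string.
    static String truncate(final String value, final int startAt, final Integer length) {
        if (length == null || startAt + length >= value.length()) {
            return value.substring(startAt);
        }
        return value.substring(startAt, startAt + length);
    }

    public static void main(final String[] args) {
        final Map<String, String> out = new LinkedHashMap<>();
        out.put("message1", truncate("hello,world", 0, 5));    // length only
        out.put("message2", truncate("test message", 0, 5));   // length only
        out.put("info", truncate("new information", 4, 6));    // start_at and length
        out.put("log", truncate("test log message", 5, null)); // start_at only
        System.out.println(out);
    }
}
```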
Example configuration with the `truncate_when` option:
```yaml
pipeline:
  source:
    file:
      path: "/full/path/to/logs_json.log"
      record_type: "event"
      format: "json"
  processor:
    - truncate:
        entries:
          - source_keys: ["message"]
            length: 5
            start_at: 7
            truncate_when: '/id == 1'
  sink:
    - stdout:
```

When a pipeline started with the above configuration receives the following two events:
```json
{"message": "hello, world", "id": 1}
{"message": "hello, world,not-truncated", "id": 2}
```
the output is:
```json
{"message": "world", "id": 1}
{"message": "hello, world,not-truncated", "id": 2}
```

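The conditional behavior can be sketched as follows. This is an assumption-laden illustration rather than the plugin's implementation: events are modeled as plain maps, a Java `Predicate` stands in for the Data Prepper expression `/id == 1`, and the names `ConditionalTruncateSketch` and `truncateWhen` are hypothetical. The sketch uses a start index of 7, which is where `world` begins in `hello, world`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical illustration; not part of Data Prepper.
final class ConditionalTruncateSketch {

    // Truncates event[key] in place, but only when `condition` holds for the event.
    // Assumes startAt does not exceed the length of the string value.
    static void truncateWhen(final Map<String, Object> event, final String key,
                             final int startAt, final int length,
                             final Predicate<Map<String, Object>> condition) {
        if (!condition.test(event)) {
            return; // condition is false: leave the event untouched
        }
        final Object value = event.get(key);
        if (value instanceof String) {
            final String s = (String) value;
            final int end = Math.min(s.length(), startAt + length);
            event.put(key, s.substring(startAt, end));
        }
    }

    public static void main(final String[] args) {
        // Stand-in for the expression '/id == 1'.
        final Predicate<Map<String, Object>> idIsOne = ev -> Integer.valueOf(1).equals(ev.get("id"));

        final Map<String, Object> first = new HashMap<>();
        first.put("message", "hello, world");
        first.put("id", 1);

        final Map<String, Object> second = new HashMap<>();
        second.put("message", "hello, world,not-truncated");
        second.put("id", 2);

        truncateWhen(first, "message", 7, 5, idIsOne);
        truncateWhen(second, "message", 7, 5, idIsOne);
        System.out.println(first.get("message"));
        System.out.println(second.get("message"));
    }
}
```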
### Configuration
* `entries` - (required) - A list of truncation entries to apply to an event. Each entry supports the following options:
  * `source_keys` - (required) - The list of keys whose values are truncated.
  * `truncate_when` - (optional) - A condition; the truncate operation is performed only when it evaluates to true.
  * `start_at` - (optional) - The index at which the retained portion of the string starts. Must not be negative. Defaults to 0.
  * `length` - (optional) - The maximum length of the string after truncation. Must not be negative. Defaults to the end of the string.

Either `start_at` or `length` or both must be present.
@@ -0,0 +1,17 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

plugins {
    id 'java'
}

dependencies {
    implementation project(':data-prepper-api')
    implementation project(':data-prepper-test-common')
    implementation project(':data-prepper-plugins:common')
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    testImplementation libs.commons.lang3
}

100 changes: 100 additions & 0 deletions
...rc/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java
@@ -0,0 +1,100 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.processor.truncate;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.Collection;
import java.util.ArrayList;
import java.util.List;

/**
 * This processor takes in one or more keys and truncates each value by removing
 * characters from the front, from the end, or from both.
 * If the value is a list, each string member is truncated; any other
 * non-string value is left unchanged.
 */
@DataPrepperPlugin(name = "truncate", pluginType = Processor.class, pluginConfigurationType = TruncateProcessorConfig.class)
public class TruncateProcessor extends AbstractProcessor<Record<Event>, Record<Event>>{
    private final ExpressionEvaluator expressionEvaluator;
    private final List<TruncateProcessorConfig.Entry> entries;

    @DataPrepperPluginConstructor
    public TruncateProcessor(final PluginMetrics pluginMetrics, final TruncateProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics);
        this.expressionEvaluator = expressionEvaluator;
        this.entries = config.getEntries();
    }

    // Keeps at most `length` characters starting at `startIndex`;
    // a null `length` keeps everything up to the end of the string.
    private String getTruncatedValue(final String value, final int startIndex, final Integer length) {
        String truncatedValue =
            (length == null || startIndex+length >= value.length()) ?
            value.substring(startIndex) :
            value.substring(startIndex, startIndex + length);

        return truncatedValue;
    }

    @Override
    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
        for(final Record<Event> record : records) {
            final Event recordEvent = record.getData();
            for (TruncateProcessorConfig.Entry entry: entries) {
                final List<String> sourceKeys = entry.getSourceKeys();
                final String truncateWhen = entry.getTruncateWhen();
                final int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt();
                final Integer length = entry.getLength();
                // Skip this entry when its condition is present and evaluates to false
                if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) {
                    continue;
                }
                for (String sourceKey: sourceKeys) {
                    if (!recordEvent.containsKey(sourceKey)) {
                        continue;
                    }

                    final Object value = recordEvent.get(sourceKey, Object.class);
                    if (value instanceof String) {
                        recordEvent.put(sourceKey, getTruncatedValue((String)value, startIndex, length));
                    } else if (value instanceof List) {
                        // Truncate string members; copy all other members through unchanged
                        List<Object> result = new ArrayList<>();
                        for (Object listItem: (List<?>)value) {
                            if (listItem instanceof String) {
                                result.add(getTruncatedValue((String)listItem, startIndex, length));
                            } else {
                                result.add(listItem);
                            }
                        }
                        recordEvent.put(sourceKey, result);
                    }
                }
            }
        }

        return records;
    }

    @Override
    public void prepareForShutdown() {

    }

    @Override
    public boolean isReadyForShutdown() {
        return true;
    }

    @Override
    public void shutdown() {

    }
}

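One edge case in `getTruncatedValue` is worth noting: `String.substring` throws `StringIndexOutOfBoundsException` when the begin index is greater than the string's length, so a value shorter than the configured `start_at` would raise an exception. The sketch below shows one possible defensive variant. It is not part of this commit; the name `SafeTruncateSketch` is hypothetical, and returning an empty string for an out-of-range start index is an assumption about the desired behavior (leaving the value untouched would be another reasonable choice).

```java
// Hypothetical illustration; not part of this commit.
final class SafeTruncateSketch {

    // Like getTruncatedValue, but never throws for a non-negative startAt.
    static String safeTruncate(final String value, final int startAt, final Integer length) {
        if (startAt >= value.length()) {
            return ""; // assumption: nothing is left once the start index passes the end
        }
        // long arithmetic avoids int overflow when a very large length is configured
        final long end = length == null
                ? value.length()
                : Math.min((long) value.length(), (long) startAt + length);
        return value.substring(startAt, (int) end);
    }

    public static void main(final String[] args) {
        System.out.println(safeTruncate("abc", 5, null));              // start index past the end
        System.out.println(safeTruncate("abc", 1, Integer.MAX_VALUE)); // very large length
        System.out.println(safeTruncate("hello,world", 0, 5));         // ordinary case
    }
}
```

Negative start indexes are not handled here because `TruncateProcessorConfig.Entry.isValidConfig()` already rejects them.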
81 changes: 81 additions & 0 deletions
...n/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java
@@ -0,0 +1,81 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.processor.truncate;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.Valid;

import java.util.List;

public class TruncateProcessorConfig {
    public static class Entry {
        @NotEmpty
        @NotNull
        @JsonProperty("source_keys")
        private List<String> sourceKeys;

        @JsonProperty("start_at")
        private Integer startAt;

        @JsonProperty("length")
        private Integer length;

        @JsonProperty("truncate_when")
        private String truncateWhen;

        public Entry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) {
            this.sourceKeys = sourceKeys;
            this.startAt = startAt;
            this.length = length;
            this.truncateWhen = truncateWhen;
        }

        public Entry() {}

        public List<String> getSourceKeys() {
            return sourceKeys;
        }

        public Integer getStartAt() {
            return startAt;
        }

        public Integer getLength() {
            return length;
        }

        public String getTruncateWhen() {
            return truncateWhen;
        }

        @AssertTrue(message = "source_keys must be specified and at least one of start_at or length or both must be specified and the values must be positive integers")
        public boolean isValidConfig() {
            if (length == null && startAt == null) {
                return false;
            }
            if (length != null && length < 0) {
                return false;
            }
            if (startAt != null && startAt < 0) {
                return false;
            }
            return true;
        }
    }

    @NotEmpty
    @NotNull
    private List<@Valid Entry> entries;

    public List<Entry> getEntries() {
        return entries;
    }

}

115 changes: 115 additions & 0 deletions
...a/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java
@@ -0,0 +1,115 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.processor.truncate;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;

import org.apache.commons.lang3.RandomStringUtils;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

import java.util.List;
import java.util.Random;

class TruncateProcessorConfigTests {
    TruncateProcessorConfig truncateProcessorConfig;

    Random random;

    @BeforeEach
    void setUp() {
        truncateProcessorConfig = new TruncateProcessorConfig();
        random = new Random();
    }

    @Test
    void testDefaults() {
        assertThat(truncateProcessorConfig.getEntries(), equalTo(null));
    }

    @Test
    void testEntryDefaults() {
        TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
        assertThat(entry.getStartAt(), equalTo(null));
        assertThat(entry.getLength(), equalTo(null));
        assertThat(entry.getTruncateWhen(), equalTo(null));
    }

    @Test
    void testValidConfiguration_withStartAt() throws NoSuchFieldException, IllegalAccessException {
        TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
        String source = RandomStringUtils.randomAlphabetic(10);
        List<String> sourceKeys = List.of(source);
        setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", sourceKeys);
        int startAt = random.nextInt(100);
        setField(TruncateProcessorConfig.Entry.class, entry, "startAt", startAt);
        assertThat(entry.getSourceKeys(), equalTo(sourceKeys));
        assertThat(entry.getStartAt(), equalTo(startAt));
        setField(TruncateProcessorConfig.class, truncateProcessorConfig, "entries", List.of(entry));
        assertTrue(entry.isValidConfig());
    }

    @Test
    void testValidConfiguration_withLength() throws NoSuchFieldException, IllegalAccessException {
        TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
        String source1 = RandomStringUtils.randomAlphabetic(10);
        String source2 = RandomStringUtils.randomAlphabetic(10);
        List<String> sourceKeys = List.of(source1, source2);
        setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", sourceKeys);
        int length = random.nextInt(100);
        setField(TruncateProcessorConfig.Entry.class, entry, "length", length);
        assertThat(entry.getSourceKeys(), equalTo(sourceKeys));
        assertThat(entry.getLength(), equalTo(length));
        setField(TruncateProcessorConfig.class, truncateProcessorConfig, "entries", List.of(entry));
        assertTrue(entry.isValidConfig());
    }

    @Test
    void testValidConfiguration_withLength_withTruncateWhen() throws NoSuchFieldException, IllegalAccessException {
        TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
        String source = RandomStringUtils.randomAlphabetic(10);
        String condition = RandomStringUtils.randomAlphabetic(10);
        List<String> sourceKeys = List.of(source);
        setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", sourceKeys);
        int length = random.nextInt(100);
        int startAt = random.nextInt(100);
        setField(TruncateProcessorConfig.Entry.class, entry, "length", length);
        setField(TruncateProcessorConfig.Entry.class, entry, "startAt", startAt);
        setField(TruncateProcessorConfig.Entry.class, entry, "truncateWhen", condition);
        assertThat(entry.getSourceKeys(), equalTo(sourceKeys));
        assertThat(entry.getLength(), equalTo(length));
        assertThat(entry.getStartAt(), equalTo(startAt));
        assertThat(entry.getTruncateWhen(), equalTo(condition));
        assertTrue(entry.isValidConfig());
    }

    @Test
    void testInvalidConfiguration_StartAt_Length_BothNull() throws NoSuchFieldException, IllegalAccessException {
        TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
        String source = RandomStringUtils.randomAlphabetic(10);
        setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", List.of(source));
        assertFalse(entry.isValidConfig());
    }

    @Test
    void testInvalidConfiguration_StartAt_Length_Negative() throws NoSuchFieldException, IllegalAccessException {
        TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
        String source = RandomStringUtils.randomAlphabetic(10);
        // nextInt(100) can return 0, and negating 0 yields a valid value,
        // so shift the range to 1..100 to keep the negated values strictly negative
        int length = random.nextInt(100) + 1;
        int startAt = random.nextInt(100) + 1;
        setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", List.of(source));
        setField(TruncateProcessorConfig.Entry.class, entry, "startAt", -startAt);
        assertFalse(entry.isValidConfig());
        setField(TruncateProcessorConfig.Entry.class, entry, "startAt", startAt);
        setField(TruncateProcessorConfig.Entry.class, entry, "length", -length);
        assertFalse(entry.isValidConfig());
    }
}