Skip to content

Commit

Permalink
Add map_to_list processor (#3945)
Browse files Browse the repository at this point in the history
Add map to list processor basic functionality and unit tests

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Jan 11, 2024
1 parent 912705b commit 787064e
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 0 deletions.
57 changes: 57 additions & 0 deletions data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,63 @@ the last element will be kept:
* `flattened_element` - (optional) - Valid options are "first" and "last", default is "first". This specifies which element, first one or last one, to keep if `flatten` option is true.


## map_to_list Processor
A processor that converts a map of key-value pairs to a list of objects, each contains the key and value in separate fields.

For example, if the input event has the following data:
```json
{
"my-map": {
"key1": "value1",
"key2": "value2",
"key3": "value3"
}
}
```
with `map_to_list` processor configured to:
```yaml
...
processor:
- map_to_list:
source: "my-map"
target: "my-list"
...
```
The processed event will have the following data:
```json
{
"my-list": [
{
"key": "key1",
"value": "value1"
},
{
"key": "key2",
"value": "value2"
},
{
"key": "key3",
"value": "value3"
}
],
"my-map": {
"key1": "value1",
"key2": "value2",
"key3": "value3"
}
}
```

### Configuration
* `source` - (required): the source map to perform the operation
* `target` - (required): the target list to put the converted list
* `key_name` - (optional): the key name of the field to hold the original key, default is "key"
* `value_name` - (optional): the key name of the field to hold the original value, default is "value"
* `exclude_keys` - (optional): the keys in source map that will be excluded from processing, default is empty list
* `remove_processed_fields` - (optional): default is false; if true, will remove processed fields from source map
* `map_to_list_when` - (optional): used to configure a condition for event processing based on certain property of the incoming event. Default is null (all events will be processed).


## Developer Guide
This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development:
- [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

@DataPrepperPlugin(name = "map_to_list", pluginType = Processor.class, pluginConfigurationType = MapToListProcessorConfig.class)
public class MapToListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(MapToListProcessor.class);
private final MapToListProcessorConfig config;
private final ExpressionEvaluator expressionEvaluator;
private final Set<String> excludeKeySet = new HashSet<>();

@DataPrepperPluginConstructor
public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.config = config;
this.expressionEvaluator = expressionEvaluator;
excludeKeySet.addAll(config.getExcludeKeys());
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();

if (config.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(config.getMapToListWhen(), recordEvent)) {
continue;
}

try {
final Map<String, Object> sourceMap = recordEvent.get(config.getSource(), Map.class);
final List<Map<String, Object>> targetList = new ArrayList<>();

Map<String, Object> modifiedSourceMap = new HashMap<>();
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (excludeKeySet.contains(entry.getKey())) {
if (config.getRemoveProcessedFields()) {
modifiedSourceMap.put(entry.getKey(), entry.getValue());
}
continue;
}
targetList.add(Map.of(
config.getKeyName(), entry.getKey(),
config.getValueName(), entry.getValue()
));
}

if (config.getRemoveProcessedFields()) {
recordEvent.put(config.getSource(), modifiedSourceMap);
}

recordEvent.put(config.getTarget(), targetList);
} catch (Exception e) {
LOG.error("Fail to perform Map to List operation", e);
//TODO: add tagging on failure
}
}
return records;
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.ArrayList;
import java.util.List;

public class MapToListProcessorConfig {
private static final String DEFAULT_KEY_NAME = "key";
private static final String DEFAULT_VALUE_NAME = "value";
private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
private static final boolean DEFAULT_REMOVE_PROCESSED_FIELDS = false;

@NotEmpty
@NotNull
@JsonProperty("source")
private String source;

@NotEmpty
@NotNull
@JsonProperty("target")
private String target;

@JsonProperty("key_name")
private String keyName = DEFAULT_KEY_NAME;

@JsonProperty("value_name")
private String valueName = DEFAULT_VALUE_NAME;

@JsonProperty("map_to_list_when")
private String mapToListWhen;

@JsonProperty("exclude_keys")
private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;

@JsonProperty("remove_processed_fields")
private boolean removeProcessedFields = DEFAULT_REMOVE_PROCESSED_FIELDS;

public String getSource() {
return source;
}

public String getTarget() {
return target;
}

public String getKeyName() {
return keyName;
}

public String getValueName() {
return valueName;
}

public String getMapToListWhen() {
return mapToListWhen;
}

public List<String> getExcludeKeys() {
return excludeKeys;
}

public boolean getRemoveProcessedFields() {
return removeProcessedFields;
}
}
Loading

0 comments on commit 787064e

Please sign in to comment.