Skip to content

Commit

Permalink
Enhancements to list_to_map processor (opensearch-project#4038)
Browse files Browse the repository at this point in the history
* Test key as optional
* Add new options; simplify existing code
* Add options to use source key as key in result map
* Add tags_on_failure option
* Remove restrictions on  option
* Address review comments

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Feb 5, 2024
1 parent 5df1621 commit 2b8ec72
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 63 deletions.
38 changes: 37 additions & 1 deletion data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,47 @@ the last element will be kept:
}
```

If `use_source_key` and `extract_value` are true:
```yaml
- list_to_map:
source: "mylist"
use_source_key: true
extract_value: true
```
we will get:
```json
{
"mylist": [
{
"name": "a",
"value": "val-a"
},
{
"name": "b",
"value": "val-b1"
},
{
"name": "b",
"value": "val-b2"
},
{
"name": "c",
"value": "val-c"
}
],
"name": ["a", "b", "b", "c"],
"value": ["val-a", "val-b1", "val-b2", "val-c"]
}
```

### Configuration
* `key` - (required) - The key of the fields that will be extracted as keys in the generated map

* `source` - (required) - The key in the event with a list of objects that will be converted to map
* `target` - (optional) - The key of the field that will hold the generated map. If not specified, the generated map will be put in the root.
* `key` - (optional) - The key of the fields that will be extracted as keys in the generated map
* `use_source_key` - (optional) - A boolean value, default to false. If it's true, will use the source key as is in the result
* `value_key` - (optional) - If specified, the values of the given `value_key` in the objects of the source list will be extracted and put into the values of the generated map; otherwise, original objects in the source list will be put into the values of the generated map.
* `extract_value` - (optional) - A boolean value, default to false. It only applies when `use_source_key` is true. If it's true, the value corresponding to the source key will be extracted into the target; otherwise, original objects in the source list will be put into the target.
* `flatten` - (optional) - A boolean value, default to false. If it's false, the values in the generated map will be lists; if it's true, the lists will be flattened into single items.
* `flattened_element` - (optional) - Valid options are "first" and "last", default is "first". This specifies which element, first one or last one, to keep if `flatten` option is true.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -21,7 +16,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand All @@ -30,7 +28,6 @@
@DataPrepperPlugin(name = "list_to_map", pluginType = Processor.class, pluginConfigurationType = ListToMapProcessorConfig.class)
public class ListToMapProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(ListToMapProcessor.class);
private final ListToMapProcessorConfig config;

Expand All @@ -52,81 +49,93 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
continue;
}

final JsonNode sourceNode;
final List<Map<String, Object>> sourceList;
try {
sourceNode = getSourceNode(recordEvent);
sourceList = recordEvent.get(config.getSource(), List.class);
} catch (final Exception e) {
LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]",
config.getSource(), recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
}

ObjectNode targetNode;
final Map<String, Object> targetMap;
try {
targetNode = constructTargetNode(sourceNode);
} catch (IllegalArgumentException e) {
targetMap = constructTargetMap(sourceList);
} catch (final IllegalArgumentException e) {
LOG.warn(EVENT, "Cannot find a list at the given source path [{}} on record [{}]",
config.getSource(), recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
} catch (final Exception e) {
LOG.error(EVENT, "Error converting source list to map on record [{}]", recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
}

try {
updateEvent(recordEvent, targetNode);
updateEvent(recordEvent, targetMap);
} catch (final Exception e) {
LOG.error(EVENT, "Error updating record [{}] after converting source list to map", recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
}
return records;
}

private JsonNode getSourceNode(final Event recordEvent) {
final Object sourceObject = recordEvent.get(config.getSource(), Object.class);
return OBJECT_MAPPER.convertValue(sourceObject, JsonNode.class);
}

private ObjectNode constructTargetNode(JsonNode sourceNode) {
final ObjectNode targetNode = OBJECT_MAPPER.createObjectNode();
if (sourceNode.isArray()) {
for (final JsonNode itemNode : sourceNode) {
String itemKey = itemNode.get(config.getKey()).asText();

if (!config.getFlatten()) {
final ArrayNode itemValueNode;
if (!targetNode.has(itemKey)) {
itemValueNode = OBJECT_MAPPER.createArrayNode();
targetNode.set(itemKey, itemValueNode);
} else {
itemValueNode = (ArrayNode) targetNode.get(itemKey);
}
private Map<String, Object> constructTargetMap(final List<Map<String, Object>> sourceList) {
Map<String, Object> targetMap = new HashMap<>();
for (final Map<String, Object> itemMap : sourceList) {

if (config.getValueKey() == null) {
itemValueNode.add(itemNode);
} else {
itemValueNode.add(itemNode.get(config.getValueKey()));
if (config.getUseSourceKey()) {
if (config.getFlatten()) {
for (final String entryKey : itemMap.keySet()) {
setTargetMapFlattened(targetMap, itemMap, entryKey, entryKey, config.getExtractValue());
}
} else {
if (!targetNode.has(itemKey) || config.getFlattenedElement() == ListToMapProcessorConfig.FlattenedElement.LAST) {
if (config.getValueKey() == null) {
targetNode.set(itemKey, itemNode);
} else {
targetNode.set(itemKey, itemNode.get(config.getValueKey()));
}
for (final String entryKey : itemMap.keySet()) {
setTargetMapUnflattened(targetMap, itemMap, entryKey, entryKey, config.getExtractValue());
}
}
} else {
final String itemKey = (String) itemMap.get(config.getKey());
if (config.getFlatten()) {
setTargetMapFlattened(targetMap, itemMap, itemKey, config.getValueKey(), config.getValueKey() != null);
} else {
setTargetMapUnflattened(targetMap, itemMap, itemKey, config.getValueKey(), config.getValueKey() != null);
}
}
}
return targetMap;
}

private void setTargetMapUnflattened(
final Map<String, Object> targetMap, final Map<String, Object> itemMap, final String itemKey, final String itemValueKey, final boolean doExtractValue) {
if (!targetMap.containsKey(itemKey)) {
targetMap.put(itemKey, new ArrayList<>());
}

final List<Object> itemValue = (List<Object>) targetMap.get(itemKey);

if (doExtractValue) {
itemValue.add(itemMap.get(itemValueKey));
} else {
throw new IllegalArgumentException("Cannot find a list at the given source path [{}]" + config.getSource());
itemValue.add(itemMap);
}
return targetNode;
}

private void updateEvent(Event recordEvent, ObjectNode targetNode) {
final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<>() {};
final Map<String, Object> targetMap = OBJECT_MAPPER.convertValue(targetNode, mapTypeReference);
private void setTargetMapFlattened(
final Map<String, Object> targetMap, final Map<String, Object> itemMap, final String itemKey, final String itemValueKey, final boolean doExtractValue) {
if (!targetMap.containsKey(itemKey) || config.getFlattenedElement() == ListToMapProcessorConfig.FlattenedElement.LAST) {
if (doExtractValue) {
targetMap.put(itemKey, itemMap.get(itemValueKey));
} else {
targetMap.put(itemKey, itemMap);
}
}
}

private void updateEvent(final Event recordEvent, final Map<String, Object> targetMap) {
final boolean doWriteToRoot = Objects.isNull(config.getTarget());
if (doWriteToRoot) {
for (final Map.Entry<String, Object> entry : targetMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import jakarta.validation.constraints.NotNull;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -45,14 +46,18 @@ static FlattenedElement fromOptionValue(final String option) {
@JsonProperty("target")
private String target = null;

@NotEmpty
@NotNull
@JsonProperty("key")
private String key;

@JsonProperty("value_key")
private String valueKey = null;

@JsonProperty("use_source_key")
private boolean useSourceKey = false;

@JsonProperty("extract_value")
private boolean extractValue = false;

@NotNull
@JsonProperty("flatten")
private boolean flatten = false;
Expand All @@ -64,6 +69,9 @@ static FlattenedElement fromOptionValue(final String option) {
@JsonProperty("list_to_map_when")
private String listToMapWhen;

@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

public String getSource() {
return source;
}
Expand All @@ -80,6 +88,14 @@ public String getValueKey() {
return valueKey;
}

public boolean getUseSourceKey() {
return useSourceKey;
}

public boolean getExtractValue() {
return extractValue;
}

public boolean getFlatten() {
return flatten;
}
Expand All @@ -89,4 +105,8 @@ public boolean getFlatten() {
public FlattenedElement getFlattenedElement() {
return flattenedElement;
}

public List<String> getTagsOnFailure() {
return tagsOnFailure;
}
}
Loading

0 comments on commit 2b8ec72

Please sign in to comment.