Skip to content

Commit

Permalink
Add append option to add_entries processor (opensearch-project#4143)
Browse files Browse the repository at this point in the history
* Add append option

Signed-off-by: Hai Yan <[email protected]>

* Address comments: combine two mergeValue methods

Signed-off-by: Hai Yan <[email protected]>

* Address comments: update assertion message

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Feb 20, 2024
1 parent 6117222 commit 94e9e65
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

Expand Down Expand Up @@ -67,12 +70,15 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if (!Objects.isNull(key)) {
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
recordEvent.put(key, value);
} else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) {
mergeValueToEvent(recordEvent, key, value);
}
} else {
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
recordEvent.getMetadata().setAttribute(metadataKey, value);

} else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) {
mergeValueToEventMetadata(recordEvent, metadataKey, value);
}
}
} catch (Exception e) {
Expand All @@ -97,4 +103,25 @@ public boolean isReadyForShutdown() {
@Override
public void shutdown() {
}

private void mergeValueToEvent(final Event recordEvent, final String key, final Object value) {
mergeValue(value, () -> recordEvent.get(key, Object.class), newValue -> recordEvent.put(key, newValue));
}

private void mergeValueToEventMetadata(final Event recordEvent, final String key, final Object value) {
mergeValue(value, () -> recordEvent.getMetadata().getAttribute(key), newValue -> recordEvent.getMetadata().setAttribute(key, newValue));
}

private void mergeValue(final Object value, Supplier<Object> getter, Consumer<Object> setter) {
final Object currentValue = getter.get();
final List<Object> mergedValue = new ArrayList<>();
if (currentValue instanceof List) {
mergedValue.addAll((List<Object>) currentValue);
} else {
mergedValue.add(currentValue);
}

mergedValue.add(value);
setter.accept(mergedValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public static class Entry {
@JsonProperty("overwrite_if_key_exists")
private boolean overwriteIfKeyExists = false;

@JsonProperty("append_if_key_exists")
private boolean appendIfKeyExists = false;

public String getKey() {
return key;
}
Expand All @@ -58,19 +61,29 @@ public boolean getOverwriteIfKeyExists() {
return overwriteIfKeyExists;
}

public boolean getAppendIfKeyExists() {
return appendIfKeyExists;
}

public String getAddWhen() { return addWhen; }

@AssertTrue(message = "Either value or format or expression must be specified, and only one of them can be specified")
public boolean hasValueOrFormatOrExpression() {
return Stream.of(value, format, valueExpression).filter(n -> n!=null).count() == 1;
}

@AssertTrue(message = "overwrite_if_key_exists and append_if_key_exists can not be set to true at the same time.")
boolean overwriteAndAppendNotBothSet() {
return !(overwriteIfKeyExists && appendIfKeyExists);
}

public Entry(final String key,
final String metadataKey,
final Object value,
final String format,
final String valueExpression,
final boolean overwriteIfKeyExists,
final boolean appendIfKeyExists,
final String addWhen)
{
if (key != null && metadataKey != null) {
Expand All @@ -85,6 +98,7 @@ public Entry(final String key,
this.format = format;
this.valueExpression = valueExpression;
this.overwriteIfKeyExists = overwriteIfKeyExists;
this.appendIfKeyExists = appendIfKeyExists;
this.addWhen = addWhen;
}

Expand Down
Loading

0 comments on commit 94e9e65

Please sign in to comment.