Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add append option to add_entries processor #4143

Merged
merged 3 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

Expand Down Expand Up @@ -67,12 +70,15 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if (!Objects.isNull(key)) {
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
recordEvent.put(key, value);
} else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) {
mergeValueToEvent(recordEvent, key, value);
}
} else {
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
recordEvent.getMetadata().setAttribute(metadataKey, value);

} else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) {
mergeValueToEventMetadata(recordEvent, metadataKey, value);
}
}
} catch (Exception e) {
Expand All @@ -97,4 +103,25 @@ public boolean isReadyForShutdown() {
@Override
public void shutdown() {
}

private void mergeValueToEvent(final Event recordEvent, final String key, final Object value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider combining these with the use of Java 8 lambdas.

Here is some starting point for one way to do it.

private void mergeValueToEvent(final Event recordEvent, final String key, final Object value)
  mergeValue(recordEvent, () -> recordEvent.getMetadata().getAttribute(key), newValue -> recordEvent.put(key, newValue);
}

mergeValue(Supplier<Object> getter, Consumer<Object> setter) {
          final Object currentValue = getter.get();
        final List<Object> mergedValue = new ArrayList<>();
        if (currentValue instanceof List) {
            mergedValue.addAll((List<Object>) currentValue);
        } else {
            mergedValue.add(currentValue);
        }

        mergedValue.add(value);
        setter.accept(mergedValue);
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion!

mergeValue(value, () -> recordEvent.get(key, Object.class), newValue -> recordEvent.put(key, newValue));
}

private void mergeValueToEventMetadata(final Event recordEvent, final String key, final Object value) {
mergeValue(value, () -> recordEvent.getMetadata().getAttribute(key), newValue -> recordEvent.getMetadata().setAttribute(key, newValue));
}

private void mergeValue(final Object value, Supplier<Object> getter, Consumer<Object> setter) {
final Object currentValue = getter.get();
final List<Object> mergedValue = new ArrayList<>();
if (currentValue instanceof List) {
mergedValue.addAll((List<Object>) currentValue);
} else {
mergedValue.add(currentValue);
}

mergedValue.add(value);
setter.accept(mergedValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public static class Entry {
@JsonProperty("overwrite_if_key_exists")
private boolean overwriteIfKeyExists = false;

@JsonProperty("append_if_key_exists")
private boolean appendIfKeyExists = false;

public String getKey() {
return key;
}
Expand All @@ -58,19 +61,29 @@ public boolean getOverwriteIfKeyExists() {
return overwriteIfKeyExists;
}

public boolean getAppendIfKeyExists() {
return appendIfKeyExists;
}

public String getAddWhen() { return addWhen; }

@AssertTrue(message = "Either value or format or expression must be specified, and only one of them can be specified")
public boolean hasValueOrFormatOrExpression() {
return Stream.of(value, format, valueExpression).filter(n -> n!=null).count() == 1;
}

@AssertTrue(message = "overwrite_if_key_exists and append_if_key_exists can not be set to true at the same time.")
boolean overwriteAndAppendNotBothSet() {
return !(overwriteIfKeyExists && appendIfKeyExists);
}

public Entry(final String key,
final String metadataKey,
final Object value,
final String format,
final String valueExpression,
final boolean overwriteIfKeyExists,
final boolean appendIfKeyExists,
final String addWhen)
{
if (key != null && metadataKey != null) {
Expand All @@ -85,6 +98,7 @@ public Entry(final String key,
this.format = format;
this.valueExpression = valueExpression;
this.overwriteIfKeyExists = overwriteIfKeyExists;
this.appendIfKeyExists = appendIfKeyExists;
this.addWhen = addWhen;
}

Expand Down
Loading
Loading