Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add key_value_when conditional to key_value processor #4246

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.dataprepper.plugins.processor.keyvalue;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -43,6 +45,8 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E

private final KeyValueProcessorConfig keyValueProcessorConfig;

private final ExpressionEvaluator expressionEvaluator;

private final Pattern fieldDelimiterPattern;
private final Pattern keyValueDelimiterPattern;
private final Set<String> includeKeysSet = new HashSet<String>();
Expand All @@ -61,7 +65,9 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
private final List<String> tagsOnFailure;

@DataPrepperPluginConstructor
public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) {
public KeyValueProcessor(final PluginMetrics pluginMetrics,
final KeyValueProcessorConfig keyValueProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.keyValueProcessorConfig = keyValueProcessorConfig;

Expand Down Expand Up @@ -184,6 +190,13 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces
if (keyValueProcessorConfig.getRemoveBrackets() && keyValueProcessorConfig.getRecursive()) {
throw new IllegalArgumentException("Cannot remove brackets needed for determining levels of recursion");
}

this.expressionEvaluator = expressionEvaluator;
if (keyValueProcessorConfig.getKeyValueWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(keyValueProcessorConfig.getKeyValueWhen())) {
throw new InvalidPluginConfigurationException(
String.format("key_value_when %s is not a valid expression statement", keyValueProcessorConfig.getKeyValueWhen()));
}
}

private String buildRegexFromCharacters(String s) {
Expand Down Expand Up @@ -239,6 +252,11 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
final Event recordEvent = record.getData();

try {

if (keyValueProcessorConfig.getKeyValueWhen() != null && !expressionEvaluator.evaluateConditional(keyValueProcessorConfig.getKeyValueWhen(), recordEvent)) {
continue;
}

final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class);
if (groupsRaw == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class KeyValueProcessorConfig {
@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

@JsonProperty("key_value_when")
private String keyValueWhen;

public String getSource() {
return source;
}
Expand Down Expand Up @@ -180,4 +183,6 @@ public List<String> getTagsOnFailure() {
public boolean getOverwriteIfDestinationExists() {
return overwriteIfDestinationExists;
}

public String getKeyValueWhen() { return keyValueWhen; }
}
Loading
Loading