Skip to content

Commit

Permalink
Add key_value_when conditional to key_value processor (#4246)
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Mar 7, 2024
1 parent 2a20655 commit 93f560d
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.dataprepper.plugins.processor.keyvalue;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -43,6 +45,8 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E

private final KeyValueProcessorConfig keyValueProcessorConfig;

private final ExpressionEvaluator expressionEvaluator;

private final Pattern fieldDelimiterPattern;
private final Pattern keyValueDelimiterPattern;
private final Set<String> includeKeysSet = new HashSet<String>();
Expand All @@ -61,7 +65,9 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
private final List<String> tagsOnFailure;

@DataPrepperPluginConstructor
public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) {
public KeyValueProcessor(final PluginMetrics pluginMetrics,
final KeyValueProcessorConfig keyValueProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.keyValueProcessorConfig = keyValueProcessorConfig;

Expand Down Expand Up @@ -184,6 +190,13 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces
if (keyValueProcessorConfig.getRemoveBrackets() && keyValueProcessorConfig.getRecursive()) {
throw new IllegalArgumentException("Cannot remove brackets needed for determining levels of recursion");
}

this.expressionEvaluator = expressionEvaluator;
if (keyValueProcessorConfig.getKeyValueWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(keyValueProcessorConfig.getKeyValueWhen())) {
throw new InvalidPluginConfigurationException(
String.format("key_value_when %s is not a valid expression statement", keyValueProcessorConfig.getKeyValueWhen()));
}
}

private String buildRegexFromCharacters(String s) {
Expand Down Expand Up @@ -239,6 +252,11 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
final Event recordEvent = record.getData();

try {

if (keyValueProcessorConfig.getKeyValueWhen() != null && !expressionEvaluator.evaluateConditional(keyValueProcessorConfig.getKeyValueWhen(), recordEvent)) {
continue;
}

final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class);
if (groupsRaw == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class KeyValueProcessorConfig {
@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

@JsonProperty("key_value_when")
private String keyValueWhen;

public String getSource() {
return source;
}
Expand Down Expand Up @@ -180,4 +183,6 @@ public List<String> getTagsOnFailure() {
public boolean getOverwriteIfDestinationExists() {
return overwriteIfDestinationExists;
}

public String getKeyValueWhen() { return keyValueWhen; }
}
Loading

0 comments on commit 93f560d

Please sign in to comment.