Skip to content

Commit

Permalink
Updates rename_keys to use EventKey and creates a duplicate of the ol…
Browse files Browse the repository at this point in the history
…d implementation named rename_keys_old to do a performance evaluation between the two.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Jun 14, 2024
1 parent 7f64a6d commit 961922e
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 9 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/mutate-event-processors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test-event')
testImplementation testLibs.slf4j.simple
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

@DataPrepperPlugin(name = "rename_keys_old", pluginType = Processor.class, pluginConfigurationType = RenameKeyProcessorConfig.class)
public class RenameKeyOldProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(RenameKeyProcessor.class);
private final List<RenameKeyProcessorConfig.Entry> entries;

private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public RenameKeyOldProcessor(final PluginMetrics pluginMetrics, final RenameKeyProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.entries = config.getEntries();
this.expressionEvaluator = expressionEvaluator;
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();

try {

for (RenameKeyProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getRenameWhen()) && !expressionEvaluator.evaluateConditional(entry.getRenameWhen(), recordEvent)) {
continue;
}

if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) {
continue;
}

if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) {
final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
recordEvent.delete(entry.getFromKey());
}
}
} catch (final Exception e) {
LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e);
}
}

return records;
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
Expand All @@ -29,12 +32,39 @@ public class RenameKeyProcessor extends AbstractProcessor<Record<Event>, Record<
private final List<RenameKeyProcessorConfig.Entry> entries;

private final ExpressionEvaluator expressionEvaluator;
private final List<RenameEntry> renameEntries;

private static class RenameEntry {
RenameKeyProcessorConfig.Entry configEntry;
EventKey fromEventKey;
EventKey toEventKey;
}

@DataPrepperPluginConstructor
public RenameKeyProcessor(final PluginMetrics pluginMetrics, final RenameKeyProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
public RenameKeyProcessor(
final RenameKeyProcessorConfig config,
final EventKeyFactory eventKeyFactory,
final ExpressionEvaluator expressionEvaluator,
final PluginMetrics pluginMetrics) {
super(pluginMetrics);
this.entries = config.getEntries();
this.expressionEvaluator = expressionEvaluator;

renameEntries = new ArrayList<>();
for (final RenameKeyProcessorConfig.Entry entry : entries) {
final String fromKey = entry.getFromKey();
final String toKey = entry.getToKey();
//if(fromKey.equals(toKey))
// continue;

final RenameEntry renameEntry = new RenameEntry();
renameEntry.configEntry = entry;
renameEntry.fromEventKey = eventKeyFactory.createEventKey(fromKey, EventKeyFactory.EventAction.GET, EventKeyFactory.EventAction.DELETE);
renameEntry.toEventKey = eventKeyFactory.createEventKey(toKey);

renameEntries.add(renameEntry);
}

}

@Override
Expand All @@ -44,19 +74,19 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

try {

for (RenameKeyProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getRenameWhen()) && !expressionEvaluator.evaluateConditional(entry.getRenameWhen(), recordEvent)) {
for (final RenameEntry entry : renameEntries) {
if (Objects.nonNull(entry.configEntry.getRenameWhen()) && !expressionEvaluator.evaluateConditional(entry.configEntry.getRenameWhen(), recordEvent)) {
continue;
}

if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) {
if (entry.fromEventKey.getKey().equals(entry.toEventKey.getKey()) || !recordEvent.containsKey(entry.fromEventKey)) {
continue;
}

if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) {
final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
recordEvent.delete(entry.getFromKey());
if (!recordEvent.containsKey(entry.toEventKey) || entry.configEntry.getOverwriteIfToKeyExists()) {
final Object source = recordEvent.get(entry.fromEventKey, Object.class);
recordEvent.put(entry.toEventKey, source);
recordEvent.delete(entry.fromEventKey);
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.event.TestEventKeyFactory;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -39,6 +41,8 @@ public class RenameKeyProcessorTests {
@Mock
private ExpressionEvaluator expressionEvaluator;

private final EventKeyFactory eventKeyFactory = TestEventKeyFactory.getTestEventFactory();

@Test
public void testSingleOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null)));
Expand Down Expand Up @@ -132,7 +136,7 @@ public void testNoRename_when_RenameWhen_returns_false() {
}

private RenameKeyProcessor createObjectUnderTest() {
return new RenameKeyProcessor(pluginMetrics, mockConfig, expressionEvaluator);
return new RenameKeyProcessor(mockConfig, eventKeyFactory, expressionEvaluator, pluginMetrics);
}

private RenameKeyProcessorConfig.Entry createEntry(final String fromKey, final String toKey, final boolean overwriteIfToKeyExists, final String renameWhen) {
Expand Down

0 comments on commit 961922e

Please sign in to comment.