From a871e28ab9adfd1f239493297101d15b3cfe906b Mon Sep 17 00:00:00 2001 From: Taylor Gray <33740195+graytaylor0@users.noreply.github.com> Date: Wed, 19 Jan 2022 22:33:10 -0600 Subject: [PATCH 1/3] Load AggregateAction, implement AggregateAction.handleEvent in AggregateProcessor, add GroupStateManager and AggregateIdentificationKeysHasher classes for use in AggregateProcessor Signed-off-by: Taylor Gray <33740195+graytaylor0@users.noreply.github.com> --- .../aggregate-processor/build.gradle | 17 ++- .../aggregate/AggregateActionResponse.java | 5 +- .../AggregateIdentificationKeysHasher.java | 26 ++++ .../aggregate/AggregateProcessor.java | 41 ++++++- .../aggregate/AggregateProcessorConfig.java | 7 ++ .../aggregate/GroupStateManager.java | 41 +++++++ ...AggregateIdentificationKeysHasherTest.java | 77 ++++++++++++ .../aggregate/AggregateProcessorTest.java | 115 ++++++++++++++++++ .../aggregate/GroupStateManagerTest.java | 58 +++++++++ 9 files changed, 375 insertions(+), 12 deletions(-) create mode 100644 data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java create mode 100644 data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java create mode 100644 data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java create mode 100644 data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java create mode 100644 data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java diff --git a/data-prepper-plugins/aggregate-processor/build.gradle b/data-prepper-plugins/aggregate-processor/build.gradle index ee445bb69a..9107057eb9 100644 --- a/data-prepper-plugins/aggregate-processor/build.gradle +++ b/data-prepper-plugins/aggregate-processor/build.gradle @@ -7,12 +7,21 @@ plugins { id 'java' } -repositories { - mavenCentral() -} - dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') implementation 'com.fasterxml.jackson.core:jackson-databind' + testImplementation 'org.hamcrest:hamcrest:2.2' + testImplementation "org.mockito:mockito-inline:${versionMap.mockito}" +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { + limit { + minimum = 1.0 + } + } + } } \ No newline at end of file diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateActionResponse.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateActionResponse.java index 3cef7f2cc5..e4d292a2eb 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateActionResponse.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateActionResponse.java @@ -8,12 +8,11 @@ import com.amazon.dataprepper.model.event.Event; /** - * Model class to be returned in {@link com.amazon.dataprepper.plugins.processor.aggregate.AggregateAction}. Contains both the Event to be processed and an option - * to close the current window for {@link com.amazon.dataprepper.plugins.processor.aggregate.AggregateProcessor} immediately after an event is handled. + * Model class to be returned in {@link com.amazon.dataprepper.plugins.processor.aggregate.AggregateAction}. Contains the Event to be processed, which is null if no Event should be processed. * @since 1.3 */ public class AggregateActionResponse { - private Event event; + private final Event event; public AggregateActionResponse(final Event event) { this.event = event; diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java new file mode 100644 index 0000000000..da92b3c3af --- /dev/null +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.aggregate; + +import com.amazon.dataprepper.model.event.Event; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +class AggregateIdentificationKeysHasher { + AggregateIdentificationKeysHasher() { + + } + + Map createIdentificationKeyHashFromEvent(final Event event, final List identificationKeys) { + final Map identificationKeysHash = new HashMap<>(); + for (final String identificationKey : identificationKeys) { + identificationKeysHash.put(identificationKey, event.get(identificationKey, Object.class)); + } + return identificationKeysHash; + } +} diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index f4c957ecfd..48fef39bf7 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -8,8 +8,8 @@ import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; -import com.amazon.dataprepper.model.annotations.SingleThread; -import com.amazon.dataprepper.model.configuration.PipelineDescription; +import com.amazon.dataprepper.model.configuration.PluginModel; +import com.amazon.dataprepper.model.configuration.PluginSetting; import com.amazon.dataprepper.model.event.Event; import com.amazon.dataprepper.model.plugin.PluginFactory; import com.amazon.dataprepper.model.processor.AbstractProcessor; @@ -19,23 +19,54 @@ import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; -@SingleThread @DataPrepperPlugin(name = "aggregate", pluginType = Processor.class, pluginConfigurationType = AggregateProcessorConfig.class) public class AggregateProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class); private final AggregateProcessorConfig aggregateProcessorConfig; + private final GroupStateManager groupStateManager; + private final AggregateAction aggregateAction; + private final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher; @DataPrepperPluginConstructor - public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final PipelineDescription pipelineDescription) { + public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { + this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new GroupStateManager(), new AggregateIdentificationKeysHasher()); + } + public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final GroupStateManager groupStateManager, final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher) { super(pluginMetrics); this.aggregateProcessorConfig = aggregateProcessorConfig; + this.groupStateManager = groupStateManager; + this.aggregateIdentificationKeysHasher = aggregateIdentificationKeysHasher; + this.aggregateAction = loadAggregateAction(pluginFactory); + } + + private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) { + final PluginModel actionConfiguration = aggregateProcessorConfig.getAggregateAction(); + final PluginSetting actionPluginSetting = new PluginSetting(actionConfiguration.getPluginName(), actionConfiguration.getPluginSettings()); + return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting); } @Override public Collection> doExecute(Collection> records) { - return records; + final List> recordsOut = new LinkedList<>(); + + for (final Record record : records) { + final Event event = record.getData(); + final Map identificationKeysHash = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event, aggregateProcessorConfig.getIdentificationKeys()); + final Map groupStateForEvent = groupStateManager.getGroupState(identificationKeysHash); + + final AggregateActionResponse handleEventResponse = aggregateAction.handleEvent(event, groupStateForEvent); + + final Optional aggregateActionResponseEvent = Optional.ofNullable(handleEventResponse.getEvent()); + aggregateActionResponseEvent.ifPresent(value -> recordsOut.add(new Record<>(value, record.getMetadata()))); + + } + return recordsOut; } @Override diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java index cf14b9531c..6f9500ce22 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java @@ -5,6 +5,7 @@ package com.amazon.dataprepper.plugins.processor.aggregate; +import com.amazon.dataprepper.model.configuration.PluginModel; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.AssertTrue; @@ -31,6 +32,10 @@ public class AggregateProcessorConfig { @NotEmpty private String dbPath = DEFAULT_DB_PATH; + @JsonProperty("action") + @NotEmpty + private PluginModel aggregateAction; + @JsonIgnore private File dbFile; @@ -46,6 +51,8 @@ public String getDbPath() { return dbPath; } + public PluginModel getAggregateAction() { return aggregateAction; } + @AssertTrue(message = "db_path is not a valid file path") boolean isDbPathValid() { dbFile = new File(dbPath); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java new file mode 100644 index 0000000000..ad61a18d50 --- /dev/null +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.aggregate; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import com.google.common.collect.Maps; + +class GroupStateManager { + + private Map> allGroupStates = Maps.newConcurrentMap(); + + GroupStateManager() { + + } + + Map getGroupState(final Map identificationKeysHash) { + final Optional> groupState = Optional.ofNullable(allGroupStates.get(identificationKeysHash)); + + if (groupState.isPresent()) { + return groupState.get(); + } + + final Map newGroupState = new HashMap<>(); + allGroupStates.put(identificationKeysHash, newGroupState); + return newGroupState; + } + + void setGroupStateForIdentificationKeysHash(final Map identificationKeysHash, final Map groupState) { + allGroupStates.put(identificationKeysHash, groupState); + } + + Map> getAllGroupStates() { + return this.allGroupStates; + } +} diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java new file mode 100644 index 0000000000..a56b81c6f9 --- /dev/null +++ b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.aggregate; + +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.JacksonEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class AggregateIdentificationKeysHasherTest { + private Event event; + private List identificationKeys; + private AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher; + + @BeforeEach + void setup() { + identificationKeys = new ArrayList<>(); + identificationKeys.add("firstIdentificationKey"); + identificationKeys.add("secondIdentificationKey"); + } + + private AggregateIdentificationKeysHasher createObjectUnderTest() { + return new AggregateIdentificationKeysHasher(); + } + + @Test + void createIdentificationKeyHashFromEvent_returns_expected_Map() { + aggregateIdentificationKeysHasher = createObjectUnderTest(); + final Map eventMap = new HashMap<>(); + eventMap.put("firstIdentificationKey", UUID.randomUUID().toString()); + eventMap.put("secondIdentificationKey", UUID.randomUUID().toString()); + + final Map expectedResult = new HashMap<>(eventMap); + + eventMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + event = JacksonEvent.builder() + .withEventType("event") + .withData(eventMap) + .build(); + + final Map result = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event, identificationKeys); + assertThat(result, equalTo(expectedResult)); + } + + @Test + void createIdentificationKeysHashFromEvent_where_Event_does_not_contain_one_of_the_identification_keys_returns_expected_Map() { + aggregateIdentificationKeysHasher = createObjectUnderTest(); + final Map eventMap = new HashMap<>(); + eventMap.put("firstIdentificationKey", UUID.randomUUID().toString()); + + final Map expectedResult = new HashMap<>(eventMap); + expectedResult.put("secondIdentificationKey", null); + + eventMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + event = JacksonEvent.builder() + .withEventType("event") + .withData(eventMap) + .build(); + + final Map result = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event, identificationKeys); + assertThat(result, equalTo(expectedResult)); + } +} diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java new file mode 100644 index 0000000000..3f83678965 --- /dev/null +++ b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.aggregate; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.configuration.PluginModel; +import com.amazon.dataprepper.model.configuration.PluginSetting; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.JacksonEvent; +import com.amazon.dataprepper.model.plugin.PluginFactory; +import com.amazon.dataprepper.model.record.Record; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class AggregateProcessorTest { + + @Mock + private PluginFactory pluginFactory; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher; + + @Mock + private AggregateProcessorConfig aggregateProcessorConfig; + + @Mock + private AggregateAction aggregateAction; + + @Mock + private PluginModel actionConfiguration; + + @Mock + private GroupStateManager groupStateManager; + + @Mock + private AggregateActionResponse aggregateActionResponse; + + private AggregateProcessor aggregateProcessor; + private Event event; + + @BeforeEach + void setup() { + when(aggregateProcessorConfig.getAggregateAction()).thenReturn(actionConfiguration); + when(actionConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString()); + when(actionConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap()); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + + when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(Collections.emptyList()); + + final Map eventMap = new HashMap<>(); + eventMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + event = JacksonEvent.builder() + .withData(eventMap) + .withEventType("event") + .build(); + + final Map expectedIdentificationKeyHash = new HashMap<>(eventMap); + + when(aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event, Collections.emptyList())) + .thenReturn(expectedIdentificationKeyHash); + when(groupStateManager.getGroupState(expectedIdentificationKeyHash)).thenReturn(Collections.emptyMap()); + when(aggregateAction.handleEvent(eq(event), eq(Collections.emptyMap()))).thenReturn(aggregateActionResponse); + } + + private AggregateProcessor createObjectUnderTest() { + return new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, groupStateManager, aggregateIdentificationKeysHasher); + } + + @Test + void handleEvent_returing_with_no_event_does_not_add_event_to_records_out() { + aggregateProcessor = createObjectUnderTest(); + when(aggregateActionResponse.getEvent()).thenReturn(null); + + final List> recordsOut = (List>)aggregateProcessor.doExecute(Collections.singletonList(new Record<>(event))); + + assertThat(recordsOut.size(), equalTo(0)); + } + + @Test + void handleEvent_returning_with_event_adds_event_to_records_out() { + aggregateProcessor = createObjectUnderTest(); + when(aggregateActionResponse.getEvent()).thenReturn(event); + + final List> recordsOut = (List>)aggregateProcessor.doExecute(Collections.singletonList(new Record<>(event))); + + assertThat(recordsOut.size(), equalTo(1)); + assertThat(recordsOut.get(0), notNullValue()); + assertThat(recordsOut.get(0).getData(), equalTo(event)); + } +} diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java new file mode 100644 index 0000000000..4c73078dec --- /dev/null +++ b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.aggregate; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.hasKey; + +public class GroupStateManagerTest { + + private GroupStateManager groupStateManager; + private Map identificationKeysHash; + private Map startingGroupState; + + @BeforeEach + void setup() { + identificationKeysHash = new HashMap<>(); + identificationKeysHash.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + startingGroupState = new HashMap<>(); + startingGroupState.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + + private GroupStateManager createObjectUnderTest() { + return new GroupStateManager(); + } + + @Test + void getGroupState_with_existing_group_state_returns_correct_group_state() { + groupStateManager = createObjectUnderTest(); + groupStateManager.setGroupStateForIdentificationKeysHash(identificationKeysHash, startingGroupState); + + final Map resultingGroupState = groupStateManager.getGroupState(identificationKeysHash); + assertThat(resultingGroupState, equalTo(startingGroupState)); + } + + @Test + void getGroupState_with_non_existing_group_state_creates_and_returns_new_group_state_and_adds_to_allGroupStates() { + groupStateManager = createObjectUnderTest(); + + final Map emptyGroupState = groupStateManager.getGroupState(identificationKeysHash); + assertThat(emptyGroupState, equalTo(Collections.emptyMap())); + + final Map> allGroupStates = groupStateManager.getAllGroupStates(); + assertThat(allGroupStates, hasKey(identificationKeysHash)); + } +} From 1c625233605e7db2ae990fcad02d440a7f9ac659 Mon Sep 17 00:00:00 2001 From: Taylor Gray <33740195+graytaylor0@users.noreply.github.com> Date: Fri, 21 Jan 2022 16:32:47 -0600 Subject: [PATCH 2/3] Add GroupState and IdentificationHash classes, address PR comments Signed-off-by: Taylor Gray <33740195+graytaylor0@users.noreply.github.com> --- .../AggregateIdentificationKeysHasher.java | 27 +++++++++++--- .../aggregate/AggregateProcessor.java | 16 ++++----- .../aggregate/AggregateProcessorConfig.java | 3 +- .../processor/aggregate/GroupState.java | 21 +++++++++++ .../aggregate/GroupStateManager.java | 24 ++++--------- ...AggregateIdentificationKeysHasherTest.java | 6 ++-- .../aggregate/AggregateProcessorTest.java | 28 ++++++++------- .../aggregate/GroupStateManagerTest.java | 36 +++++++++++-------- 8 files changed, 100 insertions(+), 61 deletions(-) create mode 100644 data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupState.java diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java index da92b3c3af..5f51875348 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java @@ -12,15 +12,34 @@ import java.util.Map; class AggregateIdentificationKeysHasher { - AggregateIdentificationKeysHasher() { - + private final List identificationKeys; + AggregateIdentificationKeysHasher(final List identificationKeys) { + this.identificationKeys = identificationKeys; } - Map createIdentificationKeyHashFromEvent(final Event event, final List identificationKeys) { + IdentificationHash createIdentificationKeyHashFromEvent(final Event event) { final Map identificationKeysHash = new HashMap<>(); for (final String identificationKey : identificationKeys) { identificationKeysHash.put(identificationKey, event.get(identificationKey, Object.class)); } - return identificationKeysHash; + return new IdentificationHash(identificationKeysHash); + } + + public static class IdentificationHash { + private final Map hash; + + IdentificationHash(final Map hash) { + this.hash = hash; + } + + @Override + public boolean equals(Object other) { + return this.hash.equals(other); + } + + @Override + public int hashCode() { + return hash.hashCode(); + } } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index 48fef39bf7..53e59dc2c1 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -21,8 +21,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Optional; @DataPrepperPlugin(name = "aggregate", pluginType = Processor.class, pluginConfigurationType = AggregateProcessorConfig.class) public class AggregateProcessor extends AbstractProcessor, Record> { @@ -35,7 +33,7 @@ public class AggregateProcessor extends AbstractProcessor, Record< @DataPrepperPluginConstructor public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { - this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new GroupStateManager(), new AggregateIdentificationKeysHasher()); + this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new GroupStateManager(), new AggregateIdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys())); } public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final GroupStateManager groupStateManager, final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher) { super(pluginMetrics); @@ -57,14 +55,16 @@ public Collection> doExecute(Collection> records) { for (final Record record : records) { final Event event = record.getData(); - final Map identificationKeysHash = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event, aggregateProcessorConfig.getIdentificationKeys()); - final Map groupStateForEvent = groupStateManager.getGroupState(identificationKeysHash); + final AggregateIdentificationKeysHasher.IdentificationHash identificationKeysHash = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event); + final GroupState groupStateForEvent = groupStateManager.getGroupState(identificationKeysHash); - final AggregateActionResponse handleEventResponse = aggregateAction.handleEvent(event, groupStateForEvent); + final AggregateActionResponse handleEventResponse = aggregateAction.handleEvent(event, groupStateForEvent.getGroupState()); - final Optional aggregateActionResponseEvent = Optional.ofNullable(handleEventResponse.getEvent()); - aggregateActionResponseEvent.ifPresent(value -> recordsOut.add(new Record<>(value, record.getMetadata()))); + final Event aggregateActionResponseEvent = handleEventResponse.getEvent(); + if (aggregateActionResponseEvent != null) { + recordsOut.add(new Record<>(aggregateActionResponseEvent, record.getMetadata())); + } } return recordsOut; } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java index 6f9500ce22..f6d2171047 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java @@ -11,6 +11,7 @@ import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; import java.io.File; import java.util.List; @@ -33,7 +34,7 @@ public class AggregateProcessorConfig { private String dbPath = DEFAULT_DB_PATH; @JsonProperty("action") - @NotEmpty + @NotNull private PluginModel aggregateAction; @JsonIgnore diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupState.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupState.java new file mode 100644 index 0000000000..7065f96e09 --- /dev/null +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupState.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.aggregate; + +import java.util.HashMap; +import java.util.Map; + +public class GroupState { + private final Map groupState = new HashMap<>(); + + GroupState() { + + } + + public Map getGroupState() { + return groupState; + } +} diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java index ad61a18d50..c776d3b353 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java @@ -5,37 +5,27 @@ package com.amazon.dataprepper.plugins.processor.aggregate; -import java.util.HashMap; import java.util.Map; -import java.util.Optional; import com.google.common.collect.Maps; class GroupStateManager { - private Map> allGroupStates = Maps.newConcurrentMap(); + private final Map allGroupStates = Maps.newConcurrentMap(); GroupStateManager() { } - Map getGroupState(final Map identificationKeysHash) { - final Optional> groupState = Optional.ofNullable(allGroupStates.get(identificationKeysHash)); + GroupState getGroupState(final AggregateIdentificationKeysHasher.IdentificationHash identificationHash) { + final GroupState groupState = allGroupStates.get(identificationHash); - if (groupState.isPresent()) { - return groupState.get(); + if (groupState != null) { + return groupState; } - final Map newGroupState = new HashMap<>(); - allGroupStates.put(identificationKeysHash, newGroupState); + final GroupState newGroupState = new GroupState(); + allGroupStates.put(identificationHash, newGroupState); return newGroupState; } - - void setGroupStateForIdentificationKeysHash(final Map identificationKeysHash, final Map groupState) { - allGroupStates.put(identificationKeysHash, groupState); - } - - Map> getAllGroupStates() { - return this.allGroupStates; - } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java index a56b81c6f9..6b70debe99 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java @@ -32,7 +32,7 @@ void setup() { } private AggregateIdentificationKeysHasher createObjectUnderTest() { - return new AggregateIdentificationKeysHasher(); + return new AggregateIdentificationKeysHasher(identificationKeys); } @Test @@ -51,7 +51,7 @@ void createIdentificationKeyHashFromEvent_returns_expected_Map() { .withData(eventMap) .build(); - final Map result = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event, identificationKeys); + final AggregateIdentificationKeysHasher.IdentificationHash result = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event); assertThat(result, equalTo(expectedResult)); } @@ -71,7 +71,7 @@ void createIdentificationKeysHashFromEvent_where_Event_does_not_contain_one_of_t .withData(eventMap) .build(); - final Map result = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event, identificationKeys); + final AggregateIdentificationKeysHasher.IdentificationHash result = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event); assertThat(result, equalTo(expectedResult)); } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java index 3f83678965..f9a1d2bb53 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java @@ -43,6 +43,9 @@ public class AggregateProcessorTest { @Mock private AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher; + @Mock + private AggregateIdentificationKeysHasher.IdentificationHash identificationHash; + @Mock private AggregateProcessorConfig aggregateProcessorConfig; @@ -55,10 +58,12 @@ public class AggregateProcessorTest { @Mock private GroupStateManager groupStateManager; + @Mock + private GroupState groupState; + @Mock private AggregateActionResponse aggregateActionResponse; - private AggregateProcessor aggregateProcessor; private Event event; @BeforeEach @@ -69,8 +74,6 @@ void setup() { when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) .thenReturn(aggregateAction); - when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(Collections.emptyList()); - final Map eventMap = new HashMap<>(); eventMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); @@ -79,11 +82,10 @@ void setup() { .withEventType("event") .build(); - final Map expectedIdentificationKeyHash = new HashMap<>(eventMap); - - when(aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event, Collections.emptyList())) - .thenReturn(expectedIdentificationKeyHash); - when(groupStateManager.getGroupState(expectedIdentificationKeyHash)).thenReturn(Collections.emptyMap()); + when(aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event)) + .thenReturn(identificationHash); + when(groupStateManager.getGroupState(identificationHash)).thenReturn(groupState); + when(groupState.getGroupState()).thenReturn(Collections.emptyMap()); when(aggregateAction.handleEvent(eq(event), eq(Collections.emptyMap()))).thenReturn(aggregateActionResponse); } @@ -92,21 +94,21 @@ private AggregateProcessor createObjectUnderTest() { } @Test - void handleEvent_returing_with_no_event_does_not_add_event_to_records_out() { - aggregateProcessor = createObjectUnderTest(); + void handleEvent_returning_with_no_event_does_not_add_event_to_records_out() { + final AggregateProcessor objectUnderTest = createObjectUnderTest(); when(aggregateActionResponse.getEvent()).thenReturn(null); - final List> recordsOut = (List>)aggregateProcessor.doExecute(Collections.singletonList(new Record<>(event))); + final List> recordsOut = (List>)objectUnderTest.doExecute(Collections.singletonList(new Record<>(event))); assertThat(recordsOut.size(), equalTo(0)); } @Test void handleEvent_returning_with_event_adds_event_to_records_out() { - aggregateProcessor = createObjectUnderTest(); + final AggregateProcessor objectUnderTest = createObjectUnderTest(); when(aggregateActionResponse.getEvent()).thenReturn(event); - final List> recordsOut = (List>)aggregateProcessor.doExecute(Collections.singletonList(new Record<>(event))); + final List> recordsOut = (List>)objectUnderTest.doExecute(Collections.singletonList(new Record<>(event))); assertThat(recordsOut.size(), equalTo(1)); assertThat(recordsOut.get(0), notNullValue()); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java index 4c73078dec..c13e45fc63 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java @@ -13,23 +13,24 @@ import java.util.Map; import java.util.UUID; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; public class GroupStateManagerTest { private GroupStateManager groupStateManager; - private Map identificationKeysHash; - private Map startingGroupState; + + private AggregateIdentificationKeysHasher.IdentificationHash identificationHash; @BeforeEach void setup() { - identificationKeysHash = new HashMap<>(); + final Map identificationKeysHash = new HashMap<>(); identificationKeysHash.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); - startingGroupState = new HashMap<>(); - startingGroupState.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + identificationHash = new AggregateIdentificationKeysHasher.IdentificationHash(identificationKeysHash); } private GroupStateManager createObjectUnderTest() { @@ -37,22 +38,27 @@ private GroupStateManager createObjectUnderTest() { } @Test - void getGroupState_with_existing_group_state_returns_correct_group_state() { + void getGroupState_with_non_existing_group_state_creates_and_returns_new_group_state_and_adds_to_allGroupStates() { groupStateManager = createObjectUnderTest(); - groupStateManager.setGroupStateForIdentificationKeysHash(identificationKeysHash, startingGroupState); - final Map resultingGroupState = groupStateManager.getGroupState(identificationKeysHash); - assertThat(resultingGroupState, equalTo(startingGroupState)); + final GroupState emptyGroupState = groupStateManager.getGroupState(identificationHash); + assertThat(emptyGroupState, notNullValue()); + assertThat(emptyGroupState.getGroupState(), equalTo(Collections.emptyMap())); + + final GroupState secondGroupState = groupStateManager.getGroupState(identificationHash); + assertThat(secondGroupState, notNullValue()); + assertThat(secondGroupState, is(sameInstance(emptyGroupState))); } @Test - void getGroupState_with_non_existing_group_state_creates_and_returns_new_group_state_and_adds_to_allGroupStates() { + void getGroupState_returns_a_mutable_GroupState_Map() { groupStateManager = createObjectUnderTest(); - final Map emptyGroupState = groupStateManager.getGroupState(identificationKeysHash); - assertThat(emptyGroupState, equalTo(Collections.emptyMap())); + final GroupState firstGroupState = groupStateManager.getGroupState(identificationHash); + firstGroupState.getGroupState().put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + final GroupState secondGroupState = groupStateManager.getGroupState(identificationHash); + assertThat(secondGroupState, equalTo(firstGroupState)); - final Map> allGroupStates = groupStateManager.getAllGroupStates(); - assertThat(allGroupStates, hasKey(identificationKeysHash)); } } From c5a7806bfa5468bbe94743ee1f04b8ff690de6c1 Mon Sep 17 00:00:00 2001 From: Taylor Gray <33740195+graytaylor0@users.noreply.github.com> Date: Mon, 24 Jan 2022 11:13:30 -0600 Subject: [PATCH 3/3] Rename GroupStateManager to AggregateGroupManager, rename GroupState to AggregateGroup Signed-off-by: Taylor Gray <33740195+graytaylor0@users.noreply.github.com> --- .../{GroupState.java => AggregateGroup.java} | 4 +- .../aggregate/AggregateGroupManager.java | 31 +++++++++ .../aggregate/AggregateProcessor.java | 12 ++-- .../aggregate/GroupStateManager.java | 31 --------- .../aggregate/AggregateGroupManagerTest.java | 64 +++++++++++++++++++ .../aggregate/AggregateProcessorTest.java | 10 +-- .../aggregate/GroupStateManagerTest.java | 64 ------------------- 7 files changed, 108 insertions(+), 108 deletions(-) rename data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/{GroupState.java => AggregateGroup.java} (87%) create mode 100644 data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java delete mode 100644 data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java create mode 100644 data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java delete mode 100644 data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupState.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroup.java similarity index 87% rename from data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupState.java rename to data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroup.java index 7065f96e09..9cdab44682 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupState.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroup.java @@ -8,10 +8,10 @@ import java.util.HashMap; import java.util.Map; -public class GroupState { +public class AggregateGroup { private final Map groupState = new HashMap<>(); - GroupState() { + AggregateGroup() { } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java new file mode 100644 index 0000000000..e9ce32f321 --- /dev/null +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.aggregate; + +import java.util.Map; + +import com.google.common.collect.Maps; + +class AggregateGroupManager { + + private final Map allGroupStates = Maps.newConcurrentMap(); + + AggregateGroupManager() { + + } + + AggregateGroup getAggregateGroup(final AggregateIdentificationKeysHasher.IdentificationHash identificationHash) { + final AggregateGroup aggregateGroup = allGroupStates.get(identificationHash); + + if (aggregateGroup != null) { + return aggregateGroup; + } + + final AggregateGroup newAggregateGroup = new AggregateGroup(); + allGroupStates.put(identificationHash, newAggregateGroup); + return newAggregateGroup; + } +} diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index 53e59dc2c1..4072028fc7 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -27,18 +27,18 @@ public class AggregateProcessor extends AbstractProcessor, Record< private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class); private final AggregateProcessorConfig aggregateProcessorConfig; - private final GroupStateManager groupStateManager; + private final AggregateGroupManager aggregateGroupManager; private final AggregateAction aggregateAction; private final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher; @DataPrepperPluginConstructor public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { - this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new GroupStateManager(), new AggregateIdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys())); + this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new AggregateGroupManager(), new AggregateIdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys())); } - public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final GroupStateManager groupStateManager, final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher) { + public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final AggregateGroupManager aggregateGroupManager, final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher) { super(pluginMetrics); this.aggregateProcessorConfig = aggregateProcessorConfig; - this.groupStateManager = groupStateManager; + this.aggregateGroupManager = aggregateGroupManager; this.aggregateIdentificationKeysHasher = aggregateIdentificationKeysHasher; this.aggregateAction = loadAggregateAction(pluginFactory); } @@ -56,9 +56,9 @@ public Collection> doExecute(Collection> records) { for (final Record record : records) { final Event event = record.getData(); final AggregateIdentificationKeysHasher.IdentificationHash identificationKeysHash = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event); - final GroupState groupStateForEvent = groupStateManager.getGroupState(identificationKeysHash); + final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysHash); - final AggregateActionResponse handleEventResponse = aggregateAction.handleEvent(event, groupStateForEvent.getGroupState()); + final AggregateActionResponse handleEventResponse = aggregateAction.handleEvent(event, aggregateGroupForEvent.getGroupState()); final Event aggregateActionResponseEvent = handleEventResponse.getEvent(); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java b/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java deleted file mode 100644 index c776d3b353..0000000000 --- a/data-prepper-plugins/aggregate-processor/src/main/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManager.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.amazon.dataprepper.plugins.processor.aggregate; - -import java.util.Map; - -import com.google.common.collect.Maps; - -class GroupStateManager { - - private final Map allGroupStates = Maps.newConcurrentMap(); - - GroupStateManager() { - - } - - GroupState getGroupState(final AggregateIdentificationKeysHasher.IdentificationHash identificationHash) { - final GroupState groupState = allGroupStates.get(identificationHash); - - if (groupState != null) { - return groupState; - } - - final GroupState newGroupState = new GroupState(); - allGroupStates.put(identificationHash, newGroupState); - return newGroupState; - } -} diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java new file mode 100644 index 0000000000..320b28b2c3 --- /dev/null +++ b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.aggregate; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; + +public class AggregateGroupManagerTest { + + private AggregateGroupManager aggregateGroupManager; + + private AggregateIdentificationKeysHasher.IdentificationHash identificationHash; + + @BeforeEach + void setup() { + final Map identificationKeysHash = new HashMap<>(); + identificationKeysHash.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + identificationHash = new AggregateIdentificationKeysHasher.IdentificationHash(identificationKeysHash); + } + + private AggregateGroupManager createObjectUnderTest() { + return new AggregateGroupManager(); + } + + @Test + void getGroupState_with_non_existing_group_state_creates_and_returns_new_group_state_and_adds_to_allGroupStates() { + aggregateGroupManager = createObjectUnderTest(); + + final AggregateGroup emptyAggregateGroup = aggregateGroupManager.getAggregateGroup(identificationHash); + assertThat(emptyAggregateGroup, notNullValue()); + assertThat(emptyAggregateGroup.getGroupState(), equalTo(Collections.emptyMap())); + + final AggregateGroup secondAggregateGroup = aggregateGroupManager.getAggregateGroup(identificationHash); + assertThat(secondAggregateGroup, notNullValue()); + assertThat(secondAggregateGroup, is(sameInstance(emptyAggregateGroup))); + } + + @Test + void getGroupState_returns_a_mutable_GroupState_Map() { + aggregateGroupManager = createObjectUnderTest(); + + final AggregateGroup firstAggregateGroup = aggregateGroupManager.getAggregateGroup(identificationHash); + firstAggregateGroup.getGroupState().put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + final AggregateGroup secondAggregateGroup = aggregateGroupManager.getAggregateGroup(identificationHash); + assertThat(secondAggregateGroup, equalTo(firstAggregateGroup)); + + } +} diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java index f9a1d2bb53..0a593a9c22 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java @@ -56,10 +56,10 @@ public class AggregateProcessorTest { private PluginModel actionConfiguration; @Mock - private GroupStateManager groupStateManager; + private AggregateGroupManager aggregateGroupManager; @Mock - private GroupState groupState; + private AggregateGroup aggregateGroup; @Mock private AggregateActionResponse aggregateActionResponse; @@ -84,13 +84,13 @@ void setup() { when(aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event)) .thenReturn(identificationHash); - when(groupStateManager.getGroupState(identificationHash)).thenReturn(groupState); - when(groupState.getGroupState()).thenReturn(Collections.emptyMap()); + when(aggregateGroupManager.getAggregateGroup(identificationHash)).thenReturn(aggregateGroup); + when(aggregateGroup.getGroupState()).thenReturn(Collections.emptyMap()); when(aggregateAction.handleEvent(eq(event), eq(Collections.emptyMap()))).thenReturn(aggregateActionResponse); } private AggregateProcessor createObjectUnderTest() { - return new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, groupStateManager, aggregateIdentificationKeysHasher); + return new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, aggregateGroupManager, aggregateIdentificationKeysHasher); } @Test diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java deleted file mode 100644 index c13e45fc63..0000000000 --- a/data-prepper-plugins/aggregate-processor/src/test/java/com/amazon/dataprepper/plugins/processor/aggregate/GroupStateManagerTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.amazon.dataprepper.plugins.processor.aggregate; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.CoreMatchers.sameInstance; - -public class GroupStateManagerTest { - - private GroupStateManager groupStateManager; - - private AggregateIdentificationKeysHasher.IdentificationHash identificationHash; - - @BeforeEach - void setup() { - final Map identificationKeysHash = new HashMap<>(); - identificationKeysHash.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); - - identificationHash = new AggregateIdentificationKeysHasher.IdentificationHash(identificationKeysHash); - } - - private GroupStateManager createObjectUnderTest() { - return new GroupStateManager(); - } - - @Test - void getGroupState_with_non_existing_group_state_creates_and_returns_new_group_state_and_adds_to_allGroupStates() { - groupStateManager = createObjectUnderTest(); - - final GroupState emptyGroupState = groupStateManager.getGroupState(identificationHash); - assertThat(emptyGroupState, notNullValue()); - assertThat(emptyGroupState.getGroupState(), equalTo(Collections.emptyMap())); - - final GroupState secondGroupState = groupStateManager.getGroupState(identificationHash); - assertThat(secondGroupState, notNullValue()); - assertThat(secondGroupState, is(sameInstance(emptyGroupState))); - } - - @Test - void getGroupState_returns_a_mutable_GroupState_Map() { - groupStateManager = createObjectUnderTest(); - - final GroupState firstGroupState = groupStateManager.getGroupState(identificationHash); - firstGroupState.getGroupState().put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); - - final GroupState secondGroupState = groupStateManager.getGroupState(identificationHash); - assertThat(secondGroupState, equalTo(firstGroupState)); - - } -}