Load AggregateAction, implement AggregateAction.handleEvent in AggregateProcessor, add GroupStateManager and AggregateIdentificationKeysHasher classes for use in AggregateProcessor #931

Merged 3 commits on Jan 25, 2022
17 changes: 13 additions & 4 deletions data-prepper-plugins/aggregate-processor/build.gradle
@@ -7,12 +7,21 @@ plugins {
    id 'java'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation project(':data-prepper-api')
    implementation project(':data-prepper-plugins:common')
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    testImplementation 'org.hamcrest:hamcrest:2.2'
    testImplementation "org.mockito:mockito-inline:${versionMap.mockito}"
}

jacocoTestCoverageVerification {
    dependsOn jacocoTestReport
    violationRules {
        rule {
            limit {
                minimum = 1.0
            }
        }
    }
}
@@ -8,12 +8,11 @@
import com.amazon.dataprepper.model.event.Event;

/**
- * Model class to be returned in {@link com.amazon.dataprepper.plugins.processor.aggregate.AggregateAction}. Contains both the Event to be processed and an option
- * to close the current window for {@link com.amazon.dataprepper.plugins.processor.aggregate.AggregateProcessor} immediately after an event is handled.
+ * Model class to be returned in {@link com.amazon.dataprepper.plugins.processor.aggregate.AggregateAction}. Contains the Event to be processed, which is null if no Event should be processed.
* @since 1.3
*/
public class AggregateActionResponse {
- private Event event;
+ private final Event event;

public AggregateActionResponse(final Event event) {
this.event = event;
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.aggregate;

import java.util.HashMap;
import java.util.Map;

public class AggregateGroup {
    private final Map<Object, Object> groupState = new HashMap<>();

    AggregateGroup() {

    }

    public Map<Object, Object> getGroupState() {
cmanning09 (Contributor), Jan 25, 2022:

I am worried that we have created a class for the group state but still expose the underlying data structure, which couples the data to the processor and the action.

@dlvenable and @graytaylor0, have we thought about providing methods or an interface to alter the group state without exposing the underlying data structure?

(I don't want to block this PR, but I think this is a good conversation to have, given that our community will be interacting with the state as they write aggregate actions.)

Member (Author):

It's a valid point. I have thought about it, and I just don't think we want to restrict AggregateAction creators like that. It would mean more work on their part: instead of getting a Java Map that they likely already understand and know how to interact with, they would get a Java class that we've made, which they would have to read about to understand how to use, and which may also limit the control they have over it.

Member (Author):

We definitely don't want to give them control of the AggregateGroup class, so the option is to extract the groupState map into a GroupState class that wraps the Map. However, this doesn't seem to add much value as we would just be adding functions for put, get, putAll, etc. that just call the actual Map functions, and there is nothing that I am aware of that we would really want to restrict the AggregateAction creator from calling on the Map. As far as validation goes, everything put into the Map will be turned into an Event, so the validation should happen there.

Contributor:

This is a one-way door decision that will couple the group state to the Map interface. Maybe not a completely one-way door, but it would require a breaking change to extend group state beyond the Map interface. To clarify, I do not want to limit the creator of an AggregateAction from updating the group state as they desire. My biggest concern in raising this point is extensibility.

I don't want to debate the design here, and I think others across the group may be interested in this conversation. I will raise an issue for this.

        return groupState;
    }
}
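
For readers following the review thread above, the alternative being discussed (wrapping the Map rather than exposing it) might look roughly like the sketch below. The names GroupState and MapBackedGroupState are hypothetical and do not appear in this PR; this only illustrates the trade-off, not a proposed implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface, not part of this PR: actions would depend on this
// type instead of on java.util.Map directly.
interface GroupState {
    void put(Object key, Object value);

    Object get(Object key);

    // Read-only view, so the backing storage is not exposed for mutation.
    Map<Object, Object> asReadOnlyMap();
}

// Hypothetical map-backed implementation. The storage could later change
// without breaking callers that only use the GroupState interface.
final class MapBackedGroupState implements GroupState {
    private final Map<Object, Object> state = new HashMap<>();

    @Override
    public void put(final Object key, final Object value) {
        state.put(key, value);
    }

    @Override
    public Object get(final Object key) {
        return state.get(key);
    }

    @Override
    public Map<Object, Object> asReadOnlyMap() {
        return Collections.unmodifiableMap(state);
    }
}
```

As the author notes in the thread, such a wrapper mostly forwards to Map methods; what it buys is the extensibility the reviewer is asking about, at the cost of a new type for action writers to learn.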
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.aggregate;

import java.util.Map;

import com.google.common.collect.Maps;

class AggregateGroupManager {

    private final Map<AggregateIdentificationKeysHasher.IdentificationHash, AggregateGroup> allGroupStates = Maps.newConcurrentMap();

    AggregateGroupManager() {

    }

    AggregateGroup getAggregateGroup(final AggregateIdentificationKeysHasher.IdentificationHash identificationHash) {
        final AggregateGroup aggregateGroup = allGroupStates.get(identificationHash);

        if (aggregateGroup != null) {
            return aggregateGroup;
        }

        final AggregateGroup newAggregateGroup = new AggregateGroup();
        allGroupStates.put(identificationHash, newAggregateGroup);
        return newAggregateGroup;
    }
}
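
getAggregateGroup above performs a separate get and put on the concurrent map. If two threads ever requested the same new key at the same time, each could create its own AggregateGroup and one would overwrite the other; whether that can happen depends on how the processor is threaded, which this diff does not fully show. A minimal sketch of the atomic alternative using computeIfAbsent follows. GroupRegistry is a hypothetical generic stand-in so the example compiles on its own; it is not a class from this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for AggregateGroupManager (K = identification hash,
// G = group type), kept generic so the sketch is self-contained.
final class GroupRegistry<K, G> {
    private final Map<K, G> groups = new ConcurrentHashMap<>();
    private final Supplier<G> groupFactory;

    GroupRegistry(final Supplier<G> groupFactory) {
        this.groupFactory = groupFactory;
    }

    // Looks up the group for the key, creating it at most once per key.
    // ConcurrentHashMap performs the check-and-insert atomically.
    G getOrCreate(final K key) {
        return groups.computeIfAbsent(key, ignored -> groupFactory.get());
    }
}
```

Usage would mirror the existing method: registry.getOrCreate(identificationHash) returns the same group instance for every call with an equal key.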
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.aggregate;

import com.amazon.dataprepper.model.event.Event;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AggregateIdentificationKeysHasher {
    private final List<String> identificationKeys;
    AggregateIdentificationKeysHasher(final List<String> identificationKeys) {
        this.identificationKeys = identificationKeys;
    }

    IdentificationHash createIdentificationKeyHashFromEvent(final Event event) {
        final Map<Object, Object> identificationKeysHash = new HashMap<>();
        for (final String identificationKey : identificationKeys) {
            identificationKeysHash.put(identificationKey, event.get(identificationKey, Object.class));
        }
        return new IdentificationHash(identificationKeysHash);
    }

    public static class IdentificationHash {
        private final Map<Object, Object> hash;

        IdentificationHash(final Map<Object, Object> hash) {
            this.hash = hash;
        }

        @Override
        public boolean equals(Object other) {
            return this.hash.equals(other);
        }

        @Override
        public int hashCode() {
            return hash.hashCode();
        }
    }
}
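
One detail worth noting in IdentificationHash.equals above: it delegates to this.hash.equals(other), so an IdentificationHash compares equal to a plain Map with the same entries (which the hasher tests in this PR rely on), but under the standard Map.equals contract it does not compare equal to a different IdentificationHash instance wrapping the same entries. Since the processor builds a new IdentificationHash for each event, that affects map lookups by key. A minimal sketch of a symmetric variant is below; KeyHash is a hypothetical standalone name, not a class from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone version of IdentificationHash with a symmetric equals.
final class KeyHash {
    private final Map<Object, Object> hash;

    KeyHash(final Map<Object, Object> hash) {
        // Defensive copy so later changes to the caller's map do not alter the key.
        this.hash = new HashMap<>(hash);
    }

    @Override
    public boolean equals(final Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof KeyHash)) {
            return false;
        }
        // Compare wrapped map to wrapped map, not wrapped map to wrapper.
        return hash.equals(((KeyHash) other).hash);
    }

    @Override
    public int hashCode() {
        return hash.hashCode();
    }
}
```

With this shape, tests that assert an IdentificationHash is equalTo a raw Map would need to compare against another hash object instead.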
@@ -8,8 +8,8 @@
import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
- import com.amazon.dataprepper.model.annotations.SingleThread;
- import com.amazon.dataprepper.model.configuration.PipelineDescription;
+ import com.amazon.dataprepper.model.configuration.PluginModel;
+ import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.plugin.PluginFactory;
import com.amazon.dataprepper.model.processor.AbstractProcessor;
@@ -19,23 +19,54 @@
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

- @SingleThread
@DataPrepperPlugin(name = "aggregate", pluginType = Processor.class, pluginConfigurationType = AggregateProcessorConfig.class)
public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);

private final AggregateProcessorConfig aggregateProcessorConfig;
private final AggregateGroupManager aggregateGroupManager;
private final AggregateAction aggregateAction;
private final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher;

@DataPrepperPluginConstructor
- public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final PipelineDescription pipelineDescription) {
+ public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) {
this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new AggregateGroupManager(), new AggregateIdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys()));
}
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final AggregateGroupManager aggregateGroupManager, final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher) {
super(pluginMetrics);
this.aggregateProcessorConfig = aggregateProcessorConfig;
this.aggregateGroupManager = aggregateGroupManager;
this.aggregateIdentificationKeysHasher = aggregateIdentificationKeysHasher;
this.aggregateAction = loadAggregateAction(pluginFactory);
}

private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
final PluginModel actionConfiguration = aggregateProcessorConfig.getAggregateAction();
final PluginSetting actionPluginSetting = new PluginSetting(actionConfiguration.getPluginName(), actionConfiguration.getPluginSettings());
return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting);
}

@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
- return records;
+ final List<Record<Event>> recordsOut = new LinkedList<>();

for (final Record<Event> record : records) {
final Event event = record.getData();
final AggregateIdentificationKeysHasher.IdentificationHash identificationKeysHash = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event);
final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysHash);

final AggregateActionResponse handleEventResponse = aggregateAction.handleEvent(event, aggregateGroupForEvent.getGroupState());

final Event aggregateActionResponseEvent = handleEventResponse.getEvent();

if (aggregateActionResponseEvent != null) {
recordsOut.add(new Record<>(aggregateActionResponseEvent, record.getMetadata()));
}
}
return recordsOut;
}

@Override
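
To illustrate the flow in doExecute above (derive a group key from the identification keys, look up that group's state, pass the event and state to the action, and forward only non-null results), here is a minimal self-contained sketch. Everything in it is a hypothetical stand-in: events are plain maps rather than Data Prepper Event objects, and SketchAction, FirstEventOnlyAction, and SketchProcessor are illustrative names, not the plugin's real types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AggregateAction; events are plain maps here.
interface SketchAction {
    // Returns the event to pass downstream, or null to emit nothing.
    Map<String, Object> handleEvent(Map<String, Object> event, Map<Object, Object> groupState);
}

// Example action: emits the first event of each group and drops the rest.
final class FirstEventOnlyAction implements SketchAction {
    @Override
    public Map<String, Object> handleEvent(final Map<String, Object> event,
                                           final Map<Object, Object> groupState) {
        if (groupState.isEmpty()) {
            groupState.putAll(event); // remember that this group has been seen
            return event;
        }
        return null;
    }
}

// Hypothetical stand-in for the processor loop in doExecute.
final class SketchProcessor {
    private final List<String> identificationKeys;
    private final SketchAction action;
    private final Map<Map<String, Object>, Map<Object, Object>> groups = new HashMap<>();

    SketchProcessor(final List<String> identificationKeys, final SketchAction action) {
        this.identificationKeys = identificationKeys;
        this.action = action;
    }

    List<Map<String, Object>> execute(final List<Map<String, Object>> events) {
        final List<Map<String, Object>> out = new ArrayList<>();
        for (final Map<String, Object> event : events) {
            // Group key: the identification keys and their values (null if absent).
            final Map<String, Object> groupKey = new HashMap<>();
            for (final String key : identificationKeys) {
                groupKey.put(key, event.get(key));
            }
            final Map<Object, Object> groupState =
                    groups.computeIfAbsent(groupKey, ignored -> new HashMap<>());
            final Map<String, Object> result = action.handleEvent(event, groupState);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }
}
```

In this sketch, three events of which two share the same value for the identification key would yield two output events: the first event of each group.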
@@ -5,11 +5,13 @@

package com.amazon.dataprepper.plugins.processor.aggregate;

import com.amazon.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.io.File;
import java.util.List;
@@ -31,6 +33,10 @@ public class AggregateProcessorConfig {
@NotEmpty
private String dbPath = DEFAULT_DB_PATH;

@JsonProperty("action")
@NotNull
private PluginModel aggregateAction;

@JsonIgnore
private File dbFile;

@@ -46,6 +52,8 @@ public String getDbPath() {
return dbPath;
}

public PluginModel getAggregateAction() { return aggregateAction; }

@AssertTrue(message = "db_path is not a valid file path")
boolean isDbPathValid() {
dbFile = new File(dbPath);
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.aggregate;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.CoreMatchers.sameInstance;

public class AggregateGroupManagerTest {

private AggregateGroupManager aggregateGroupManager;

private AggregateIdentificationKeysHasher.IdentificationHash identificationHash;

@BeforeEach
void setup() {
final Map<Object, Object> identificationKeysHash = new HashMap<>();
identificationKeysHash.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());

identificationHash = new AggregateIdentificationKeysHasher.IdentificationHash(identificationKeysHash);
}

private AggregateGroupManager createObjectUnderTest() {
return new AggregateGroupManager();
}

@Test
void getGroupState_with_non_existing_group_state_creates_and_returns_new_group_state_and_adds_to_allGroupStates() {
aggregateGroupManager = createObjectUnderTest();

final AggregateGroup emptyAggregateGroup = aggregateGroupManager.getAggregateGroup(identificationHash);
assertThat(emptyAggregateGroup, notNullValue());
assertThat(emptyAggregateGroup.getGroupState(), equalTo(Collections.emptyMap()));

final AggregateGroup secondAggregateGroup = aggregateGroupManager.getAggregateGroup(identificationHash);
assertThat(secondAggregateGroup, notNullValue());
assertThat(secondAggregateGroup, is(sameInstance(emptyAggregateGroup)));
}

@Test
void getGroupState_returns_a_mutable_GroupState_Map() {
aggregateGroupManager = createObjectUnderTest();

final AggregateGroup firstAggregateGroup = aggregateGroupManager.getAggregateGroup(identificationHash);
firstAggregateGroup.getGroupState().put(UUID.randomUUID().toString(), UUID.randomUUID().toString());

final AggregateGroup secondAggregateGroup = aggregateGroupManager.getAggregateGroup(identificationHash);
assertThat(secondAggregateGroup, equalTo(firstAggregateGroup));

}
}
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.aggregate;

import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.event.JacksonEvent;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class AggregateIdentificationKeysHasherTest {
private Event event;
private List<String> identificationKeys;
private AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher;

@BeforeEach
void setup() {
identificationKeys = new ArrayList<>();
identificationKeys.add("firstIdentificationKey");
identificationKeys.add("secondIdentificationKey");
}

private AggregateIdentificationKeysHasher createObjectUnderTest() {
return new AggregateIdentificationKeysHasher(identificationKeys);
}

@Test
void createIdentificationKeyHashFromEvent_returns_expected_Map() {
aggregateIdentificationKeysHasher = createObjectUnderTest();
final Map<Object, Object> eventMap = new HashMap<>();
eventMap.put("firstIdentificationKey", UUID.randomUUID().toString());
eventMap.put("secondIdentificationKey", UUID.randomUUID().toString());

final Map<Object, Object> expectedResult = new HashMap<>(eventMap);

eventMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());

event = JacksonEvent.builder()
.withEventType("event")
.withData(eventMap)
.build();

final AggregateIdentificationKeysHasher.IdentificationHash result = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event);
assertThat(result, equalTo(expectedResult));
}

@Test
void createIdentificationKeysHashFromEvent_where_Event_does_not_contain_one_of_the_identification_keys_returns_expected_Map() {
aggregateIdentificationKeysHasher = createObjectUnderTest();
final Map<Object, Object> eventMap = new HashMap<>();
eventMap.put("firstIdentificationKey", UUID.randomUUID().toString());

final Map<Object, Object> expectedResult = new HashMap<>(eventMap);
expectedResult.put("secondIdentificationKey", null);

eventMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());

event = JacksonEvent.builder()
.withEventType("event")
.withData(eventMap)
.build();

final AggregateIdentificationKeysHasher.IdentificationHash result = aggregateIdentificationKeysHasher.createIdentificationKeyHashFromEvent(event);
assertThat(result, equalTo(expectedResult));
}
}