From 2d84ba9b2735bb896fb5a8e568649ae82680ff35 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka <41027584+kkondaka@users.noreply.github.com> Date: Mon, 1 Apr 2024 12:22:09 -0700 Subject: [PATCH] Aggregate Processor: local mode should work when there is no when condition (#4380) Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../aggregate/AggregateProcessor.java | 6 +- .../aggregate/AggregateProcessorTest.java | 67 +++++++++++++++++++ 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index 1f3a58c37a..3950876cd0 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -152,12 +152,12 @@ public void shutdown() { @Override public boolean isApplicableEventForPeerForwarding(Event event) { - if (whenCondition == null) { - return true; - } if (localMode) { return false; } + if (whenCondition == null) { + return true; + } return expressionEvaluator.evaluateConditional(whenCondition, event); } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java index 290f87d551..12f44cfcba 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java @@ -85,6 +85,8 @@ public class AggregateProcessorTest { @Mock private AggregateActionResponse firstAggregateActionResponse; + @Mock + private AggregateActionResponse secondAggregateActionResponse; @Mock private PluginMetrics pluginMetrics; @@ -311,6 +313,71 @@ void handleEvent_returning_with_condition_eliminates_one_record_local_only() { verify(aggregateGroupManager).getGroupsToConclude(eq(false)); } + @Test + void handleEvent_returning_no_condition_eliminates_one_record_local_only() { + final String eventKey = UUID.randomUUID().toString(); + final String key1 = UUID.randomUUID().toString(); + final String key2 = UUID.randomUUID().toString(); + Event firstEvent; + Event secondEvent; + final Map eventMap1 = new HashMap<>(); + eventMap1.put(eventKey, key1); + + firstEvent = JacksonEvent.builder() + .withData(eventMap1) + .withEventType("event") + .build(); + + final Map eventMap2 = new HashMap<>(); + eventMap2.put(eventKey, key2); + + secondEvent = JacksonEvent.builder() + .withData(eventMap2) + .withEventType("event") + .build(); + + + when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent)) + .thenReturn(identificationKeysMap); + when(identificationKeysHasher.createIdentificationKeysMapFromEvent(secondEvent)) + .thenReturn(identificationKeysMap); + when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse); + when(aggregateActionSynchronizer.handleEventForGroup(secondEvent, identificationKeysMap, aggregateGroup)).thenReturn(secondAggregateActionResponse); + when(aggregateProcessorConfig.getWhenCondition()).thenReturn(null); + when(aggregateProcessorConfig.getLocalMode()).thenReturn(true); + final AggregateProcessor objectUnderTest = createObjectUnderTest(); + when(aggregateGroupManager.getGroupsToConclude(eq(false))).thenReturn(Collections.emptyList()); + when(aggregateActionResponse.getEvent()).thenReturn(event); + when(firstAggregateActionResponse.getEvent()).thenReturn(firstEvent); + when(secondAggregateActionResponse.getEvent()).thenReturn(secondEvent); + + event.toMap().put(eventKey, key1); + List> recordsIn = new ArrayList<>(); + recordsIn.add(new Record(firstEvent)); + recordsIn.add(new Record(secondEvent)); + recordsIn.add(new Record(event)); + Collection> c = recordsIn; + assertThat(objectUnderTest.isApplicableEventForPeerForwarding(event), equalTo(false)); + assertThat(objectUnderTest.isApplicableEventForPeerForwarding(firstEvent), equalTo(false)); + assertThat(objectUnderTest.isApplicableEventForPeerForwarding(secondEvent), equalTo(false)); + final List> recordsOut = (List>) objectUnderTest.doExecute(c); + + assertThat(recordsOut.size(), equalTo(3)); + assertThat(recordsOut.get(0), notNullValue()); + assertThat(recordsOut.get(0).getData(), equalTo(firstEvent)); + assertThat(recordsOut.get(1), notNullValue()); + assertThat(recordsOut.get(1).getData(), equalTo(secondEvent)); + assertThat(recordsOut.get(2), notNullValue()); + assertThat(recordsOut.get(2).getData(), equalTo(event)); + + verify(actionHandleEventsDroppedCounter).increment(0); + verify(actionHandleEventsOutCounter).increment(3); + verifyNoInteractions(actionConcludeGroupEventsDroppedCounter); + verifyNoInteractions(actionConcludeGroupEventsOutCounter); + + verify(aggregateGroupManager).getGroupsToConclude(eq(false)); + } + @Test void handleEvent_returning_with_event_adds_event_to_records_out() { final AggregateProcessor objectUnderTest = createObjectUnderTest();