Skip to content

Commit

Permalink
Support default route option for Events that match no other route (op…
Browse files Browse the repository at this point in the history
…ensearch-project#4662)

Support default route option for Events that match no other route

Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka authored and Krishna Kondaka committed Aug 8, 2024
1 parent e544ef4 commit b03a0d0
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.integration;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;

class Router_ThreeRoutesDefaultIT {
private static final String TESTING_KEY = "ConditionalRoutingIT";
private static final String ALL_SOURCE_KEY = TESTING_KEY + "_all";
private static final String ALPHA_SOURCE_KEY = TESTING_KEY + "_alpha";
private static final String BETA_SOURCE_KEY = TESTING_KEY + "_beta";
private static final String ALPHA_DEFAULT_SOURCE_KEY = TESTING_KEY + "_alpha_default";
private static final String ALPHA_BETA_GAMMA_SOURCE_KEY = TESTING_KEY + "_alpha_beta_gamma";
private static final String DEFAULT_SOURCE_KEY = TESTING_KEY + "_default";
private static final String KNOWN_CONDITIONAL_KEY = "value";
private static final String ALPHA_VALUE = "a";
private static final String BETA_VALUE = "b";
private static final String GAMMA_VALUE = "g";
private static final String DEFAULT_VALUE = "z";
private DataPrepperTestRunner dataPrepperTestRunner;
private InMemorySourceAccessor inMemorySourceAccessor;
private InMemorySinkAccessor inMemorySinkAccessor;

@BeforeEach
void setUp() {
dataPrepperTestRunner = DataPrepperTestRunner.builder()
.withPipelinesDirectoryOrFile("route/three-route-with-default-route.yaml")
.build();

dataPrepperTestRunner.start();
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
}

@AfterEach
void tearDown() {
dataPrepperTestRunner.stop();
}

@Test
void test_default_route() {
final List<Record<Event>> alphaEvents = createEvents(ALPHA_VALUE, 10);
final List<Record<Event>> betaEvents = createEvents(BETA_VALUE, 20);
final List<Record<Event>> gammaEvents = createEvents(GAMMA_VALUE, 20);
final List<Record<Event>> defaultEvents = createEvents(DEFAULT_VALUE, 20);

final List<Record<Event>> allEvents = new ArrayList<>(alphaEvents);
allEvents.addAll(betaEvents);
allEvents.addAll(gammaEvents);
allEvents.addAll(defaultEvents);
Collections.shuffle(allEvents);

inMemorySourceAccessor.submit(TESTING_KEY, allEvents);

await().atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertThat(inMemorySinkAccessor.get(ALPHA_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(BETA_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(ALL_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(ALPHA_DEFAULT_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(ALPHA_BETA_GAMMA_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY), not(empty()));
});

final List<Record<Event>> actualAlphaRecords = inMemorySinkAccessor.get(ALPHA_SOURCE_KEY);

assertThat(actualAlphaRecords.size(), equalTo(alphaEvents.size()));

assertThat(actualAlphaRecords, containsInAnyOrder(allEvents.stream()
.filter(alphaEvents::contains).toArray()));

final List<Record<Event>> actualBetaRecords = inMemorySinkAccessor.get(BETA_SOURCE_KEY);

assertThat(actualBetaRecords.size(), equalTo(betaEvents.size()));

assertThat(actualBetaRecords, containsInAnyOrder(allEvents.stream()
.filter(betaEvents::contains).toArray()));

final List<Record<Event>> actualDefaultRecords = inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY);

assertThat(actualDefaultRecords.size(), equalTo(defaultEvents.size()));
assertThat(actualDefaultRecords, containsInAnyOrder(allEvents.stream()
.filter(defaultEvents::contains).toArray()));

final List<Record<Event>> actualAlphaDefaultRecords = new ArrayList<>();
actualAlphaDefaultRecords.addAll(actualAlphaRecords);
actualAlphaDefaultRecords.addAll(actualDefaultRecords);
assertThat(actualAlphaDefaultRecords.size(), equalTo(defaultEvents.size()+alphaEvents.size()));
assertThat(actualAlphaDefaultRecords, containsInAnyOrder(allEvents.stream()
.filter(event -> defaultEvents.contains(event) || alphaEvents.contains(event)).toArray()));

}

private List<Record<Event>> createEvents(final String value, final int numberToCreate) {
return IntStream.range(0, numberToCreate)
.mapToObj(i -> Map.of(KNOWN_CONDITIONAL_KEY, value, "arbitrary_field", UUID.randomUUID().toString()))
.map(map -> JacksonEvent.builder().withData(map).withEventType("TEST").build())
.map(jacksonEvent -> (Event) jacksonEvent)
.map(Record::new)
.collect(Collectors.toList());
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
routing-pipeline:
workers: 4
delay: 10
source:
in_memory:
testing_key: ConditionalRoutingIT
buffer:
bounded_blocking:
# Use a small batch size to help ensure that multiple threads
# are picking up the different routes.
batch_size: 10
route:
- alpha: '/value == "a"'
- beta: '/value == "b"'
- gamma: '/value == "g"'
sink:
- in_memory:
testing_key: ConditionalRoutingIT_alpha
routes:
- alpha
- in_memory:
testing_key: ConditionalRoutingIT_beta
routes:
- beta
- in_memory:
testing_key: ConditionalRoutingIT_alpha_default
routes:
- alpha
- _default
- in_memory:
testing_key: ConditionalRoutingIT_alpha_beta_gamma
routes:
- alpha
- beta
- gamma
- in_memory:
testing_key: ConditionalRoutingIT_default
routes:
- _default
- in_memory:
testing_key: ConditionalRoutingIT_all
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* intended to help break apart {@link Router} for better testing.
*/
class DataFlowComponentRouter {
static final String DEFAULT_ROUTE = "_default";
<C> void route(final Collection<Record> allRecords,
final DataFlowComponent<C> dataFlowComponent,
final Map<Record, Set<String>> recordsToRoutes,
Expand All @@ -37,7 +38,9 @@ <C> void route(final Collection<Record> allRecords,
final Set<String> routesForEvent = recordsToRoutes
.getOrDefault(record, Collections.emptySet());

if (routesForEvent.stream().anyMatch(dataFlowComponentRoutes::contains)) {
if (routesForEvent.size() == 0 && dataFlowComponentRoutes.contains(DEFAULT_ROUTE)) {
recordsForComponent.add(getRecordStrategy.getRecord(record));
} else if (routesForEvent.stream().anyMatch(dataFlowComponentRoutes::contains)) {
recordsForComponent.add(getRecordStrategy.getRecord(record));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ void route_no_Events_when_none_have_matching_routes() {
verify(componentRecordsConsumer).accept(testComponent, Collections.emptyList());
}

@Test
void route_no_Events_when_none_have_matching_routes_with_default_route() {
when(dataFlowComponent.getRoutes()).thenReturn(Set.of(DataFlowComponentRouter.DEFAULT_ROUTE));
final Map<Record, Set<String>> noMatchingRoutes = recordsIn.stream()
.collect(Collectors.toMap(Function.identity(), r -> Collections.emptySet()));

createObjectUnderTest().route(recordsIn, dataFlowComponent, noMatchingRoutes, getRecordStrategy, componentRecordsConsumer);

verify(componentRecordsConsumer).accept(testComponent, recordsIn);
}


@Test
void route_all_Events_when_all_have_matched_route() {
Expand Down Expand Up @@ -236,6 +247,33 @@ void route_no_Events_when_none_have_matching_routes() {
verify(componentRecordsConsumer).accept(testComponent, Collections.emptyList());
}

@Test
void route_no_Events_when_none_have_matching_routes_with_default_route() {
when(dataFlowComponent.getRoutes()).thenReturn(Set.of(DataFlowComponentRouter.DEFAULT_ROUTE));
final Map<Record, Set<String>> noMatchingRoutes = recordsIn.stream()
.collect(Collectors.toMap(Function.identity(), r -> Collections.emptySet()));

createObjectUnderTest().route(recordsIn, dataFlowComponent, noMatchingRoutes, getRecordStrategy, componentRecordsConsumer);

verify(componentRecordsConsumer).accept(testComponent, recordsIn);
}

@Test
void route_matched_events_with_none_to_default_route() {
DataFlowComponent<TestComponent> dataFlowComponent2 = mock(DataFlowComponent.class);
when(dataFlowComponent2.getRoutes()).thenReturn(Set.of(DataFlowComponentRouter.DEFAULT_ROUTE));
final Map<Record, Set<String>> allMatchingRoutes = recordsIn.stream()
.collect(Collectors.toMap(Function.identity(), r -> Collections.singleton(knownRoute)));

createObjectUnderTest().route(recordsIn, dataFlowComponent2, allMatchingRoutes, getRecordStrategy, componentRecordsConsumer);
verify(componentRecordsConsumer).accept(null, Collections.emptyList());
createObjectUnderTest().route(recordsIn, dataFlowComponent, allMatchingRoutes, getRecordStrategy, componentRecordsConsumer);

verify(componentRecordsConsumer).accept(testComponent, recordsIn);

}



@Test
void route_all_Events_when_all_have_matched_route() {
Expand Down

0 comments on commit b03a0d0

Please sign in to comment.