diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java new file mode 100644 index 0000000000..fbc61053a5 --- /dev/null +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java @@ -0,0 +1,130 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.integration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; + +class Router_ThreeRoutesDefaultIT { + private static final String TESTING_KEY = "ConditionalRoutingIT"; + private static final String ALL_SOURCE_KEY = TESTING_KEY + "_all"; + private static final String ALPHA_SOURCE_KEY = TESTING_KEY + "_alpha"; + private static final String BETA_SOURCE_KEY = TESTING_KEY + "_beta"; + private static final String ALPHA_DEFAULT_SOURCE_KEY = TESTING_KEY + "_alpha_default"; + private static final String ALPHA_BETA_GAMMA_SOURCE_KEY = TESTING_KEY + "_alpha_beta_gamma"; + private static final String DEFAULT_SOURCE_KEY = TESTING_KEY + "_default"; + private static final String KNOWN_CONDITIONAL_KEY = "value"; + private static final String ALPHA_VALUE = "a"; + private static final String BETA_VALUE = "b"; + private static final String GAMMA_VALUE = "g"; + private static final String DEFAULT_VALUE = "z"; + private DataPrepperTestRunner dataPrepperTestRunner; + private InMemorySourceAccessor inMemorySourceAccessor; + private InMemorySinkAccessor inMemorySinkAccessor; + + @BeforeEach + void setUp() { + dataPrepperTestRunner = DataPrepperTestRunner.builder() + .withPipelinesDirectoryOrFile("route/three-route-with-default-route.yaml") + .build(); + + dataPrepperTestRunner.start(); + inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); + inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor(); + } + + @AfterEach + void tearDown() { + dataPrepperTestRunner.stop(); + } + + @Test + void test_default_route() { + final List> alphaEvents = createEvents(ALPHA_VALUE, 10); + final List> betaEvents = createEvents(BETA_VALUE, 20); + final List> gammaEvents = createEvents(GAMMA_VALUE, 20); + final List> defaultEvents = createEvents(DEFAULT_VALUE, 20); + + final List> allEvents = new ArrayList<>(alphaEvents); + allEvents.addAll(betaEvents); + allEvents.addAll(gammaEvents); + allEvents.addAll(defaultEvents); + Collections.shuffle(allEvents); + + inMemorySourceAccessor.submit(TESTING_KEY, allEvents); + + await().atMost(2, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertThat(inMemorySinkAccessor.get(ALPHA_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(BETA_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(ALL_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(ALPHA_DEFAULT_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(ALPHA_BETA_GAMMA_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY), not(empty())); + }); + + final List> actualAlphaRecords = inMemorySinkAccessor.get(ALPHA_SOURCE_KEY); + + assertThat(actualAlphaRecords.size(), equalTo(alphaEvents.size())); + + assertThat(actualAlphaRecords, containsInAnyOrder(allEvents.stream() + .filter(alphaEvents::contains).toArray())); + + final List> actualBetaRecords = inMemorySinkAccessor.get(BETA_SOURCE_KEY); + + assertThat(actualBetaRecords.size(), equalTo(betaEvents.size())); + + assertThat(actualBetaRecords, containsInAnyOrder(allEvents.stream() + .filter(betaEvents::contains).toArray())); + + final List> actualDefaultRecords = inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY); + + assertThat(actualDefaultRecords.size(), equalTo(defaultEvents.size())); + assertThat(actualDefaultRecords, containsInAnyOrder(allEvents.stream() + .filter(defaultEvents::contains).toArray())); + + final List> actualAlphaDefaultRecords = new ArrayList<>(); + actualAlphaDefaultRecords.addAll(actualAlphaRecords); + actualAlphaDefaultRecords.addAll(actualDefaultRecords); + assertThat(actualAlphaDefaultRecords.size(), equalTo(defaultEvents.size()+alphaEvents.size())); + assertThat(actualAlphaDefaultRecords, containsInAnyOrder(allEvents.stream() + .filter(event -> defaultEvents.contains(event) || alphaEvents.contains(event)).toArray())); + + } + + private List> createEvents(final String value, final int numberToCreate) { + return IntStream.range(0, numberToCreate) + .mapToObj(i -> Map.of(KNOWN_CONDITIONAL_KEY, value, "arbitrary_field", UUID.randomUUID().toString())) + .map(map -> JacksonEvent.builder().withData(map).withEventType("TEST").build()) + .map(jacksonEvent -> (Event) jacksonEvent) + .map(Record::new) + .collect(Collectors.toList()); + } +} + diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/route/three-route-with-default-route.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/route/three-route-with-default-route.yaml new file mode 100644 index 0000000000..6d608a0d0b --- /dev/null +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/route/three-route-with-default-route.yaml @@ -0,0 +1,41 @@ +routing-pipeline: + workers: 4 + delay: 10 + source: + in_memory: + testing_key: ConditionalRoutingIT + buffer: + bounded_blocking: + # Use a small batch size to help ensure that multiple threads + # are picking up the different routes. + batch_size: 10 + route: + - alpha: '/value == "a"' + - beta: '/value == "b"' + - gamma: '/value == "g"' + sink: + - in_memory: + testing_key: ConditionalRoutingIT_alpha + routes: + - alpha + - in_memory: + testing_key: ConditionalRoutingIT_beta + routes: + - beta + - in_memory: + testing_key: ConditionalRoutingIT_alpha_default + routes: + - alpha + - _default + - in_memory: + testing_key: ConditionalRoutingIT_alpha_beta_gamma + routes: + - alpha + - beta + - gamma + - in_memory: + testing_key: ConditionalRoutingIT_default + routes: + - _default + - in_memory: + testing_key: ConditionalRoutingIT_all diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouter.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouter.java index 1e8850219d..4e5c89cc29 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouter.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouter.java @@ -20,6 +20,7 @@ * intended to help break apart {@link Router} for better testing. */ class DataFlowComponentRouter { + static final String DEFAULT_ROUTE = "_default"; void route(final Collection allRecords, final DataFlowComponent dataFlowComponent, final Map> recordsToRoutes, @@ -37,7 +38,9 @@ void route(final Collection allRecords, final Set routesForEvent = recordsToRoutes .getOrDefault(record, Collections.emptySet()); - if (routesForEvent.stream().anyMatch(dataFlowComponentRoutes::contains)) { + if (routesForEvent.size() == 0 && dataFlowComponentRoutes.contains(DEFAULT_ROUTE)) { + recordsForComponent.add(getRecordStrategy.getRecord(record)); + } else if (routesForEvent.stream().anyMatch(dataFlowComponentRoutes::contains)) { recordsForComponent.add(getRecordStrategy.getRecord(record)); } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouterTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouterTest.java index 3802356592..1ea74afe70 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouterTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouterTest.java @@ -158,6 +158,17 @@ void route_no_Events_when_none_have_matching_routes() { verify(componentRecordsConsumer).accept(testComponent, Collections.emptyList()); } + @Test + void route_no_Events_when_none_have_matching_routes_with_default_route() { + when(dataFlowComponent.getRoutes()).thenReturn(Set.of(DataFlowComponentRouter.DEFAULT_ROUTE)); + final Map> noMatchingRoutes = recordsIn.stream() + .collect(Collectors.toMap(Function.identity(), r -> Collections.emptySet())); + + createObjectUnderTest().route(recordsIn, dataFlowComponent, noMatchingRoutes, getRecordStrategy, componentRecordsConsumer); + + verify(componentRecordsConsumer).accept(testComponent, recordsIn); + } + @Test void route_all_Events_when_all_have_matched_route() { @@ -236,6 +247,33 @@ void route_no_Events_when_none_have_matching_routes() { verify(componentRecordsConsumer).accept(testComponent, Collections.emptyList()); } + @Test + void route_no_Events_when_none_have_matching_routes_with_default_route() { + when(dataFlowComponent.getRoutes()).thenReturn(Set.of(DataFlowComponentRouter.DEFAULT_ROUTE)); + final Map> noMatchingRoutes = recordsIn.stream() + .collect(Collectors.toMap(Function.identity(), r -> Collections.emptySet())); + + createObjectUnderTest().route(recordsIn, dataFlowComponent, noMatchingRoutes, getRecordStrategy, componentRecordsConsumer); + + verify(componentRecordsConsumer).accept(testComponent, recordsIn); + } + + @Test + void route_matched_events_with_none_to_default_route() { + DataFlowComponent dataFlowComponent2 = mock(DataFlowComponent.class); + when(dataFlowComponent2.getRoutes()).thenReturn(Set.of(DataFlowComponentRouter.DEFAULT_ROUTE)); + final Map> allMatchingRoutes = recordsIn.stream() + .collect(Collectors.toMap(Function.identity(), r -> Collections.singleton(knownRoute))); + + createObjectUnderTest().route(recordsIn, dataFlowComponent2, allMatchingRoutes, getRecordStrategy, componentRecordsConsumer); + verify(componentRecordsConsumer).accept(null, Collections.emptyList()); + createObjectUnderTest().route(recordsIn, dataFlowComponent, allMatchingRoutes, getRecordStrategy, componentRecordsConsumer); + + verify(componentRecordsConsumer).accept(testComponent, recordsIn); + + } + + @Test void route_all_Events_when_all_have_matched_route() {