diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java index a78ba0c22e..3af54291c7 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java @@ -8,44 +8,66 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier; import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker; import org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.SearchWorker; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.ClusterClientFactory; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; public class OpenSearchService { + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchService.class); + + static final Duration EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30); + private final SearchAccessor searchAccessor; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final SourceCoordinator sourceCoordinator; private final Buffer> buffer; + private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; + private final ScheduledExecutorService scheduledExecutorService; - private Thread searchWorkerThread; + private SearchWorker searchWorker; + private ScheduledFuture searchWorkerFuture; public static OpenSearchService createOpenSearchService(final SearchAccessor searchAccessor, final SourceCoordinator sourceCoordinator, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final Buffer> buffer) { - return new OpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer); + return new OpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, Executors.newSingleThreadScheduledExecutor()); } private OpenSearchService(final SearchAccessor searchAccessor, final SourceCoordinator sourceCoordinator, final OpenSearchSourceConfiguration openSearchSourceConfiguration, - final Buffer> buffer) { + final Buffer> buffer, + final ScheduledExecutorService scheduledExecutorService) { this.searchAccessor = searchAccessor; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.buffer = buffer; this.sourceCoordinator = sourceCoordinator; this.sourceCoordinator.initialize(); + this.openSearchIndexPartitionCreationSupplier = new OpenSearchIndexPartitionCreationSupplier(openSearchSourceConfiguration, (ClusterClientFactory) searchAccessor); + this.scheduledExecutorService = scheduledExecutorService; } public void start() { switch(searchAccessor.getSearchContextType()) { case POINT_IN_TIME: - searchWorkerThread = new Thread(new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer)); + searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier); break; case SCROLL: - searchWorkerThread = new Thread(new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer)); + searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier); break; default: throw new IllegalArgumentException( @@ -53,10 +75,29 @@ public void start() { searchAccessor.getSearchContextType())); } - searchWorkerThread.start(); + final Instant startTime = openSearchSourceConfiguration.getSchedulingParameterConfiguration().getStartTime(); + final long waitTimeBeforeStartMillis = startTime.toEpochMilli() - Instant.now().toEpochMilli() < 0 ? 0L : + startTime.toEpochMilli() - Instant.now().toEpochMilli(); + + LOG.info("The opensearch source will start processing data at {}. It is currently {}", startTime, Instant.now()); + + searchWorkerFuture = scheduledExecutorService.schedule(() -> searchWorker.run(), waitTimeBeforeStartMillis, TimeUnit.MILLISECONDS); } public void stop() { + scheduledExecutorService.shutdown(); + try { + searchWorkerFuture.cancel(true); + if (scheduledExecutorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS)) { + LOG.info("Successfully waited for the search worker to terminate"); + } else { + LOG.warn("Search worker did not terminate in time, forcing termination"); + scheduledExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for the search worker to terminate", e); + scheduledExecutorService.shutdownNow(); + } } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java index 141079631f..779b8a3cbd 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java @@ -16,7 +16,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy; -@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType = OpenSearchSourceConfiguration.class ) +@DataPrepperPlugin(name="opensearch", pluginType = Source.class, pluginConfigurationType = OpenSearchSourceConfiguration.class) public class OpenSearchSource implements Source>, UsesSourceCoordination { private final AwsCredentialsSupplier awsCredentialsSupplier; diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java index 69edc9fb60..e8776ff9c0 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java @@ -51,7 +51,7 @@ public class OpenSearchSourceConfiguration { @JsonProperty("scheduling") @Valid - private SchedulingParameterConfiguration schedulingParameterConfiguration; + private SchedulingParameterConfiguration schedulingParameterConfiguration = new SchedulingParameterConfiguration(); @JsonProperty("search_options") @Valid diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java new file mode 100644 index 0000000000..e4ca82a3bc --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/OpenSearchIndexPartitionCreationSupplier.java @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker; + +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch.cat.IndicesResponse; +import org.opensearch.client.opensearch.cat.indices.IndicesRecord; +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.OpenSearchIndex; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.ClusterClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + +public class OpenSearchIndexPartitionCreationSupplier implements Function, List> { + + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchIndexPartitionCreationSupplier.class); + + private final OpenSearchSourceConfiguration openSearchSourceConfiguration; + private final IndexParametersConfiguration indexParametersConfiguration; + private final OpenSearchClient openSearchClient; + + public OpenSearchIndexPartitionCreationSupplier(final OpenSearchSourceConfiguration openSearchSourceConfiguration, + final ClusterClientFactory clusterClientFactory) { + this.openSearchSourceConfiguration = openSearchSourceConfiguration; + this.indexParametersConfiguration = openSearchSourceConfiguration.getIndexParametersConfiguration(); + + final Object client = clusterClientFactory.getClient(); + + if (client instanceof OpenSearchClient) { + this.openSearchClient = (OpenSearchClient) client; + } else { + throw new IllegalArgumentException(String.format("ClusterClientFactory provided an invalid client object to the index partition creation supplier. " + + "The client must be of type OpenSearchClient. The client passed is of class %s", client.getClass())); + } + + } + + @Override + public List apply(final Map globalStateMap) { + + if (Objects.nonNull(openSearchClient)) { + return applyForOpenSearchClient(globalStateMap); + } + + return Collections.emptyList(); + } + + private List applyForOpenSearchClient(final Map globalStateMap) { + IndicesResponse indicesResponse; + try { + indicesResponse = openSearchClient.cat().indices(); + } catch (IOException | OpenSearchException e) { + LOG.error("There was an exception when calling /_cat/indices to create new index partitions", e); + return Collections.emptyList(); + } + + return indicesResponse.valueBody().stream() + .filter(this::shouldIndexBeProcessed) + .map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build()) + .collect(Collectors.toList()); + } + + private boolean shouldIndexBeProcessed(final IndicesRecord indicesRecord) { + if (Objects.isNull(indicesRecord.index())) { + return false; + } + + if (Objects.isNull(indexParametersConfiguration)) { + return true; + } + + final List includedIndices = indexParametersConfiguration.getIncludedIndices(); + final List excludedIndices = indexParametersConfiguration.getExcludedIndices(); + + final boolean matchesIncludedPattern = includedIndices.isEmpty() || doesIndexMatchPattern(includedIndices, indicesRecord); + final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indicesRecord); + + + return matchesIncludedPattern && !matchesExcludePattern; + } + + private boolean doesIndexMatchPattern(final List indices, final IndicesRecord indicesRecord) { + for (final OpenSearchIndex index : indices) { + final Matcher matcher = index.getIndexNamePattern().matcher(indicesRecord.index()); + + if (matcher.matches()) { + return true; + } + } + return false; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index 78b9d0328f..0161185661 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -8,32 +8,63 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; /** * PitWorker polls the source cluster via Point-In-Time contexts. */ public class PitWorker implements SearchWorker, Runnable { + private static final Logger LOG = LoggerFactory.getLogger(PitWorker.class); + private final SearchAccessor searchAccessor; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final SourceCoordinator sourceCoordinator; private final Buffer> buffer; + private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; + + private static final int STANDARD_BACKOFF_MILLIS = 30_000; public PitWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, - final Buffer> buffer) { + final Buffer> buffer, + final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) { this.searchAccessor = searchAccessor; this.sourceCoordinator = sourceCoordinator; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.buffer = buffer; + this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier; } @Override public void run() { - // todo: implement + while (!Thread.currentThread().isInterrupted()) { + final Optional> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier); + + if (indexPartition.isEmpty()) { + try { + Thread.sleep(STANDARD_BACKOFF_MILLIS); + continue; + } catch (InterruptedException e) { + LOG.info("The PitWorker was interrupted while sleeping after acquiring no indices to process, stopping processing"); + return; + } + } + + // todo: process the index and write to the buffer + + sourceCoordinator.closePartition( + indexPartition.get().getPartitionKey(), + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(), + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount()); + } } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index c6e0d049ae..ef1557c589 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -21,15 +21,18 @@ public class ScrollWorker implements SearchWorker { private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final SourceCoordinator sourceCoordinator; private final Buffer> buffer; + private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; public ScrollWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, - final Buffer> buffer) { + final Buffer> buffer, + final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) { this.searchAccessor = searchAccessor; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.sourceCoordinator = sourceCoordinator; this.buffer = buffer; + this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier; } @Override diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ClusterClientFactory.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ClusterClientFactory.java new file mode 100644 index 0000000000..fd7701a007 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ClusterClientFactory.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; + +public interface ClusterClientFactory { + Object getClient(); +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java index 0c8ae1ea3a..dbc83b1127 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java @@ -16,7 +16,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; -public class ElasticsearchAccessor implements SearchAccessor { +public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory { @Override public SearchContextType getSearchContextType() { // todo: implement @@ -56,4 +56,9 @@ public SearchScrollResponse searchWithScroll(SearchScrollRequest searchScrollReq public void deleteScroll(DeleteScrollRequest deleteScrollRequest) { //todo: implement } + + @Override + public Object getClient() { + return null; + } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index 1da995bff6..bb149725ca 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -17,7 +17,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; -public class OpenSearchAccessor implements SearchAccessor { +public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory { private final OpenSearchClient openSearchClient; private final SearchContextType searchContextType; @@ -27,7 +27,6 @@ public OpenSearchAccessor(final OpenSearchClient openSearchClient, final SearchC this.searchContextType = searchContextType; } - @Override public SearchContextType getSearchContextType() { return searchContextType; @@ -66,4 +65,9 @@ public SearchScrollResponse searchWithScroll(SearchScrollRequest searchScrollReq public void deleteScroll(DeleteScrollRequest deleteScrollRequest) { //todo: implement } + + @Override + public Object getClient() { + return openSearchClient; + } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java index 7e013dc277..40fffc0436 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java @@ -7,15 +7,39 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.SearchWorker; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class OpenSearchServiceTest { @@ -24,7 +48,7 @@ public class OpenSearchServiceTest { private OpenSearchSourceConfiguration openSearchSourceConfiguration; @Mock - private SearchAccessor searchAccessor; + private OpenSearchAccessor openSearchAccessor; @Mock private Buffer> buffer; @@ -32,8 +56,23 @@ public class OpenSearchServiceTest { @Mock private SourceCoordinator sourceCoordinator; + @Mock + private ScheduledExecutorService scheduledExecutorService; + + @Mock + private OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; + + @Mock + private SearchWorker searchWorker; + private OpenSearchService createObjectUnderTest() { - return OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer); + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); + final MockedConstruction mockedConstruction = mockConstruction(OpenSearchIndexPartitionCreationSupplier.class, (mock, context) -> { + openSearchIndexPartitionCreationSupplier = mock; + })) { + executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor).thenReturn(scheduledExecutorService); + return OpenSearchService.createOpenSearchService(openSearchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer); + } } @Test @@ -41,4 +80,46 @@ void source_coordinator_is_initialized_on_construction() { createObjectUnderTest(); verify(sourceCoordinator).initialize(); } + + @Test + void search_context_types_get_submitted_correctly_for_point_in_time_and_executor_service_with_start_time_in_the_future() { + when(openSearchAccessor.getSearchContextType()).thenReturn(SearchContextType.POINT_IN_TIME); + final Instant startTime = Instant.now().plusSeconds(60); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getStartTime()).thenReturn(startTime); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + try (final MockedConstruction pitWorkerMockedConstruction = mockConstruction(PitWorker.class, (pitWorker, context) -> { + searchWorker = pitWorker; + })) {} + + createObjectUnderTest().start(); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Long.class); + + verify(scheduledExecutorService).schedule(ArgumentMatchers.any(Runnable.class), argumentCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + final long waitTimeMillis = argumentCaptor.getValue(); + assertThat(waitTimeMillis, lessThanOrEqualTo(Duration.ofSeconds(60).toMillis())); + assertThat(waitTimeMillis, greaterThan(Duration.ofSeconds(30).toMillis())); + } + + @Test + void search_context_types_get_submitted_correctly_for_scroll_and_executor_service_with_start_time_in_the_past() { + when(openSearchAccessor.getSearchContextType()).thenReturn(SearchContextType.SCROLL); + final Instant startTime = Instant.now().minusSeconds(60); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getStartTime()).thenReturn(startTime); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + try (final MockedConstruction scrollWorkerMockedConstruction = mockConstruction(ScrollWorker.class, (scrollWorker, context) -> { + searchWorker = scrollWorker; + })) {} + + createObjectUnderTest().start(); + + verify(scheduledExecutorService).schedule(ArgumentMatchers.any(Runnable.class), eq(0L), eq(TimeUnit.MILLISECONDS)); + } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchIndexPartitionCreationSupplierTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchIndexPartitionCreationSupplierTest.java new file mode 100644 index 0000000000..0a424b0610 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchIndexPartitionCreationSupplierTest.java @@ -0,0 +1,153 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch.cat.IndicesResponse; +import org.opensearch.client.opensearch.cat.OpenSearchCatClient; +import org.opensearch.client.opensearch.cat.indices.IndicesRecord; +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.OpenSearchIndex; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class OpenSearchIndexPartitionCreationSupplierTest { + + @Mock + private OpenSearchSourceConfiguration openSearchSourceConfiguration; + + @Mock + private ClusterClientFactory clusterClientFactory; + + @Mock + private OpenSearchClient openSearchClient; + + private OpenSearchIndexPartitionCreationSupplier createObjectUnderTest() { + return new OpenSearchIndexPartitionCreationSupplier(openSearchSourceConfiguration, clusterClientFactory); + } + + @Test + void clusterClientFactory_that_is_not_OpenSearchClient_throws_IllegalArgumentException() { + when(clusterClientFactory.getClient()).thenReturn(mock(Object.class)); + + assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); + } + + @ParameterizedTest + @MethodSource("opensearchCatIndicesExceptions") + void apply_with_opensearch_client_cat_indices_throws_exception_returns_empty_list(final Class exception) throws IOException { + when(clusterClientFactory.getClient()).thenReturn(openSearchClient); + + final OpenSearchCatClient openSearchCatClient = mock(OpenSearchCatClient.class); + when(openSearchCatClient.indices()).thenThrow(exception); + when(openSearchClient.cat()).thenReturn(openSearchCatClient); + + final List partitionIdentifierList = createObjectUnderTest().apply(Collections.emptyMap()); + + assertThat(partitionIdentifierList, notNullValue()); + assertThat(partitionIdentifierList.isEmpty(), equalTo(true)); + } + + @Test + void apply_with_opensearch_client_with_no_indices_return_empty_list() throws IOException { + when(clusterClientFactory.getClient()).thenReturn(openSearchClient); + + final OpenSearchCatClient openSearchCatClient = mock(OpenSearchCatClient.class); + final IndicesResponse indicesResponse = mock(IndicesResponse.class); + when(indicesResponse.valueBody()).thenReturn(Collections.emptyList()); + when(openSearchCatClient.indices()).thenReturn(indicesResponse); + when(openSearchClient.cat()).thenReturn(openSearchCatClient); + + final List partitionIdentifierList = createObjectUnderTest().apply(Collections.emptyMap()); + + assertThat(partitionIdentifierList, notNullValue()); + assertThat(partitionIdentifierList.isEmpty(), equalTo(true)); + } + + @Test + void apply_with_opensearch_client_with_indices_filters_them_correctly() throws IOException { + when(clusterClientFactory.getClient()).thenReturn(openSearchClient); + + final OpenSearchCatClient openSearchCatClient = mock(OpenSearchCatClient.class); + final IndicesResponse indicesResponse = mock(IndicesResponse.class); + + final IndexParametersConfiguration indexParametersConfiguration = mock(IndexParametersConfiguration.class); + + final List includedIndices = new ArrayList<>(); + final OpenSearchIndex includeIndex = mock(OpenSearchIndex.class); + final String includePattern = "my-pattern-[a-c].*"; + when(includeIndex.getIndexNamePattern()).thenReturn(Pattern.compile(includePattern)); + includedIndices.add(includeIndex); + + final List excludedIndices = new ArrayList<>(); + final OpenSearchIndex excludeIndex = mock(OpenSearchIndex.class); + final String excludePattern = "my-pattern-[a-c]-exclude"; + when(excludeIndex.getIndexNamePattern()).thenReturn(Pattern.compile(excludePattern)); + excludedIndices.add(excludeIndex); + + final OpenSearchIndex secondExcludeIndex = mock(OpenSearchIndex.class); + final String secondExcludePattern = "second-exclude-.*"; + when(secondExcludeIndex.getIndexNamePattern()).thenReturn(Pattern.compile(secondExcludePattern)); + excludedIndices.add(secondExcludeIndex); + + when(indexParametersConfiguration.getIncludedIndices()).thenReturn(includedIndices); + when(indexParametersConfiguration.getExcludedIndices()).thenReturn(excludedIndices); + when(openSearchSourceConfiguration.getIndexParametersConfiguration()).thenReturn(indexParametersConfiguration); + + final List indicesRecords = new ArrayList<>(); + final IndicesRecord includedIndex = mock(IndicesRecord.class); + when(includedIndex.index()).thenReturn("my-pattern-a-include"); + final IndicesRecord excludedIndex = mock(IndicesRecord.class); + when(excludedIndex.index()).thenReturn("second-exclude-test"); + final IndicesRecord includedAndThenExcluded = mock(IndicesRecord.class); + when(includedAndThenExcluded.index()).thenReturn("my-pattern-a-exclude"); + final IndicesRecord neitherIncludedOrExcluded = mock(IndicesRecord.class); + when(neitherIncludedOrExcluded.index()).thenReturn("random-index"); + + indicesRecords.add(includedIndex); + indicesRecords.add(excludedIndex); + indicesRecords.add(includedAndThenExcluded); + indicesRecords.add(neitherIncludedOrExcluded); + + when(indicesResponse.valueBody()).thenReturn(indicesRecords); + + when(openSearchCatClient.indices()).thenReturn(indicesResponse); + when(openSearchClient.cat()).thenReturn(openSearchCatClient); + + final List partitionIdentifierList = createObjectUnderTest().apply(Collections.emptyMap()); + + assertThat(partitionIdentifierList, notNullValue()); + } + + private static Stream opensearchCatIndicesExceptions() { + return Stream.of(Arguments.of(IOException.class), + Arguments.of(OpenSearchException.class)); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/PitWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/PitWorkerTest.java new file mode 100644 index 0000000000..2e2b20ddeb --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/PitWorkerTest.java @@ -0,0 +1,106 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; +import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; +import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker; + +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class PitWorkerTest { + + @Mock + private OpenSearchSourceConfiguration openSearchSourceConfiguration; + + @Mock + private OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; + + @Mock + private SourceCoordinator sourceCoordinator; + + @Mock + private SearchAccessor searchAccessor; + + @Mock + private Buffer> buffer; + + private ExecutorService executorService; + + @BeforeEach + void setup() { + executorService = Executors.newSingleThreadExecutor(); + } + + private PitWorker createObjectUnderTest() { + return new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier); + } + + @Test + void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrupted() throws InterruptedException { + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.empty()); + + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + } + + @Test + void run_with_getNextPartition_with_non_empty_partition_closes_that_partition() throws InterruptedException { + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); + when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + doNothing().when(sourceCoordinator).closePartition(partitionKey, + Duration.ZERO, 1); + + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + } +}