Skip to content

Commit

Permalink
Implement opensearch index partition creation supplier and PitWorker …
Browse files Browse the repository at this point in the history
…without processing indices (opensearch-project#2821)

Implement opensearch index partition creation supplier and PitWorker without processing indices

Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: Marcos_Gonzalez_Mayedo <[email protected]>
  • Loading branch information
graytaylor0 authored and Marcos_Gonzalez_Mayedo committed Jun 21, 2023
1 parent 43e5cd6 commit b741aaa
Show file tree
Hide file tree
Showing 12 changed files with 558 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,96 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.SearchWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.ClusterClientFactory;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class OpenSearchService {

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchService.class);

static final Duration EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30);

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;
private final ScheduledExecutorService scheduledExecutorService;

private Thread searchWorkerThread;
private SearchWorker searchWorker;
private ScheduledFuture<?> searchWorkerFuture;

public static OpenSearchService createOpenSearchService(final SearchAccessor searchAccessor,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer) {
return new OpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer);
return new OpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, Executors.newSingleThreadScheduledExecutor());
}

private OpenSearchService(final SearchAccessor searchAccessor,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final ScheduledExecutorService scheduledExecutorService) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.buffer = buffer;
this.sourceCoordinator = sourceCoordinator;
this.sourceCoordinator.initialize();
this.openSearchIndexPartitionCreationSupplier = new OpenSearchIndexPartitionCreationSupplier(openSearchSourceConfiguration, (ClusterClientFactory) searchAccessor);
this.scheduledExecutorService = scheduledExecutorService;
}

public void start() {
switch(searchAccessor.getSearchContextType()) {
case POINT_IN_TIME:
searchWorkerThread = new Thread(new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier);
break;
case SCROLL:
searchWorkerThread = new Thread(new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier);
break;
default:
throw new IllegalArgumentException(
String.format("Search context type must be POINT_IN_TIME or SCROLL, type %s was given instead",
searchAccessor.getSearchContextType()));
}

searchWorkerThread.start();
final Instant startTime = openSearchSourceConfiguration.getSchedulingParameterConfiguration().getStartTime();
final long waitTimeBeforeStartMillis = startTime.toEpochMilli() - Instant.now().toEpochMilli() < 0 ? 0L :
startTime.toEpochMilli() - Instant.now().toEpochMilli();

LOG.info("The opensearch source will start processing data at {}. It is currently {}", startTime, Instant.now());

searchWorkerFuture = scheduledExecutorService.schedule(() -> searchWorker.run(), waitTimeBeforeStartMillis, TimeUnit.MILLISECONDS);
}

public void stop() {
scheduledExecutorService.shutdown();
try {
searchWorkerFuture.cancel(true);
if (scheduledExecutorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS)) {
LOG.info("Successfully waited for the search worker to terminate");
} else {
LOG.warn("Search worker did not terminate in time, forcing termination");
scheduledExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for the search worker to terminate", e);
scheduledExecutorService.shutdownNow();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType = OpenSearchSourceConfiguration.class )
@DataPrepperPlugin(name="opensearch", pluginType = Source.class, pluginConfigurationType = OpenSearchSourceConfiguration.class)
public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordination {

private final AwsCredentialsSupplier awsCredentialsSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class OpenSearchSourceConfiguration {

@JsonProperty("scheduling")
@Valid
private SchedulingParameterConfiguration schedulingParameterConfiguration;
private SchedulingParameterConfiguration schedulingParameterConfiguration = new SchedulingParameterConfiguration();

@JsonProperty("search_options")
@Valid
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.cat.IndicesResponse;
import org.opensearch.client.opensearch.cat.indices.IndicesRecord;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.OpenSearchIndex;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.ClusterClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

public class OpenSearchIndexPartitionCreationSupplier implements Function<Map<String, Object>, List<PartitionIdentifier>> {

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchIndexPartitionCreationSupplier.class);

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final IndexParametersConfiguration indexParametersConfiguration;
private final OpenSearchClient openSearchClient;

public OpenSearchIndexPartitionCreationSupplier(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final ClusterClientFactory clusterClientFactory) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.indexParametersConfiguration = openSearchSourceConfiguration.getIndexParametersConfiguration();

final Object client = clusterClientFactory.getClient();

if (client instanceof OpenSearchClient) {
this.openSearchClient = (OpenSearchClient) client;
} else {
throw new IllegalArgumentException(String.format("ClusterClientFactory provided an invalid client object to the index partition creation supplier. " +
"The client must be of type OpenSearchClient. The client passed is of class %s", client.getClass()));
}

}

@Override
public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap) {

if (Objects.nonNull(openSearchClient)) {
return applyForOpenSearchClient(globalStateMap);
}

return Collections.emptyList();
}

private List<PartitionIdentifier> applyForOpenSearchClient(final Map<String, Object> globalStateMap) {
IndicesResponse indicesResponse;
try {
indicesResponse = openSearchClient.cat().indices();
} catch (IOException | OpenSearchException e) {
LOG.error("There was an exception when calling /_cat/indices to create new index partitions", e);
return Collections.emptyList();
}

return indicesResponse.valueBody().stream()
.filter(this::shouldIndexBeProcessed)
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
.collect(Collectors.toList());
}

private boolean shouldIndexBeProcessed(final IndicesRecord indicesRecord) {
if (Objects.isNull(indicesRecord.index())) {
return false;
}

if (Objects.isNull(indexParametersConfiguration)) {
return true;
}

final List<OpenSearchIndex> includedIndices = indexParametersConfiguration.getIncludedIndices();
final List<OpenSearchIndex> excludedIndices = indexParametersConfiguration.getExcludedIndices();

final boolean matchesIncludedPattern = includedIndices.isEmpty() || doesIndexMatchPattern(includedIndices, indicesRecord);
final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indicesRecord);


return matchesIncludedPattern && !matchesExcludePattern;
}

private boolean doesIndexMatchPattern(final List<OpenSearchIndex> indices, final IndicesRecord indicesRecord) {
for (final OpenSearchIndex index : indices) {
final Matcher matcher = index.getIndexNamePattern().matcher(indicesRecord.index());

if (matcher.matches()) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,63 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartition;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

/**
* PitWorker polls the source cluster via Point-In-Time contexts.
*/
public class PitWorker implements SearchWorker, Runnable {

private static final Logger LOG = LoggerFactory.getLogger(PitWorker.class);

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

private static final int STANDARD_BACKOFF_MILLIS = 30_000;

public PitWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) {
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.buffer = buffer;
this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
}

@Override
public void run() {
// todo: implement
while (!Thread.currentThread().isInterrupted()) {
final Optional<SourcePartition<OpenSearchIndexProgressState>> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier);

if (indexPartition.isEmpty()) {
try {
Thread.sleep(STANDARD_BACKOFF_MILLIS);
continue;
} catch (InterruptedException e) {
LOG.info("The PitWorker was interrupted while sleeping after acquiring no indices to process, stopping processing");
return;
}
}

// todo: process the index and write to the buffer

sourceCoordinator.closePartition(
indexPartition.get().getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ public class ScrollWorker implements SearchWorker {
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

public ScrollWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.sourceCoordinator = sourceCoordinator;
this.buffer = buffer;
this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.worker.client;

public interface ClusterClientFactory {
Object getClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;

public class ElasticsearchAccessor implements SearchAccessor {
public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory {
@Override
public SearchContextType getSearchContextType() {
// todo: implement
Expand Down Expand Up @@ -56,4 +56,9 @@ public SearchScrollResponse searchWithScroll(SearchScrollRequest searchScrollReq
public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
//todo: implement
}

@Override
public Object getClient() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;

public class OpenSearchAccessor implements SearchAccessor {
public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory {

private final OpenSearchClient openSearchClient;
private final SearchContextType searchContextType;
Expand All @@ -27,7 +27,6 @@ public OpenSearchAccessor(final OpenSearchClient openSearchClient, final SearchC
this.searchContextType = searchContextType;
}


@Override
public SearchContextType getSearchContextType() {
return searchContextType;
Expand Down Expand Up @@ -66,4 +65,9 @@ public SearchScrollResponse searchWithScroll(SearchScrollRequest searchScrollReq
public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
//todo: implement
}

@Override
public Object getClient() {
return openSearchClient;
}
}
Loading

0 comments on commit b741aaa

Please sign in to comment.