Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement opensearch index partition creation supplier and PitWorker without processing indices #2821

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,96 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.SearchWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.ClusterClientFactory;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class OpenSearchService {

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchService.class);

static final Duration EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30);

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;
private final ScheduledExecutorService scheduledExecutorService;

private Thread searchWorkerThread;
private SearchWorker searchWorker;
private ScheduledFuture<?> searchWorkerFuture;

public static OpenSearchService createOpenSearchService(final SearchAccessor searchAccessor,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer) {
return new OpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer);
return new OpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, Executors.newSingleThreadScheduledExecutor());
}

private OpenSearchService(final SearchAccessor searchAccessor,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final ScheduledExecutorService scheduledExecutorService) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.buffer = buffer;
this.sourceCoordinator = sourceCoordinator;
this.sourceCoordinator.initialize();
this.openSearchIndexPartitionCreationSupplier = new OpenSearchIndexPartitionCreationSupplier(openSearchSourceConfiguration, (ClusterClientFactory) searchAccessor);
this.scheduledExecutorService = scheduledExecutorService;
}

public void start() {
switch(searchAccessor.getSearchContextType()) {
case POINT_IN_TIME:
searchWorkerThread = new Thread(new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier);
break;
case SCROLL:
searchWorkerThread = new Thread(new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier);
break;
default:
throw new IllegalArgumentException(
String.format("Search context type must be POINT_IN_TIME or SCROLL, type %s was given instead",
searchAccessor.getSearchContextType()));
}

searchWorkerThread.start();
final Instant startTime = openSearchSourceConfiguration.getSchedulingParameterConfiguration().getStartTime();
final long waitTimeBeforeStartMillis = startTime.toEpochMilli() - Instant.now().toEpochMilli() < 0 ? 0L :
startTime.toEpochMilli() - Instant.now().toEpochMilli();

LOG.info("The opensearch source will start processing data at {}. It is currently {}", startTime, Instant.now());

searchWorkerFuture = scheduledExecutorService.schedule(() -> searchWorker.run(), waitTimeBeforeStartMillis, TimeUnit.MILLISECONDS);
}

public void stop() {
scheduledExecutorService.shutdown();
try {
searchWorkerFuture.cancel(true);
if (scheduledExecutorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS)) {
LOG.info("Successfully waited for the search worker to terminate");
} else {
LOG.warn("Search worker did not terminate in time, forcing termination");
scheduledExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for the search worker to terminate", e);
scheduledExecutorService.shutdownNow();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType = OpenSearchSourceConfiguration.class )
@DataPrepperPlugin(name="opensearch", pluginType = Source.class, pluginConfigurationType = OpenSearchSourceConfiguration.class)
public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordination {

private final AwsCredentialsSupplier awsCredentialsSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class OpenSearchSourceConfiguration {

@JsonProperty("scheduling")
@Valid
private SchedulingParameterConfiguration schedulingParameterConfiguration;
private SchedulingParameterConfiguration schedulingParameterConfiguration = new SchedulingParameterConfiguration();

@JsonProperty("search_options")
@Valid
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.cat.IndicesResponse;
import org.opensearch.client.opensearch.cat.indices.IndicesRecord;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.OpenSearchIndex;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.ClusterClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

public class OpenSearchIndexPartitionCreationSupplier implements Function<Map<String, Object>, List<PartitionIdentifier>> {

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchIndexPartitionCreationSupplier.class);

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final IndexParametersConfiguration indexParametersConfiguration;
private final OpenSearchClient openSearchClient;

public OpenSearchIndexPartitionCreationSupplier(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final ClusterClientFactory clusterClientFactory) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.indexParametersConfiguration = openSearchSourceConfiguration.getIndexParametersConfiguration();

final Object client = clusterClientFactory.getClient();

if (client instanceof OpenSearchClient) {
this.openSearchClient = (OpenSearchClient) client;
} else {
throw new IllegalArgumentException(String.format("ClusterClientFactory provided an invalid client object to the index partition creation supplier. " +
"The client must be of type OpenSearchClient. The client passed is of class %s", client.getClass()));
}

}

@Override
public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap) {

if (Objects.nonNull(openSearchClient)) {
return applyForOpenSearchClient(globalStateMap);
}

return Collections.emptyList();
}

private List<PartitionIdentifier> applyForOpenSearchClient(final Map<String, Object> globalStateMap) {
IndicesResponse indicesResponse;
try {
indicesResponse = openSearchClient.cat().indices();
} catch (IOException | OpenSearchException e) {
LOG.error("There was an exception when calling /_cat/indices to create new index partitions", e);
return Collections.emptyList();
}

return indicesResponse.valueBody().stream()
.filter(this::shouldIndexBeProcessed)
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
.collect(Collectors.toList());
}

private boolean shouldIndexBeProcessed(final IndicesRecord indicesRecord) {
if (Objects.isNull(indicesRecord.index())) {
return false;
}

if (Objects.isNull(indexParametersConfiguration)) {
return true;
}

final List<OpenSearchIndex> includedIndices = indexParametersConfiguration.getIncludedIndices();
final List<OpenSearchIndex> excludedIndices = indexParametersConfiguration.getExcludedIndices();

final boolean matchesIncludedPattern = includedIndices.isEmpty() || doesIndexMatchPattern(includedIndices, indicesRecord);
final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indicesRecord);


return matchesIncludedPattern && !matchesExcludePattern;
}

private boolean doesIndexMatchPattern(final List<OpenSearchIndex> indices, final IndicesRecord indicesRecord) {
for (final OpenSearchIndex index : indices) {
final Matcher matcher = index.getIndexNamePattern().matcher(indicesRecord.index());

if (matcher.matches()) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,63 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartition;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

/**
* PitWorker polls the source cluster via Point-In-Time contexts.
*/
public class PitWorker implements SearchWorker, Runnable {

private static final Logger LOG = LoggerFactory.getLogger(PitWorker.class);

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

private static final int STANDARD_BACKOFF_MILLIS = 30_000;

public PitWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) {
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.buffer = buffer;
this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
}

@Override
public void run() {
// todo: implement
while (!Thread.currentThread().isInterrupted()) {
final Optional<SourcePartition<OpenSearchIndexProgressState>> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier);

if (indexPartition.isEmpty()) {
try {
Thread.sleep(STANDARD_BACKOFF_MILLIS);
continue;
} catch (InterruptedException e) {
LOG.info("The PitWorker was interrupted while sleeping after acquiring no indices to process, stopping processing");
return;
}
}

// todo: process the index and write to the buffer

sourceCoordinator.closePartition(
indexPartition.get().getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ public class ScrollWorker implements SearchWorker {
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

public ScrollWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.sourceCoordinator = sourceCoordinator;
this.buffer = buffer;
this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.worker.client;

public interface ClusterClientFactory {
Object getClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;

public class ElasticsearchAccessor implements SearchAccessor {
public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory {
@Override
public SearchContextType getSearchContextType() {
// todo: implement
Expand Down Expand Up @@ -56,4 +56,9 @@ public SearchScrollResponse searchWithScroll(SearchScrollRequest searchScrollReq
public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
//todo: implement
}

@Override
public Object getClient() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we implement ClusterClientFactory here if we are returning null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ElasticSearch accessor is not implemented yet

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;

public class OpenSearchAccessor implements SearchAccessor {
public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory {

private final OpenSearchClient openSearchClient;
private final SearchContextType searchContextType;
Expand All @@ -27,7 +27,6 @@ public OpenSearchAccessor(final OpenSearchClient openSearchClient, final SearchC
this.searchContextType = searchContextType;
}


@Override
public SearchContextType getSearchContextType() {
return searchContextType;
Expand Down Expand Up @@ -66,4 +65,9 @@ public SearchScrollResponse searchWithScroll(SearchScrollRequest searchScrollReq
public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
//todo: implement
}

@Override
public Object getClient() {
return openSearchClient;
}
}
Loading