Skip to content

Commit

Permalink
ENH: docdb credential auto refreshment (opensearch-project#4399)
Browse files Browse the repository at this point in the history
* ENH: docdb credential refreshment

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored and srikanthjg committed Apr 24, 2024
1 parent ad1b173 commit 9f01409
Show file tree
Hide file tree
Showing 14 changed files with 715 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class ExportProgressState {
@JsonProperty("status")
private String status;

@JsonProperty("lastEndDocId")
private Object lastEndDocId;


public String getDatabaseName() {
return databaseName;
Expand Down Expand Up @@ -53,4 +56,12 @@ public String getStatus() {
public void setStatus(String status) {
this.status = status;
}

public Object getLastEndDocId() {
return lastEndDocId;
}

public void setLastEndDocId(Object lastEndDocId) {
this.lastEndDocId = lastEndDocId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.export.MongoDBExportPartitionSupplier;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.mongo.export.ExportWorker;
import org.opensearch.dataprepper.plugins.mongo.leader.LeaderScheduler;
import org.opensearch.dataprepper.plugins.mongo.s3partition.S3PartitionCreatorScheduler;
import org.opensearch.dataprepper.plugins.mongo.stream.StreamScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,17 +27,18 @@ public class DocumentDBService {
private final PluginMetrics pluginMetrics;
private final MongoDBSourceConfig sourceConfig;
private final AcknowledgementSetManager acknowledgementSetManager;
private ExecutorService executor;
private final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier;
private final PluginConfigObservable pluginConfigObservable;
private ExecutorService leaderExecutor;
public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
final MongoDBSourceConfig sourceConfig,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final PluginConfigObservable pluginConfigObservable) {
this.sourceCoordinator = sourceCoordinator;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceConfig = sourceConfig;
this.mongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(sourceConfig);
this.pluginConfigObservable = pluginConfigObservable;
}

/**
Expand All @@ -52,40 +50,34 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
*/
public void start(Buffer<Record<Event>> buffer) {
final List<Runnable> runnableList = new ArrayList<>();

final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
runnableList.add(leaderScheduler);

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExport)) {
final ExportScheduler exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics);
final ExportWorker exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
runnableList.add(exportScheduler);
runnableList.add(exportWorker);
}

final List<String> collections = sourceConfig.getCollections().stream().map(CollectionConfig::getCollection).collect(Collectors.toList());
if (!collections.isEmpty()) {
final S3PartitionCreatorScheduler s3PartitionCreatorScheduler = new S3PartitionCreatorScheduler(sourceCoordinator, collections);
runnableList.add(s3PartitionCreatorScheduler);
}
leaderExecutor = Executors.newFixedThreadPool(runnableList.size(),
BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source"));
runnableList.forEach(leaderExecutor::submit);

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) {
final StreamScheduler streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
runnableList.add(streamScheduler);
}

executor = Executors.newFixedThreadPool(runnableList.size(), BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source"));
runnableList.forEach(executor::submit);
final MongoTasksRefresher mongoTasksRefresher = new MongoTasksRefresher(
buffer, sourceCoordinator, pluginMetrics, acknowledgementSetManager,
numThread -> Executors.newFixedThreadPool(
numThread, BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source")));
mongoTasksRefresher.initialize(sourceConfig);
pluginConfigObservable.addPluginConfigObserver(
pluginConfig -> mongoTasksRefresher.update((MongoDBSourceConfig) pluginConfig));
}

/**
* Interrupt the running of schedulers.
* Each scheduler must implement logic for gracefully shutdown.
*/
public void shutdown() {
if (executor != null) {
if (leaderExecutor != null) {
LOG.info("shutdown DocumentDB Service scheduler and worker");
executor.shutdownNow();
leaderExecutor.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
Expand All @@ -29,17 +30,20 @@ public class DocumentDBSource implements Source<Record<Event>>, UsesEnhancedSour

private final PluginMetrics pluginMetrics;
private final MongoDBSourceConfig sourceConfig;
private final PluginConfigObservable pluginConfigObservable;
private EnhancedSourceCoordinator sourceCoordinator;
private final AcknowledgementSetManager acknowledgementSetManager;
private DocumentDBService documentDBService;

@DataPrepperPluginConstructor
public DocumentDBSource(final PluginMetrics pluginMetrics,
final MongoDBSourceConfig sourceConfig,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final PluginConfigObservable pluginConfigObservable) {
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginConfigObservable = pluginConfigObservable;
}

@Override
Expand All @@ -48,7 +52,7 @@ public void start(final Buffer<Record<Event>> buffer) {
sourceCoordinator.createPartition(new LeaderPartition());

documentDBService = new DocumentDBService(sourceCoordinator, sourceConfig, pluginMetrics,
acknowledgementSetManager);
acknowledgementSetManager, pluginConfigObservable);

LOG.info("Start DocumentDB service");
documentDBService.start(buffer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.mongo.documentdb;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginConfigObserver;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.mongo.export.ExportWorker;
import org.opensearch.dataprepper.plugins.mongo.export.MongoDBExportPartitionSupplier;
import org.opensearch.dataprepper.plugins.mongo.stream.StreamScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

public class MongoTasksRefresher implements PluginConfigObserver<MongoDBSourceConfig> {
private static final Logger LOG = LoggerFactory.getLogger(MongoTasksRefresher.class);
static final String CREDENTIALS_CHANGED = "credentialsChanged";
static final String EXECUTOR_REFRESH_ERRORS = "executorRefreshErrors";
private final EnhancedSourceCoordinator sourceCoordinator;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private final Buffer<Record<Event>> buffer;
private final Function<Integer, ExecutorService> executorServiceFunction;
private final Counter credentialsChangeCounter;
private final Counter executorRefreshErrorsCounter;
private MongoDBExportPartitionSupplier currentMongoDBExportPartitionSupplier;
private MongoDBSourceConfig currentMongoDBSourceConfig;
private ExecutorService currentExecutor;

public MongoTasksRefresher(final Buffer<Record<Event>> buffer,
final EnhancedSourceCoordinator sourceCoordinator,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager,
final Function<Integer, ExecutorService> executorServiceFunction) {
this.sourceCoordinator = sourceCoordinator;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.buffer = buffer;
this.executorServiceFunction = executorServiceFunction;
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
executorRefreshErrorsCounter = pluginMetrics.counter(EXECUTOR_REFRESH_ERRORS);
}

public void initialize(final MongoDBSourceConfig sourceConfig) {
this.currentMongoDBSourceConfig = sourceConfig;
refreshJobs(sourceConfig);
}

@Override
public void update(MongoDBSourceConfig pluginConfig) {
final MongoDBSourceConfig.AuthenticationConfig newAuthConfig = pluginConfig.getCredentialsConfig();
if (basicAuthChanged(newAuthConfig)) {
credentialsChangeCounter.increment();
try {
currentExecutor.shutdownNow();
refreshJobs(pluginConfig);
currentMongoDBSourceConfig = pluginConfig;
} catch (Exception e) {
executorRefreshErrorsCounter.increment();
LOG.error("Refreshing executor failed.", e);
}
}
}

private void refreshJobs(MongoDBSourceConfig pluginConfig) {
final List<Runnable> runnables = new ArrayList<>();
if (pluginConfig.getCollections().stream().anyMatch(CollectionConfig::isExport)) {
currentMongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(pluginConfig);
runnables.add(new ExportScheduler(sourceCoordinator, currentMongoDBExportPartitionSupplier, pluginMetrics));
runnables.add(new ExportWorker(
sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, pluginConfig));
}
if (pluginConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) {
runnables.add(new StreamScheduler(
sourceCoordinator, buffer, acknowledgementSetManager, pluginConfig, pluginMetrics));
}
this.currentExecutor = executorServiceFunction.apply(runnables.size());
runnables.forEach(currentExecutor::submit);
}

private boolean basicAuthChanged(final MongoDBSourceConfig.AuthenticationConfig newAuthConfig) {
final MongoDBSourceConfig.AuthenticationConfig currentAuthConfig = currentMongoDBSourceConfig
.getCredentialsConfig();
return !Objects.equals(currentAuthConfig.getUsername(), newAuthConfig.getUsername()) ||
!Objects.equals(currentAuthConfig.getPassword(), newAuthConfig.getPassword());
}
}
Loading

0 comments on commit 9f01409

Please sign in to comment.