diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java index 38f5c4cf93..14c0a30e7e 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; public class DocumentDBService { private static final Logger LOG = LoggerFactory.getLogger(DocumentDBService.class); @@ -62,9 +63,13 @@ public void start(Buffer> buffer) { runnableList.add(exportWorker); } - if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) { - final S3PartitionCreatorScheduler s3PartitionCreatorScheduler = new S3PartitionCreatorScheduler(sourceCoordinator); + final List collections = sourceConfig.getCollections().stream().map(CollectionConfig::getCollection).collect(Collectors.toList()); + if (!collections.isEmpty()) { + final S3PartitionCreatorScheduler s3PartitionCreatorScheduler = new S3PartitionCreatorScheduler(sourceCoordinator, collections); runnableList.add(s3PartitionCreatorScheduler); + } + + if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) { final StreamScheduler streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics); runnableList.add(streamScheduler); } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/DataQueryPartitionCheckpoint.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/DataQueryPartitionCheckpoint.java index 87259b7195..fa810b1a3d 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/DataQueryPartitionCheckpoint.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/DataQueryPartitionCheckpoint.java @@ -31,7 +31,7 @@ public class DataQueryPartitionCheckpoint extends S3FolderPartitionCoordinator { public DataQueryPartitionCheckpoint(EnhancedSourceCoordinator enhancedSourceCoordinator, DataQueryPartition dataQueryPartition) { - super(enhancedSourceCoordinator, dataQueryPartition.getCollection()); + super(enhancedSourceCoordinator); this.enhancedSourceCoordinator = enhancedSourceCoordinator; this.dataQueryPartition = dataQueryPartition; } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java index 301fdae6b8..b8f455f048 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java @@ -101,8 +101,8 @@ public ExportPartitionWorker(final RecordBufferWriter recordBufferWriter, this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); } - private boolean shouldWaitForS3Partition() { - s3PartitionStatus = partitionCheckpoint.getGlobalS3FolderCreationStatus(); + private boolean shouldWaitForS3Partition(final String collection) { + s3PartitionStatus = partitionCheckpoint.getGlobalS3FolderCreationStatus(collection); return s3PartitionStatus.isEmpty(); } @@ -120,7 +120,7 @@ public void run() { throw new RuntimeException("Invalid Collection Name. Must as db.collection format"); } long lastCheckpointTime = System.currentTimeMillis(); - while (shouldWaitForS3Partition() && !Thread.currentThread().isInterrupted()) { + while (shouldWaitForS3Partition(dataQueryPartition.getCollection()) && !Thread.currentThread().isInterrupted()) { LOG.info("S3 partition was not complete for collection {}, waiting for partitions to be created before resuming export.", dataQueryPartition.getCollection()); try { Thread.sleep(DEFAULT_PARTITION_CREATE_WAIT_INTERVAL_MILLIS); diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3FolderPartitionCoordinator.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3FolderPartitionCoordinator.java index eb1a66562f..3b51c4fee3 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3FolderPartitionCoordinator.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3FolderPartitionCoordinator.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.mongo.model.S3PartitionStatus; import java.util.Optional; @@ -16,19 +17,16 @@ */ public class S3FolderPartitionCoordinator { private final EnhancedSourceCoordinator enhancedSourceCoordinator; - private final String collection; - - public S3FolderPartitionCoordinator(final EnhancedSourceCoordinator enhancedSourceCoordinator, final String collection) { + public S3FolderPartitionCoordinator(final EnhancedSourceCoordinator enhancedSourceCoordinator) { this.enhancedSourceCoordinator = enhancedSourceCoordinator; - this.collection = collection; } - public Optional getGlobalS3FolderCreationStatus() { + public Optional getGlobalS3FolderCreationStatus(final String collection) { final Optional partition = enhancedSourceCoordinator.getPartition(S3PartitionCreatorScheduler.S3_FOLDER_PREFIX + collection); if(partition.isPresent()) { final GlobalState globalState = (GlobalState)partition.get(); - return Optional.of(org.opensearch.dataprepper.plugins.mongo.model.S3PartitionStatus.fromMap(globalState.getProgressState().get())); + return Optional.of(S3PartitionStatus.fromMap(globalState.getProgressState().get())); } else { return Optional.empty(); } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3PartitionCreator.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3PartitionCreator.java index d05ce12962..70df3d7232 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3PartitionCreator.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3PartitionCreator.java @@ -2,48 +2,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; import java.util.ArrayList; import java.util.List; public class S3PartitionCreator { private static final Logger LOG = LoggerFactory.getLogger(S3PartitionCreator.class); - final String bucketName; - final String subFolder; - final String region; - final S3Client s3Client; + private final int partitionSize; - S3PartitionCreator(final String bucketName, final String subFolder, final String region) { - this.bucketName = bucketName; - this.subFolder = subFolder; - this.region = region; - this.s3Client = S3Client.builder().region(Region.of(region)).build(); + S3PartitionCreator(final int partitionSize) { + this.partitionSize = partitionSize; } List createPartition() { final List partitions = new ArrayList<>(); - for (int i = 0; i < 256; i++) { - String folderName = String.format("%02x", i) + "/"; - String key = subFolder + "/" + folderName; - createPartition(key); - partitions.add(folderName); + for (int i = 0; i < partitionSize; i++) { + String partitionName = String.format("%02x", i) + "/"; + partitions.add(partitionName); } LOG.info("S3 partition created successfully."); return partitions; } - - private void createPartition(final String key) { - try { - s3Client.putObject(PutObjectRequest.builder() - .bucket(bucketName) - .key(key) - .build(), RequestBody.empty()); - } catch (final Exception e) { - LOG.error("Error creating partition {}", key, e); - throw new RuntimeException(e); - } - } } \ No newline at end of file diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3PartitionCreatorScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3PartitionCreatorScheduler.java index 22b9a57420..fcf0ed26be 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3PartitionCreatorScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/s3partition/S3PartitionCreatorScheduler.java @@ -8,16 +8,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -public class S3PartitionCreatorScheduler implements Runnable { +public class S3PartitionCreatorScheduler extends S3FolderPartitionCoordinator implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(S3PartitionCreatorScheduler.class); public static final String S3_FOLDER_PREFIX = "S3-FOLDER-"; private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000; + private static final int DEFAULT_S3_PARTITION_SIZE = 50; private final EnhancedSourceCoordinator sourceCoordinator; - public S3PartitionCreatorScheduler(final EnhancedSourceCoordinator sourceCoordinator) { + private final List collections; + public S3PartitionCreatorScheduler(final EnhancedSourceCoordinator sourceCoordinator, + final List collections) { + super(sourceCoordinator); this.sourceCoordinator = sourceCoordinator; + this.collections = new ArrayList<>(collections); } @Override @@ -27,10 +33,11 @@ public void run() { final Optional sourcePartition = sourceCoordinator.acquireAvailablePartition(S3FolderPartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { final S3FolderPartition s3FolderPartition = (S3FolderPartition) sourcePartition.get(); - final List s3Folders = createS3BucketPartitions(s3FolderPartition); + final List s3Folders = createS3BucketPartitions(); sourceCoordinator.completePartition(s3FolderPartition); final S3PartitionStatus s3PartitionStatus = new S3PartitionStatus(s3Folders); sourceCoordinator.createPartition(new GlobalState(S3_FOLDER_PREFIX + s3FolderPartition.getCollection(), s3PartitionStatus.toMap())); + break; } try { @@ -39,6 +46,19 @@ public void run() { LOG.info("The S3 partition creator scheduler was interrupted while waiting to retry, stopping processing"); break; } + + collections.forEach(collection -> { + final Optional s3PartitionStatus = getGlobalS3FolderCreationStatus(collection); + if (s3PartitionStatus.isPresent()) { + collections.remove(collection); + } + }); + + if (collections.isEmpty()) { + LOG.info("The S3 folder partition global state created for all collections."); + break; + } + } catch (final Exception e) { LOG.error("Received an exception during creation of S3 partition folder, backing off and retrying", e); try { @@ -52,9 +72,8 @@ public void run() { LOG.warn("S3 partition creator scheduler interrupted, looks like shutdown has triggered"); } - private List createS3BucketPartitions(final S3FolderPartition s3FolderPartition) { - final S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(s3FolderPartition.getBucketName(), s3FolderPartition.getSubFolder(), - s3FolderPartition.getRegion()); + private List createS3BucketPartitions() { + final S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(DEFAULT_S3_PARTITION_SIZE); return s3PartitionCreator.createPartition(); } } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java index 6e17c34ddc..5726468ae6 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java @@ -38,7 +38,7 @@ public class DataStreamPartitionCheckpoint extends S3FolderPartitionCoordinator public DataStreamPartitionCheckpoint(final EnhancedSourceCoordinator enhancedSourceCoordinator, final StreamPartition streamPartition) { - super(enhancedSourceCoordinator, streamPartition.getCollection()); + super(enhancedSourceCoordinator); this.enhancedSourceCoordinator = enhancedSourceCoordinator; this.streamPartition = streamPartition; } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index da25596d97..bd5d494a3f 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -125,8 +125,8 @@ private boolean shouldWaitForExport(final StreamPartition streamPartition) { return progressState.shouldWaitForExport() && loadStatus.isEmpty(); } - private boolean shouldWaitForS3Partition() { - s3PartitionStatus = partitionCheckpoint.getGlobalS3FolderCreationStatus(); + private boolean shouldWaitForS3Partition(final String collection) { + s3PartitionStatus = partitionCheckpoint.getGlobalS3FolderCreationStatus(collection); return s3PartitionStatus.isEmpty(); } @@ -148,7 +148,7 @@ public void processStream(final StreamPartition streamPartition) { MongoCollection collection = database.getCollection(collectionDBNameList.get(1)); try (MongoCursor> cursor = getChangeStreamCursor(collection, resumeToken.orElse(null))) { - while ((shouldWaitForExport(streamPartition) || shouldWaitForS3Partition()) && !Thread.currentThread().isInterrupted()) { + while ((shouldWaitForExport(streamPartition) || shouldWaitForS3Partition(streamPartition.getCollection())) && !Thread.currentThread().isInterrupted()) { LOG.info("Initial load not complete for collection {}, waiting for initial lo be complete before resuming streams.", collectionDbName); try { Thread.sleep(DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java index 16f175705a..630684e7a8 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java @@ -136,11 +136,13 @@ public void testProcessPartitionSuccess(final String partitionKey) { lenient().when(dataQueryPartition.getPartitionKey()).thenReturn(partitionKey); lenient().when(sourceCoordinator.acquireAvailablePartition(DataQueryPartition.PARTITION_TYPE)) .thenReturn(Optional.of(dataQueryPartition)); + final String collection = partitionKey.split("\\|")[0]; + when(dataQueryPartition.getCollection()).thenReturn(collection); S3PartitionStatus s3PartitionStatus = mock(S3PartitionStatus.class); final List partitions = List.of("first", "second"); when(s3PartitionStatus.getPartitions()).thenReturn(partitions); - when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus()).thenReturn(Optional.of(s3PartitionStatus)); + when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); final Future future = executorService.submit(() -> { try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 0a87349830..9be8fda3ca 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -151,7 +151,7 @@ void test_processStream_success() { S3PartitionStatus s3PartitionStatus = mock(S3PartitionStatus.class); final List partitions = List.of("first", "second"); when(s3PartitionStatus.getPartitions()).thenReturn(partitions); - when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus()).thenReturn(Optional.of(s3PartitionStatus)); + when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) @@ -160,7 +160,7 @@ void test_processStream_success() { } verify(mongoClient).close(); verify(mongoDatabase).getCollection(eq("collection")); - verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(); + verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection); verify(mockRecordConverter).initializePartitions(partitions); verify(mockRecordConverter).convert(eq(doc1Json1), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1)); verify(mockRecordConverter).convert(eq(doc1Json2), eq(timeSecond2 * 1000L), eq(timeSecond2 * 1000L), eq(operationType2)); @@ -249,7 +249,7 @@ void test_processStream_checkPointIntervalSuccess() { S3PartitionStatus s3PartitionStatus = mock(S3PartitionStatus.class); final List partitions = List.of("first", "second"); when(s3PartitionStatus.getPartitions()).thenReturn(partitions); - when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus()).thenReturn(Optional.of(s3PartitionStatus)); + when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) @@ -261,7 +261,7 @@ void test_processStream_checkPointIntervalSuccess() { verify(mongoDatabase).getCollection(eq("collection")); verify(cursor).close(); verify(cursor, times(4)).hasNext(); - verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(); + verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection); verify(mockPartitionCheckpoint).checkpoint(resumeToken3, 3); verify(successItemsCounter).increment(1); verify(mockPartitionCheckpoint).checkpoint(resumeToken2, 2); @@ -288,7 +288,7 @@ void test_processStream_stopWorker() { when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); when(changeStreamIterable.iterator()).thenReturn(cursor); S3PartitionStatus s3PartitionStatus = mock(S3PartitionStatus.class); - when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus()).thenReturn(Optional.of(s3PartitionStatus)); + when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); final List partitions = List.of("first", "second"); when(s3PartitionStatus.getPartitions()).thenReturn(partitions); final ExecutorService executorService = Executors.newSingleThreadExecutor();