From c560e1f85a431d8c4fc7dddf425504f385861b5d Mon Sep 17 00:00:00 2001 From: Aiden Dai <68811299+daixba@users.noreply.github.com> Date: Wed, 1 Nov 2023 03:48:32 +0800 Subject: [PATCH] Add bug fixes and improvements to DDB source (#3559) Signed-off-by: Aiden Dai --- .../dynamodb-source/build.gradle | 1 + .../dynamodb/export/DataFileLoader.java | 60 ++++++--- .../export/DataFileLoaderFactory.java | 13 +- .../dynamodb/export/DataFileScheduler.java | 15 ++- .../source/dynamodb/model/ExportSummary.java | 4 +- .../source/dynamodb/model/LoadStatus.java | 20 +-- .../source/dynamodb/stream/ShardConsumer.java | 54 ++++++-- .../dynamodb/stream/ShardConsumerFactory.java | 12 +- .../dynamodb/stream/StreamCheckpointer.java | 5 +- .../dynamodb/export/DataFileLoaderTest.java | 117 ++++++++++++------ .../export/ManifestFileReaderTest.java | 2 +- .../dynamodb/stream/ShardConsumerTest.java | 72 ++++++++--- .../dynamodb/stream/StreamSchedulerTest.java | 5 +- 13 files changed, 255 insertions(+), 125 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/build.gradle b/data-prepper-plugins/dynamodb-source/build.gradle index d3a57de0e6..1fc14af6ea 100644 --- a/data-prepper-plugins/dynamodb-source/build.gradle +++ b/data-prepper-plugins/dynamodb-source/build.gradle @@ -31,6 +31,7 @@ dependencies { testImplementation platform('org.junit:junit-bom:5.9.1') testImplementation 'org.junit.jupiter:junit-jupiter' + testImplementation testLibs.mockito.inline testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java index 6d1d0aedd9..fcede4dba9 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java @@ -5,12 +5,21 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.export; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter; +import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.zip.GZIPInputStream; @@ -37,30 +46,37 @@ public class DataFileLoader implements Runnable { */ private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000; + static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); + static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; + private final String bucketName; private final String key; private final ExportRecordConverter recordConverter; - private final S3ObjectReader s3ObjectReader; + private final S3ObjectReader objectReader; private final DataFileCheckpointer checkpointer; - // Start Line is the checkpoint + /** + * Start Line is the checkpoint + 
*/ private final int startLine; private DataFileLoader(Builder builder) { - this.s3ObjectReader = builder.s3ObjectReader; - this.recordConverter = builder.recordConverter; + this.objectReader = builder.objectReader; this.bucketName = builder.bucketName; this.key = builder.key; this.checkpointer = builder.checkpointer; this.startLine = builder.startLine; + + final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); + recordConverter = new ExportRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics); } - public static Builder builder() { - return new Builder(); + public static Builder builder(final S3ObjectReader s3ObjectReader, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) { + return new Builder(s3ObjectReader, pluginMetrics, buffer); } @@ -69,9 +85,14 @@ public static Builder builder() { */ static class Builder { - private S3ObjectReader s3ObjectReader; + private final S3ObjectReader objectReader; + + private final PluginMetrics pluginMetrics; + + private final Buffer<Record<Event>> buffer; + + private TableInfo tableInfo; - private ExportRecordConverter recordConverter; private DataFileCheckpointer checkpointer; @@ -81,13 +102,14 @@ static class Builder { private int startLine; - public Builder s3ObjectReader(S3ObjectReader s3ObjectReader) { - this.s3ObjectReader = s3ObjectReader; - return this; + public Builder(final S3ObjectReader objectReader, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) { + this.objectReader = objectReader; + this.pluginMetrics = pluginMetrics; + this.buffer = buffer; } - public Builder recordConverter(ExportRecordConverter recordConverter) { - this.recordConverter = recordConverter; + public Builder tableInfo(TableInfo tableInfo) { + this.tableInfo = tableInfo; return this; } @@ -128,7 +150,9 @@ public void run() { int lineCount = 0; int lastLineProcessed = 0; - try (GZIPInputStream gzipInputStream = new GZIPInputStream(s3ObjectReader.readFile(bucketName, key))) { + try { + InputStream inputStream = objectReader.readFile(bucketName, key); + GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream)); String line; @@ -170,11 +194,15 @@ public void run() { } lines.clear(); + reader.close(); + gzipInputStream.close(); + inputStream.close(); LOG.info("Complete loading s3://{}/{}", bucketName, key); - } catch (Exception e) { + } catch (IOException e) { checkpointer.checkpoint(lineCount); - String errorMessage = String.format("Loading of s3://{}/{} completed with Exception: {}", bucketName, key, e.getMessage()); + + String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %s", bucketName, key, e.getMessage()); throw new RuntimeException(errorMessage); } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java index 9e2378e3df..c8e5318f19 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java @@ -5,25 +5,19 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.export; -import 
org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import software.amazon.awssdk.services.s3.S3Client; -import java.time.Duration; - /** * Factory class for DataFileLoader thread. */ public class DataFileLoaderFactory { - static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); - static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; private final EnhancedSourceCoordinator coordinator; @@ -39,17 +33,14 @@ public DataFileLoaderFactory(EnhancedSourceCoordinator coordinator, S3Client s3C } public Runnable createDataFileLoader(DataFilePartition dataFilePartition, TableInfo tableInfo) { - final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); - ExportRecordConverter recordProcessor = new ExportRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); DataFileCheckpointer checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition); // Start a data loader thread. - DataFileLoader loader = DataFileLoader.builder() - .s3ObjectReader(objectReader) + DataFileLoader loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer) .bucketName(dataFilePartition.getBucket()) .key(dataFilePartition.getKey()) - .recordConverter(recordProcessor) + .tableInfo(tableInfo) .checkpointer(checkpointer) .startLine(dataFilePartition.getProgressState().get().getLoaded()) .build(); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java index 91432f3dad..79b99bf817 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java @@ -63,8 +63,6 @@ public DataFileScheduler(EnhancedSourceCoordinator coordinator, DataFileLoaderFa this.coordinator = coordinator; this.pluginMetrics = pluginMetrics; this.loaderFactory = loaderFactory; - - executor = Executors.newFixedThreadPool(MAX_JOB_COUNT); this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT); @@ -76,7 +74,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) { String tableArn = getTableArn(exportArn); TableInfo tableInfo = getTableInfo(tableArn); - + Runnable loader = loaderFactory.createDataFileLoader(dataFilePartition, tableInfo); CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor); runLoader.whenComplete(completeDataLoader(dataFilePartition)); @@ -166,6 +164,17 @@ private BiConsumer<Void, Throwable> completeDataLoader(DataFilePartition dataFilePartition) { }; } + /** + * There is a global state, with the export ARN as its sourcePartitionKey, + * to track the number of files processed.
+ * Each time the load of a data file is completed, + * the state must be updated.
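+ * A rough sketch of one update attempt (the {@code LoadStatus} accessors below are from this
+ * package; {@code globalState.setProgressState} is illustrative, not the exact coordinator API):
+ * <pre>{@code
+ * LoadStatus loadStatus = LoadStatus.fromMap(globalState.getProgressState().get());
+ * loadStatus.setLoadedFiles(loadStatus.getLoadedFiles() + 1);
+ * loadStatus.setLoadedRecords(loadStatus.getLoadedRecords() + loaded);
+ * globalState.setProgressState(loadStatus.toMap());           // hypothetical setter
+ * coordinator.saveProgressStateForPartition(globalState);     // on conflict, re-read and retry
+ * }</pre>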
+ * Note that an update may fail when multiple threads update the same state concurrently; + * in that case the update must be retried. + * + * @param exportArn Export ARN. + * @param loaded Number of records loaded. + */ private void updateState(String exportArn, int loaded) { String streamArn = getStreamArn(exportArn); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ExportSummary.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ExportSummary.java index 881f59f605..578c5c7625 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ExportSummary.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ExportSummary.java @@ -57,7 +57,7 @@ public class ExportSummary { private long billedSizeBytes; @JsonProperty("itemCount") - private int itemCount; + private long itemCount; @JsonProperty("outputFormat") private String outputFormat; @@ -115,7 +115,7 @@ public long getBilledSizeBytes() { return billedSizeBytes; } - public int getItemCount() { + public long getItemCount() { return itemCount; } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/LoadStatus.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/LoadStatus.java index fd84c87e98..6493c70b51 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/LoadStatus.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/LoadStatus.java @@ -18,11 +18,11 @@ public class LoadStatus { private int loadedFiles; - private int totalRecords; + private long totalRecords; - private int loadedRecords; + private long loadedRecords; - public LoadStatus(int totalFiles, int loadedFiles, int totalRecords, int loadedRecords) { + public LoadStatus(int totalFiles, int loadedFiles, long totalRecords, long loadedRecords) { this.totalFiles = totalFiles; this.loadedFiles = loadedFiles; this.totalRecords = totalRecords; @@ -45,7 +45,7 @@ public void setLoadedFiles(int loadedFiles) { this.loadedFiles = loadedFiles; } - public int getTotalRecords() { + public long getTotalRecords() { return totalRecords; } @@ -53,11 +53,11 @@ public void setTotalRecords(int totalRecords) { this.totalRecords = totalRecords; } - public int getLoadedRecords() { + public long getLoadedRecords() { return loadedRecords; } - public void setLoadedRecords(int loadedRecords) { + public void setLoadedRecords(long loadedRecords) { this.loadedRecords = loadedRecords; } @@ -72,10 +72,10 @@ public Map<String, Object> toMap() { public static LoadStatus fromMap(Map<String, Object> map) { return new LoadStatus( - (int) map.get(TOTAL_FILES), - (int) map.get(LOADED_FILES), - (int) map.get(TOTAL_RECORDS), - (int) map.get(LOADED_RECORDS) + ((Number) map.get(TOTAL_FILES)).intValue(), + ((Number) map.get(LOADED_FILES)).intValue(), + ((Number) map.get(TOTAL_RECORDS)).longValue(), + ((Number) map.get(LOADED_RECORDS)).longValue() ); } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index ef47eb39ef..f80b145b8f 100644 --- 
a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -6,15 +6,21 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; +import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; -import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; +import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.stream.Collectors; @@ -46,11 +52,21 @@ public class ShardConsumer implements Runnable { */ private static final int DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS = 60_000; + + /** + * Default number of wait intervals in the wait-for-export loop after which a regular checkpoint is taken. + */ + private static final int DEFAULT_WAIT_COUNT_TO_CHECKPOINT = 5; + /** * Default regular checkpoint interval */ private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000; + static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); + static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; + + private final DynamoDbStreamsClient dynamoDbStreamsClient; private final StreamRecordConverter recordConverter; @@ -65,15 +81,17 @@ public class ShardConsumer implements Runnable { private ShardConsumer(Builder builder) { this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient; - this.recordConverter = builder.recordConverter; this.checkpointer = builder.checkpointer; this.shardIterator = builder.shardIterator; this.startTime = builder.startTime; this.waitForExport = builder.waitForExport; + + final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); + recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics); } - public static Builder builder(DynamoDbStreamsClient streamsClient) { - return new Builder(streamsClient); + public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) { + return new Builder(dynamoDbStreamsClient, pluginMetrics, buffer); } @@ -81,8 +99,11 @@ static class Builder { private final DynamoDbStreamsClient dynamoDbStreamsClient; + private final PluginMetrics pluginMetrics; + + private final Buffer<Record<Event>> buffer; - private StreamRecordConverter recordConverter; + private TableInfo tableInfo; private StreamCheckpointer checkpointer; @@ -93,12 +114,14 @@ static class Builder { private boolean waitForExport; - public Builder(DynamoDbStreamsClient dynamoDbStreamsClient) { + public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) { this.dynamoDbStreamsClient = dynamoDbStreamsClient; +
this.pluginMetrics = pluginMetrics; + this.buffer = buffer; } - public Builder recordConverter(StreamRecordConverter recordConverter) { - this.recordConverter = recordConverter; + public Builder tableInfo(TableInfo tableInfo) { + this.tableInfo = tableInfo; return this; } @@ -157,7 +180,7 @@ public void run() { .build(); - List records; + List records; GetRecordsResponse response; try { response = dynamoDbStreamsClient.getRecords(req); @@ -205,14 +228,27 @@ public void run() { checkpointer.checkpoint(sequenceNumber); throw new RuntimeException("Shard Consumer is interrupted"); } + + if (waitForExport) { + waitForExport(); + } } private void waitForExport() { LOG.debug("Start waiting for export to be done and loaded"); + int numberOfWaits = 0; while (!checkpointer.isExportDone()) { LOG.debug("Export is in progress, wait..."); try { Thread.sleep(DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS); + // The wait for export may take a long time + // Need to extend the timeout of the ownership in the coordination store. + // Otherwise, the lease will expire. + numberOfWaits++; + if (numberOfWaits % DEFAULT_WAIT_COUNT_TO_CHECKPOINT == 0) { + // To extend the timeout of lease + checkpointer.checkpoint(null); + } } catch (InterruptedException e) { LOG.error("Wait for export is interrupted ({})", e.getMessage()); // Directly quit the process diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java index ba311456b1..df605941e1 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java @@ -5,13 +5,11 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; @@ -21,7 +19,6 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; -import java.time.Duration; import java.time.Instant; import java.util.Optional; @@ -33,8 +30,6 @@ public class ShardConsumerFactory { private static final int STREAM_TO_TABLE_OFFSET = "stream/".length(); - static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); - static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; private final DynamoDbStreamsClient streamsClient; @@ -83,12 +78,9 @@ public Runnable createConsumer(StreamPartition streamPartition) { StreamCheckpointer checkpointer = new StreamCheckpointer(enhancedSourceCoordinator, streamPartition); String tableArn = getTableArn(streamPartition.getStreamArn()); 
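// A stream ARN has the form <table ARN>/stream/<label> (see STREAM_TO_TABLE_OFFSET above), so the table ARN is recovered by stripping the trailing stream segment.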
TableInfo tableInfo = getTableInfo(tableArn); - final BufferAccumulator> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); - StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); - - ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient) - .recordConverter(recordConverter) + ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, buffer) + .tableInfo(tableInfo) .checkpointer(checkpointer) .shardIterator(shardIter) .startTime(startTime) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java index ccbfd268f3..ae807c2ca1 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java @@ -32,15 +32,12 @@ public StreamCheckpointer(EnhancedSourceCoordinator coordinator, StreamPartition private void setSequenceNumber(String sequenceNumber) { // Must only update progress if sequence number is not empty - // A blank sequence number means the current sequence number in the progress state has not changed + // A blank sequence number means the current sequence number in the progress state has not changed, do nothing if (sequenceNumber != null && !sequenceNumber.isEmpty()) { Optional progressState = streamPartition.getProgressState(); if (progressState.isPresent()) { progressState.get().setSequenceNumber(sequenceNumber); - } else { - } - } } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java index 3f3d15d8cd..443e68f2fb 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java @@ -5,17 +5,24 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.export; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; -import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import 
org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState; +import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; +import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.services.s3.S3Client; @@ -26,26 +33,24 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.List; import java.util.Optional; import java.util.Random; -import java.util.StringJoiner; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.zip.GZIPOutputStream; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoader.BUFFER_TIMEOUT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoader.DEFAULT_BUFFER_BATCH_SIZE; @ExtendWith(MockitoExtension.class) -@Disabled class DataFileLoaderTest { @Mock @@ -54,19 +59,35 @@ class DataFileLoaderTest { @Mock private S3Client s3Client; + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Buffer> buffer; + + @Mock + private BufferAccumulator> bufferAccumulator; - private S3ObjectReader s3ObjectReader; @Mock - private ExportRecordConverter recordConverter; + private Counter testCounter; + + + private S3ObjectReader objectReader; private DataFileCheckpointer checkpointer; private DataFilePartition dataFilePartition; + private TableInfo tableInfo; + + private final String tableName = UUID.randomUUID().toString(); private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName; + private final String partitionKeyAttrName = "PK"; + private final String sortKeyAttrName = "SK"; + private final String manifestKey = UUID.randomUUID().toString(); private final String bucketName = UUID.randomUUID().toString(); private final String prefix = UUID.randomUUID().toString(); @@ -78,7 +99,7 @@ class DataFileLoaderTest { private final int total = random.nextInt(10); @BeforeEach - void setup() throws IOException { + void setup() throws Exception { DataFileProgressState state = new DataFileProgressState(); state.setLoaded(0); @@ -86,34 +107,50 @@ void setup() throws IOException { dataFilePartition = new DataFilePartition(exportArn, bucketName, manifestKey, Optional.of(state)); + TableMetadata metadata = TableMetadata.builder() + .exportRequired(true) + .streamRequired(true) + .partitionKeyAttributeName(partitionKeyAttrName) + .sortKeyAttributeName(sortKeyAttrName) + .build(); + tableInfo = new TableInfo(tableArn, metadata); + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(generateGzipInputStream(total)); - s3ObjectReader = new S3ObjectReader(s3Client); + objectReader = new S3ObjectReader(s3Client); 
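// The source coordinator is stubbed so that checkpoint and progress-state calls issued by the loader become no-ops in this test.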
lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class)); lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); - lenient().doNothing().when(recordConverter).writeToBuffer(any(List.class)); - + doNothing().when(bufferAccumulator).add(any(Record.class)); + doNothing().when(bufferAccumulator).flush(); checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition); + given(pluginMetrics.counter(anyString())).willReturn(testCounter); + } - private ResponseInputStream generateGzipInputStream(int numberOfRecords) throws IOException { + private ResponseInputStream generateGzipInputStream(int numberOfRecords) { - StringJoiner stringJoiner = new StringJoiner("\\n"); + StringBuilder sb = new StringBuilder(); for (int i = 0; i < numberOfRecords; i++) { - stringJoiner.add(UUID.randomUUID().toString()); + final String pk = UUID.randomUUID().toString(); + final String sk = UUID.randomUUID().toString(); + String line = " $ion_1_0 {Item:{PK:\"" + pk + "\",SK:\"" + sk + "\"}}"; + sb.append(line + "\n"); } - final String data = stringJoiner.toString(); + final String data = sb.toString(); final byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8); final ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); - final GZIPOutputStream gzipOut = new GZIPOutputStream(byteOut); - gzipOut.write(dataBytes, 0, dataBytes.length); - gzipOut.close(); + try (final GZIPOutputStream gzipOut = new GZIPOutputStream(byteOut)) { + gzipOut.write(dataBytes, 0, dataBytes.length); + } catch (IOException e) { + e.printStackTrace(); + } + final byte[] bites = byteOut.toByteArray(); final ByteArrayInputStream byteInStream = new ByteArrayInputStream(bites); @@ -126,28 +163,28 @@ private ResponseInputStream generateGzipInputStream(int numbe } @Test - void test_run_loadFile_correctly() throws InterruptedException { - - DataFileLoader loader = DataFileLoader.builder() - .bucketName(bucketName) - .key(manifestKey) - .s3ObjectReader(s3ObjectReader) - .recordConverter(recordConverter) - .checkpointer(checkpointer) - .build(); + void test_run_loadFile_correctly() throws Exception { + DataFileLoader loader; + try ( + final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) + ) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer) + .bucketName(bucketName) + .key(manifestKey) + .checkpointer(checkpointer) + .tableInfo(tableInfo) + .build(); + } - ExecutorService executorService = Executors.newSingleThreadExecutor(); - final Future future = executorService.submit(loader); - Thread.sleep(100); - executorService.shutdown(); - future.cancel(true); - assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true)); + loader.run(); // Should call s3 getObject verify(s3Client).getObject(any(GetObjectRequest.class)); // Should write to buffer - verify(recordConverter).writeToBuffer(any(List.class)); + verify(bufferAccumulator, times(total)).add(any(Record.class)); + verify(bufferAccumulator).flush(); // Should do one last checkpoint when done. 
verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class)); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReaderTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReaderTest.java index bfd3f2369d..4b93e2ef73 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReaderTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReaderTest.java @@ -50,7 +50,7 @@ void parseSummaryFile() { final String manifestFilesS3Key = UUID.randomUUID().toString(); final String outputFormat = "DYNAMODB_JSON"; long billedSizeBytes = random.nextLong(); - int itemCount = random.nextInt(10000); + long itemCount = random.nextLong(); String summaryData = String.format("{\"version\":\"%s\",\"exportArn\": \"%s\",\"startTime\":\"%s\",\"endTime\":\"%s\",\"tableArn\":\"%s\",\"tableId\":\"%s\",\"exportTime\":\"%s\",\"s3Bucket\":\"%s\",\"s3Prefix\":\"%s\",\"s3SseAlgorithm\":\"%s\",\"s3SseKmsKeyId\":null,\"manifestFilesS3Key\":\"%s\",\"billedSizeBytes\":%d,\"itemCount\":%d,\"outputFormat\":\"%s\"}", diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index 87bc68ce7d..417787becc 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -5,17 +5,23 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; -import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; +import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; @@ -30,12 +36,20 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.UUID; import static 
org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.BUFFER_TIMEOUT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.DEFAULT_BUFFER_BATCH_SIZE; @ExtendWith(MockitoExtension.class) class ShardConsumerTest { @@ -46,15 +60,27 @@ class ShardConsumerTest { private DynamoDbStreamsClient dynamoDbStreamsClient; @Mock - private StreamRecordConverter recordConverter; + private PluginMetrics pluginMetrics; + + @Mock + private Buffer> buffer; + + @Mock + private BufferAccumulator> bufferAccumulator; @Mock private GlobalState tableInfoGlobalState; + @Mock + private Counter testCounter; + + private StreamCheckpointer checkpointer; private StreamPartition streamPartition; + private TableInfo tableInfo; + private final String tableName = UUID.randomUUID().toString(); private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName; @@ -62,16 +88,19 @@ class ShardConsumerTest { private final String partitionKeyAttrName = "PK"; private final String sortKeyAttrName = "SK"; - private final String exportArn = tableArn + "/export/01693291918297-bfeccbea"; private final String streamArn = tableArn + "/stream/2023-09-14T05:46:45.367"; private final String shardId = "shardId-" + UUID.randomUUID(); private final String shardIterator = UUID.randomUUID().toString(); + private final Random random = new Random(); + + private final int total = random.nextInt(10); + @BeforeEach - void setup() { + void setup() throws Exception { StreamProgressState state = new StreamProgressState(); state.setWaitForExport(false); @@ -87,6 +116,7 @@ void setup() { .partitionKeyAttributeName(partitionKeyAttrName) .sortKeyAttributeName(sortKeyAttrName) .build(); + tableInfo = new TableInfo(tableArn, metadata); lenient().when(tableInfoGlobalState.getProgressState()).thenReturn(Optional.of(metadata.toMap())); lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); @@ -94,34 +124,46 @@ void setup() { lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class)); lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); + doNothing().when(bufferAccumulator).add(any(org.opensearch.dataprepper.model.record.Record.class)); + doNothing().when(bufferAccumulator).flush(); + checkpointer = new StreamCheckpointer(coordinator, streamPartition); - List records = buildRecords(10); + List records = buildRecords(total); GetRecordsResponse response = GetRecordsResponse.builder() .records(records) .nextShardIterator(null) .build(); when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenReturn(response); + + given(pluginMetrics.counter(anyString())).willReturn(testCounter); } @Test - void test_run_shardConsumer_correctly() { - - ShardConsumer shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient) - .shardIterator(shardIterator) - .checkpointer(checkpointer) - .recordConverter(recordConverter) - .startTime(null) - .waitForExport(false) - .build(); + void test_run_shardConsumer_correctly() throws Exception { + ShardConsumer 
shardConsumer; + try ( + final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) + ) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, buffer) + .shardIterator(shardIterator) + .checkpointer(checkpointer) + .tableInfo(tableInfo) + .startTime(null) + .waitForExport(false) + .build(); + } + shardConsumer.run(); // Should call GetRecords verify(dynamoDbStreamsClient).getRecords(any(GetRecordsRequest.class)); // Should write to buffer - verify(recordConverter).writeToBuffer(any(List.class)); + verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class)); + verify(bufferAccumulator).flush(); // Should complete the consumer as reach to end of shard verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class)); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java index 586a3c188a..a551a6052f 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java @@ -6,7 +6,6 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -103,7 +102,6 @@ void setup() { @Test - @Disabled public void test_normal_run() throws InterruptedException { given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition)).willReturn(Optional.empty()); @@ -112,10 +110,9 @@ public void test_normal_run() throws InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); final Future future = executorService.submit(() -> scheduler.run()); - Thread.sleep(100); + Thread.sleep(2000); executorService.shutdown(); future.cancel(true); - assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true)); // Should acquire the stream partition verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE);