From c4a3f73016a4e6ac64752ad2a2eaf203d9e29b97 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Wed, 14 Feb 2024 13:31:29 -0600 Subject: [PATCH] Fix bug where s3 scan could skip when lastModifiedTimestamps are the same (#4124) (#4127) Signed-off-by: Taylor Gray --- .../dynamodb-source/README.md | 84 +------------------ .../s3/S3ScanPartitionCreationSupplier.java | 2 +- .../S3ScanPartitionCreationSupplierTest.java | 4 +- 3 files changed, 6 insertions(+), 84 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/README.md b/data-prepper-plugins/dynamodb-source/README.md index db620a6406..7b3369fc5e 100644 --- a/data-prepper-plugins/dynamodb-source/README.md +++ b/data-prepper-plugins/dynamodb-source/README.md @@ -1,85 +1,5 @@ # DynamoDB Source -This is a source plugin that supports retrieve data from DynamoDB tables. Basic use case of this source plugin is to -sync the data from DynamoDB tables to OpenSearch indexes. With this CDC support, customer can run the end to end data -sync pipeline and capture changed data in near real-time without writing any codes and without any downtime of business. -Such pipeline can run on multiple nodes in parallel to support data capture of large scale tables. +This source ingests data to Data Prepper from DynamoDB -This plugin can support below three different modes: - -1. Full load only: One time full data export and load -2. CDC Only: DynamoDB Stream -3. Full load + CDC: One time full export and load + DynamoDB Stream. - -## Usages - -To get started with this DynamoDB source, create the following source configuration: - -```yaml -source: - dynamodb: - tables: - - table_arn: "arn:aws:dynamodb:us-west-2:123456789012:table/my-table" - stream: - start_position: - export: - s3_bucket: "my-bucket" - s3_prefix: "export/" - aws: - region: "us-west-2" - sts_role_arn: "arn:aws:iam::123456789012:role/DataPrepperRole" - - coordinator: - dynamodb: - table_name: "coordinator-demo" - region: "us-west-2" - - -``` - -## Configurations - -### Shared Configurations: - -* coordinator (Required): Coordination store setting. This design create a custom coordinator based on existing - coordination store implementation. Only DynamoDB is tested so far. -* aws (Required): High level AWS Auth. Note Data Prepper will use the same AWS auth to access all tables, check - Security for more details. - * region - * sts_role_arn - -### Export Configurations: - -* s3_bucket (Required): The destination bucket to store the exported data files -* s3_prefix (Optional): Custom prefix. -* s3_sse_kms_key_id (Optional): A AWS KMS Customer Managed Key (CMK) to encrypt the export data files. The key id will - be the ARN of the Key, e.g. arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad4-80ca-63b12b3ec147 - -### Stream Configurations - -* start_position (Optional): start position of the stream, can be either TRIM_HORIZON or LATEST. If export is required, - this value will be ignored and set to LATEST by default. This is useful if customer don’t want to run initial export, - so they can - choose either from the beginning of the stream (up to 24 hours) or from latest (from the time point when pipeline is - started) - -## Metrics - -### Counter - -- `exportJobsSuccess`: measures total number of export jobs run with status completed. -- `exportJobsErrors`: measures total number of export jobs cannot be submitted or run with status failed. -- `exportFilesTotal`: measures total number of export files generated. -- `exportFilesSuccess`: measures total number of export files read (till the last line) successfully. -- `exportRecordsTotal`: measures total number of export records generated -- `exportRecordsSuccess`: measures total number of export records processed successfully . -- `exportRecordsErrors`: measures total number of export records processed failed -- `changeEventsSucceeded`: measures total number of changed events in total processed successfully -- `changeEventsFailed`: measures total number of changed events in total processed failed - -## Developer Guide - -This plugin is compatible with Java 17. See - -- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) -- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) +See the [`dynamodb` source documentation](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/dynamo-db/) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index 04da7cdaa1..367505a936 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -165,7 +165,7 @@ private boolean isLastModifiedTimeAfterMostRecentScanForBucket(final String buck final Instant lastProcessedObjectTimestamp = Instant.parse((String) globalStateMap.get(bucketName)); - return s3Object.lastModified().compareTo(lastProcessedObjectTimestamp) > 0; + return s3Object.lastModified().compareTo(lastProcessedObjectTimestamp.minusSeconds(1)) >= 0; } private Instant getMostRecentLastModifiedTimestamp(final ListObjectsV2Response listObjectsV2Response, diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index 77a110528c..a0ece6d988 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -204,7 +204,7 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final S3Object invalidForFirstBucketSuffixObject = mock(S3Object.class); given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); - given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now()); + given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now().minusSeconds(2)); s3ObjectsList.add(invalidForFirstBucketSuffixObject); expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); @@ -223,7 +223,9 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final List expectedPartitionIdentifiersSecondScan = new ArrayList<>(); expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); final List secondScanObjects = new ArrayList<>(s3ObjectsList); secondScanObjects.add(secondScanObject);