diff --git a/data-prepper-plugins/dynamodb-source/README.md b/data-prepper-plugins/dynamodb-source/README.md index 49a6371e86..7b3369fc5e 100644 --- a/data-prepper-plugins/dynamodb-source/README.md +++ b/data-prepper-plugins/dynamodb-source/README.md @@ -2,4 +2,4 @@ This source ingests data to Data Prepper from DynamoDB -See the [`dynamodb` source documentation](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/dynamo-db/) \ No newline at end of file +See the [`dynamodb` source documentation](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/dynamo-db/) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index 04da7cdaa1..367505a936 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -165,7 +165,7 @@ private boolean isLastModifiedTimeAfterMostRecentScanForBucket(final String buck final Instant lastProcessedObjectTimestamp = Instant.parse((String) globalStateMap.get(bucketName)); - return s3Object.lastModified().compareTo(lastProcessedObjectTimestamp) > 0; + return s3Object.lastModified().compareTo(lastProcessedObjectTimestamp.minusSeconds(1)) >= 0; } private Instant getMostRecentLastModifiedTimestamp(final ListObjectsV2Response listObjectsV2Response, diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index 77a110528c..a0ece6d988 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -204,7 +204,7 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final S3Object invalidForFirstBucketSuffixObject = mock(S3Object.class); given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); - given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now()); + given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now().minusSeconds(2)); s3ObjectsList.add(invalidForFirstBucketSuffixObject); expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); @@ -223,7 +223,9 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final List expectedPartitionIdentifiersSecondScan = new ArrayList<>(); expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); final List secondScanObjects = new ArrayList<>(s3ObjectsList); secondScanObjects.add(secondScanObject);