From c30dc2dc0e46f697f8c3624c0130d7710d49d7d6 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Thu, 12 Dec 2024 18:13:28 -0800 Subject: [PATCH] Fix the unit tests Signed-off-by: Souvik Bose --- .../dynamodb/DynamoDbClientWrapperTest.java | 12 ++++++++---- .../DynamoDbSourceCoordinationStoreTest.java | 5 +++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java index d64913e862..561733762e 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java @@ -62,7 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -518,6 +518,8 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina final Duration ttl = Duration.ofSeconds(new Random().nextInt()); + final Long expectedExpirationTime = Instant.now().plus(ttl).getEpochSecond(); + final Optional result = objectUnderTest.getAvailablePartition( ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl); @@ -538,7 +540,7 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina final ArgumentCaptor expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class); verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture()); - assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond())); + assertThat(expiryTimeArgumentCaptor.getValue(), greaterThanOrEqualTo(expectedExpirationTime)); verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString()); } @@ -584,6 +586,7 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio reflectivelySetField(objectUnderTest, "table", table); final Duration ttl = Duration.ofSeconds(new Random().nextInt()); + final Long expectedExpirationTime = Instant.now().plus(ttl).getEpochSecond(); final Optional result = objectUnderTest.getAvailablePartition( ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl); @@ -607,7 +610,7 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio final ArgumentCaptor expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class); verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture()); - assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond())); + assertThat(expiryTimeArgumentCaptor.getValue(), greaterThanOrEqualTo(expectedExpirationTime)); } @ParameterizedTest @@ -651,6 +654,7 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi reflectivelySetField(objectUnderTest, "table", table); final Duration ttl = Duration.ofSeconds(new Random().nextInt()); + final Long expectedExpirationTime = Instant.now().plus(ttl).getEpochSecond(); final Optional result = objectUnderTest.getAvailablePartition( ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl); @@ -673,7 +677,7 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi final ArgumentCaptor expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class); verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture()); - assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond())); + assertThat(expiryTimeArgumentCaptor.getValue(), greaterThanOrEqualTo(expectedExpirationTime)); } @Test diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java index 77ffb49c6b..717c35a7f5 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java @@ -247,6 +247,8 @@ void getAvailablePartition_with_no_item_acquired_returns_empty_optional() { final Duration ownershipTimeout = Duration.ofMinutes(2); final Duration ttl = Duration.ofSeconds(new Random().nextInt()); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); + given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), @@ -275,6 +277,7 @@ void getAvailablePartition_with_acquired_ASSIGNED_partition_returns_the_partitio final Duration ownershipTimeout = Duration.ofMinutes(2); final Duration ttl = Duration.ofSeconds(new Random().nextInt()); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, @@ -298,6 +301,7 @@ void getAvailablePartition_with_acquired_CLOSED_partition_returns_the_partition( final Duration ownershipTimeout = Duration.ofMinutes(2); final Duration ttl = Duration.ofSeconds(new Random().nextInt()); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, @@ -330,6 +334,7 @@ void getAvailablePartition_with_acquired_UNASSIGNED_partition_returns_the_partit final String sourceIdentifier = UUID.randomUUID().toString(); final Duration ownershipTimeout = Duration.ofMinutes(2); final Duration ttl = Duration.ofSeconds(new Random().nextInt()); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class);