Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update expiration time based on ttl on the source coordinator item. #5262

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public Optional<SourcePartitionStoreItem> getAvailablePartition(final String own
final Duration ownershipTimeout,
final SourcePartitionStatus sourcePartitionStatus,
final String sourceStatusCombinationKey,
final int pageLimit) {
final int pageLimit,
final Duration ttl) {
try {

final DynamoDbIndex<DynamoDbSourcePartitionItem> sourceStatusIndex = table.index(SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX);
Expand Down Expand Up @@ -273,8 +274,11 @@ public Optional<SourcePartitionStoreItem> getAvailablePartition(final String own
item.setSourcePartitionStatus(SourcePartitionStatus.ASSIGNED);
item.setSourceStatusCombinationKey(String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, item.getSourceIdentifier(), SourcePartitionStatus.ASSIGNED));
item.setPartitionPriority(partitionOwnershipTimeout.toString());
final boolean acquired = this.tryAcquirePartitionItem(item);
if (Objects.nonNull(ttl)) {
item.setExpirationTime(Instant.now().plus(ttl).getEpochSecond());
}

final boolean acquired = this.tryAcquirePartitionItem(item);
if (acquired) {
return Optional.of(item);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,23 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier,
public Optional<SourcePartitionStoreItem> tryAcquireAvailablePartition(final String sourceIdentifier, final String ownerId, final Duration ownershipTimeout) {
final Optional<SourcePartitionStoreItem> acquiredAssignedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1);
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1, dynamoStoreSettings.getTtl());

if (acquiredAssignedItem.isPresent()) {
return acquiredAssignedItem;
}

final Optional<SourcePartitionStoreItem> acquiredUnassignedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5);
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5, dynamoStoreSettings.getTtl());

if (acquiredUnassignedItem.isPresent()) {
return acquiredUnassignedItem;
}

return dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1);
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1, dynamoStoreSettings.getTtl());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -464,8 +465,10 @@ void getAvailablePartition_with_no_items_from_query_returns_empty_optional(final

final int pageLimit = new Random().nextInt(20);

final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10);

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, pageLimit);
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, pageLimit, ttl);

assertThat(result.isEmpty(), equalTo(true));

Expand Down Expand Up @@ -513,8 +516,10 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina
final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest();
reflectivelySetField(objectUnderTest, "table", table);

final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10);

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl);

assertThat(result.isPresent(), equalTo(true));
assertThat(result.get(), equalTo(acquiredItem));
Expand All @@ -531,6 +536,10 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina

assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true));

final ArgumentCaptor<Long> expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class);
verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture());
assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond()));

verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString());
}

Expand Down Expand Up @@ -574,8 +583,10 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio
final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest();
reflectivelySetField(objectUnderTest, "table", table);

final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10);

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl);

assertThat(result.isPresent(), equalTo(true));
assertThat(result.get(), equalTo(acquiredItem));
Expand All @@ -593,6 +604,10 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio
assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true));

verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString());

final ArgumentCaptor<Long> expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class);
verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture());
assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond()));
}

@ParameterizedTest
Expand Down Expand Up @@ -635,8 +650,10 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi
final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest();
reflectivelySetField(objectUnderTest, "table", table);

final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10);

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl);

assertThat(result.isEmpty(), equalTo(true));

Expand All @@ -653,6 +670,10 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi
assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true));

verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString());

final ArgumentCaptor<Long> expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class);
verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture());
assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond()));
}

@Test
Expand Down Expand Up @@ -681,7 +702,7 @@ void getAvailablePartition_with_assigned_partition_with_unexpired_partitionOwner
reflectivelySetField(objectUnderTest, "table", table);

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, sourceStatusCombinationKey, new Random().nextInt(20), Duration.ofSeconds(new Random().nextInt()));

assertThat(result.isEmpty(), equalTo(true));

Expand Down Expand Up @@ -716,7 +737,7 @@ void getAvailablePartition_with_closed_partition_with_unreached_reOpenAt_time_re
reflectivelySetField(objectUnderTest, "table", table);

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, sourceStatusCombinationKey, new Random().nextInt(20), Duration.ofSeconds(new Random().nextInt()));

assertThat(result.isEmpty(), equalTo(true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,21 +245,23 @@ void getAvailablePartition_with_no_item_acquired_returns_empty_optional() {
final String ownerId = UUID.randomUUID().toString();
final String sourceIdentifier = UUID.randomUUID().toString();
final Duration ownershipTimeout = Duration.ofMinutes(2);
final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10);

given(dynamoStoreSettings.getTtl()).willReturn(ttl);
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
1, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
1))
1, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
5))
5, ttl))
.willReturn(Optional.empty());

final Optional<SourcePartitionStoreItem> result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout);
Expand All @@ -272,13 +274,14 @@ void getAvailablePartition_with_acquired_ASSIGNED_partition_returns_the_partitio
final String ownerId = UUID.randomUUID().toString();
final String sourceIdentifier = UUID.randomUUID().toString();
final Duration ownershipTimeout = Duration.ofMinutes(2);

final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10);
given(dynamoStoreSettings.getTtl()).willReturn(ttl);
final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class);

given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
1, ttl))
.willReturn(Optional.of(acquiredItem));

final Optional<SourcePartitionStoreItem> result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout);
Expand All @@ -294,23 +297,25 @@ void getAvailablePartition_with_acquired_CLOSED_partition_returns_the_partition(
final String ownerId = UUID.randomUUID().toString();
final String sourceIdentifier = UUID.randomUUID().toString();
final Duration ownershipTimeout = Duration.ofMinutes(2);
final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10);

given(dynamoStoreSettings.getTtl()).willReturn(ttl);
final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class);

given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
1, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
5))
5, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
1))
1, ttl))
.willReturn(Optional.of(acquiredItem));

final Optional<SourcePartitionStoreItem> result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout);
Expand All @@ -326,18 +331,20 @@ void getAvailablePartition_with_acquired_UNASSIGNED_partition_returns_the_partit
final String ownerId = UUID.randomUUID().toString();
final String sourceIdentifier = UUID.randomUUID().toString();
final Duration ownershipTimeout = Duration.ofMinutes(2);
final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10);

given(dynamoStoreSettings.getTtl()).willReturn(ttl);
final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class);

given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
1, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
5))
5, ttl))
.willReturn(Optional.of(acquiredItem));

final Optional<SourcePartitionStoreItem> result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout);
Expand Down
Loading