Skip to content

Commit

Permalink
Fix bug so GLOBAL read-only items do not expire from TTL in ddb sourc…
Browse files Browse the repository at this point in the history
…e coordination store (#3703)

Fix bug so GLOBAL read-only items do not expire from TTL in ddb source coordination store

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Nov 28, 2023
1 parent 6878f56 commit c8548a0
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ boolean tryCreatePartitionItem(final String sourceIdentifier,
final String partitionKey,
final SourcePartitionStatus sourcePartitionStatus,
final Long closedCount,
final String partitionProgressState);
final String partitionProgressState,
final boolean isReadOnlyItem);

/**
* The following scenarios should qualify a partition as available to be acquired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
public void initialize() {
sourceCoordinationStore.initializeStore();
initialized = true;
sourceCoordinationStore.tryCreatePartitionItem(sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, SourcePartitionStatus.UNASSIGNED, 0L, null);
sourceCoordinationStore.tryCreatePartitionItem(sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, SourcePartitionStatus.UNASSIGNED, 0L, null, false);
}

@Override
Expand Down Expand Up @@ -195,7 +195,8 @@ private void createPartitions(final List<PartitionIdentifier> partitionIdentifie
partitionIdentifier.getPartitionKey(),
SourcePartitionStatus.UNASSIGNED,
0L,
null
null,
false
);

if (partitionCreated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ public <T> boolean createPartition(EnhancedSourcePartition<T> partition) {
// Don't need the status for Global state which is not for lease.
SourcePartitionStatus status = partition.getPartitionType() == null ? null : SourcePartitionStatus.UNASSIGNED;

boolean partitionCreated = coordinationStore.tryCreatePartitionItem(
return coordinationStore.tryCreatePartitionItem(
this.sourceIdentifier + "|" + partitionType,
partition.getPartitionKey(),
status,
0L,
partition.convertPartitionProgressStatetoString(partition.getProgressState())
);
return partitionCreated;
partition.convertPartitionProgressStatetoString(partition.getProgressState()),
// For now, global items with no partitionType will be considered ReadOnly, but this should be directly in EnhancedSourcePartition in the future
partition.getPartitionType() == null);

}

Expand Down Expand Up @@ -250,7 +250,7 @@ public Optional<EnhancedSourcePartition> getPartition(final String partitionKey)
// Default to Global State only.
final Optional<SourcePartitionStoreItem> sourceItem = coordinationStore.getSourcePartitionItem(this.sourceIdentifier + "|" + DEFAULT_GLOBAL_STATE_PARTITION_TYPE, partitionKey);
if (!sourceItem.isPresent()) {
LOG.error("Global state {} is not found.", partitionKey);
LOG.warn("Global partition item with sourcePartitionKey '{}' could not be found.", partitionKey);
return Optional.empty();
}
return Optional.of(partitionFactory.apply(sourceItem.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -163,7 +164,7 @@ private SourceCoordinator<String> createObjectUnderTest() {
doNothing().when(sourceCoordinationStore).initializeStore();
given(sourceCoordinationStore.tryCreatePartitionItem(
fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS,
SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true);
SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(true);
objectUnderTest.initialize();
return objectUnderTest;
}
Expand All @@ -175,7 +176,7 @@ void initialize_calls_initializeStore() {

verify(sourceCoordinationStore).initializeStore();
verify(sourceCoordinationStore).tryCreatePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS,
SourcePartitionStatus.UNASSIGNED, 0L, null);
SourcePartitionStatus.UNASSIGNED, 0L, null, false);
}

@Test
Expand All @@ -200,7 +201,7 @@ void getNextPartition_calls_supplier_and_creates_partition_with_existing_then_no
doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(globalStateForPartitionCreationItem);
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifierToSkip.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem));
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty());
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true);
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(true);

final Optional<SourcePartition<String>> result = createObjectUnderTest().getNextPartition(partitionCreationSupplier);

Expand Down Expand Up @@ -242,7 +243,7 @@ void getNextPartition_calls_supplier_which_returns_existing_partition_does_not_c

assertThat(result.isEmpty(), equalTo(true));

verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString());
verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString(), eq(false));

verify(partitionCreationSupplierInvocationsCounter).increment();
verify(noPartitionsAcquiredCounter).increment();
Expand Down Expand Up @@ -271,7 +272,7 @@ void getNextPartition_with_non_existing_item_and_create_attempt_fails_will_do_no
given(globalStateForPartitionCreationItem.getPartitionOwner()).willReturn(sourceIdentifierWithPartitionPrefix + ":" + InetAddress.getLocalHost().getHostName());
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS)).willReturn(Optional.of(globalStateForPartitionCreationItem));
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty());
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(false);
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(false);

final Optional<SourcePartition<String>> result = createObjectUnderTest().getNextPartition(partitionCreationSupplier);

Expand Down Expand Up @@ -381,7 +382,7 @@ void getNextPartition_with_no_active_partition_and_successful_tryAcquireAvailabl
verify(partitionManager).setActivePartition(result.get());
verify(sourceCoordinationStore, never()).getSourcePartitionItem(anyString(), anyString());
verify(sourceCoordinationStore, never()).tryUpdateSourcePartitionItem(any(SourcePartitionStoreItem.class));
verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString());
verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString(), eq(false));

verify(partitionsAcquiredCounter).increment();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ void test_createPartition() {
// A normal type.
TestEnhancedSourcePartition partition = new TestEnhancedSourcePartition(false);
coordinator.createPartition(partition);
verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE), anyString(), eq(SourcePartitionStatus.UNASSIGNED), anyLong(), eq(null));
verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE), anyString(), eq(SourcePartitionStatus.UNASSIGNED), anyLong(), eq(null), eq(false));

// GlobalState.
TestEnhancedSourcePartition globalState = new TestEnhancedSourcePartition(true);
coordinator.createPartition(globalState);
verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|GLOBAL"), anyString(), eq(null), anyLong(), eq(null));
verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|GLOBAL"), anyString(), eq(null), anyLong(), eq(null), eq(true));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier,
final String sourcePartitionKey,
final SourcePartitionStatus sourcePartitionStatus,
final Long closedCount,
final String partitionProgressState) {
final String partitionProgressState,
final boolean isReadOnlyItem) {
final DynamoDbSourcePartitionItem newPartitionItem = new DynamoDbSourcePartitionItem();

if (Objects.nonNull(dynamoStoreSettings.getTtl())) {
if (!isReadOnlyItem && Objects.nonNull(dynamoStoreSettings.getTtl())) {
newPartitionItem.setExpirationTime(Instant.now().plus(dynamoStoreSettings.getTtl()).getEpochSecond());
}
newPartitionItem.setSourceIdentifier(sourceIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
Expand All @@ -27,11 +29,13 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -94,8 +98,9 @@ void getSourcePartitionItem_calls_dynamoClientWrapper_correctly() {
assertThat(result.get(), equalTo(sourcePartitionStoreItem));
}

@Test
void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly(final boolean isReadOnlyItem) {
final String sourceIdentifier = UUID.randomUUID().toString();
final String sourcePartitionKey = UUID.randomUUID().toString();
final SourcePartitionStatus sourcePartitionStatus = SourcePartitionStatus.UNASSIGNED;
Expand All @@ -107,9 +112,9 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {

final Duration ttl = Duration.ofSeconds(30);
final Long nowPlusTtl = Instant.now().plus(ttl).getEpochSecond();
given(dynamoStoreSettings.getTtl()).willReturn(ttl);
lenient().when(dynamoStoreSettings.getTtl()).thenReturn(ttl);

final boolean result = createObjectUnderTest().tryCreatePartitionItem(sourceIdentifier, sourcePartitionKey, sourcePartitionStatus, closedCount, partitionProgressState);
final boolean result = createObjectUnderTest().tryCreatePartitionItem(sourceIdentifier, sourcePartitionKey, sourcePartitionStatus, closedCount, partitionProgressState, isReadOnlyItem);

assertThat(result, equalTo(true));

Expand All @@ -122,7 +127,12 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {
assertThat(createdItem.getPartitionProgressState(), equalTo(partitionProgressState));
assertThat(createdItem.getSourceStatusCombinationKey(), equalTo(sourceIdentifier + "|" + sourcePartitionStatus));
assertThat(createdItem.getPartitionPriority(), notNullValue());
assertThat(createdItem.getExpirationTime(), greaterThanOrEqualTo(nowPlusTtl));

if (isReadOnlyItem) {
assertThat(createdItem.getExpirationTime(), nullValue());
} else {
assertThat(createdItem.getExpirationTime(), greaterThanOrEqualTo(nowPlusTtl));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier,
final String partitionKey,
final SourcePartitionStatus sourcePartitionStatus,
final Long closedCount,
final String partitionProgressState) {
final String partitionProgressState,
final boolean isReadOnlyItem) {
synchronized (this) {
if (inMemoryPartitionAccessor.getItem(sourceIdentifier, partitionKey).isEmpty()) {
final InMemorySourcePartitionStoreItem inMemorySourcePartitionStoreItem = new InMemorySourcePartitionStoreItem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void tryCreatePartitionItem_for_item_that_exists_does_not_queuePartition_and_ret
given(inMemoryPartitionAccessor.getItem(sourceIdentifier, partitionKey)).willReturn(Optional.of(mock(SourcePartitionStoreItem.class)));
final InMemorySourceCoordinationStore objectUnderTest = createObjectUnderTest();

final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState);
final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState, false);
assertThat(created, equalTo(false));
verify(inMemoryPartitionAccessor, never()).queuePartition(any());
}
Expand All @@ -129,7 +129,7 @@ void tryCreatePartitionItem_for_item_that_does_not_exist_queues_that_partition()
doNothing().when(inMemoryPartitionAccessor).queuePartition(argumentCaptor.capture());
final InMemorySourceCoordinationStore objectUnderTest = createObjectUnderTest();

final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState);
final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState, false);
assertThat(created, equalTo(true));

final InMemorySourcePartitionStoreItem createdItem = argumentCaptor.getValue();
Expand Down

0 comments on commit c8548a0

Please sign in to comment.