Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug so GLOBAL read-only items do not expire from TTL in ddb source coordination store #3703

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ boolean tryCreatePartitionItem(final String sourceIdentifier,
final String partitionKey,
final SourcePartitionStatus sourcePartitionStatus,
final Long closedCount,
final String partitionProgressState);
final String partitionProgressState,
final boolean isReadOnlyItem);

/**
* The following scenarios should qualify a partition as available to be acquired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
public void initialize() {
sourceCoordinationStore.initializeStore();
initialized = true;
sourceCoordinationStore.tryCreatePartitionItem(sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, SourcePartitionStatus.UNASSIGNED, 0L, null);
sourceCoordinationStore.tryCreatePartitionItem(sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, SourcePartitionStatus.UNASSIGNED, 0L, null, false);
}

@Override
Expand Down Expand Up @@ -195,7 +195,8 @@ private void createPartitions(final List<PartitionIdentifier> partitionIdentifie
partitionIdentifier.getPartitionKey(),
SourcePartitionStatus.UNASSIGNED,
0L,
null
null,
false
);

if (partitionCreated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ public <T> boolean createPartition(EnhancedSourcePartition<T> partition) {
// Don't need the status for Global state which is not for lease.
SourcePartitionStatus status = partition.getPartitionType() == null ? null : SourcePartitionStatus.UNASSIGNED;

boolean partitionCreated = coordinationStore.tryCreatePartitionItem(
return coordinationStore.tryCreatePartitionItem(
this.sourceIdentifier + "|" + partitionType,
partition.getPartitionKey(),
status,
0L,
partition.convertPartitionProgressStatetoString(partition.getProgressState())
);
return partitionCreated;
partition.convertPartitionProgressStatetoString(partition.getProgressState()),
// For now, global items with no partitionType will be considered ReadOnly, but this should be directly in EnhancedSourcePartition in the future
partition.getPartitionType() == null);

}

Expand Down Expand Up @@ -250,7 +250,7 @@ public Optional<EnhancedSourcePartition> getPartition(final String partitionKey)
// Default to Global State only.
final Optional<SourcePartitionStoreItem> sourceItem = coordinationStore.getSourcePartitionItem(this.sourceIdentifier + "|" + DEFAULT_GLOBAL_STATE_PARTITION_TYPE, partitionKey);
if (!sourceItem.isPresent()) {
LOG.error("Global state {} is not found.", partitionKey);
LOG.warn("Global partition item with sourcePartitionKey '{}' could not be found.", partitionKey);
return Optional.empty();
}
return Optional.of(partitionFactory.apply(sourceItem.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -163,7 +164,7 @@ private SourceCoordinator<String> createObjectUnderTest() {
doNothing().when(sourceCoordinationStore).initializeStore();
given(sourceCoordinationStore.tryCreatePartitionItem(
fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS,
SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true);
SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(true);
objectUnderTest.initialize();
return objectUnderTest;
}
Expand All @@ -175,7 +176,7 @@ void initialize_calls_initializeStore() {

verify(sourceCoordinationStore).initializeStore();
verify(sourceCoordinationStore).tryCreatePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS,
SourcePartitionStatus.UNASSIGNED, 0L, null);
SourcePartitionStatus.UNASSIGNED, 0L, null, false);
}

@Test
Expand All @@ -200,7 +201,7 @@ void getNextPartition_calls_supplier_and_creates_partition_with_existing_then_no
doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(globalStateForPartitionCreationItem);
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifierToSkip.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem));
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty());
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true);
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(true);

final Optional<SourcePartition<String>> result = createObjectUnderTest().getNextPartition(partitionCreationSupplier);

Expand Down Expand Up @@ -242,7 +243,7 @@ void getNextPartition_calls_supplier_which_returns_existing_partition_does_not_c

assertThat(result.isEmpty(), equalTo(true));

verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString());
verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString(), eq(false));

verify(partitionCreationSupplierInvocationsCounter).increment();
verify(noPartitionsAcquiredCounter).increment();
Expand Down Expand Up @@ -271,7 +272,7 @@ void getNextPartition_with_non_existing_item_and_create_attempt_fails_will_do_no
given(globalStateForPartitionCreationItem.getPartitionOwner()).willReturn(sourceIdentifierWithPartitionPrefix + ":" + InetAddress.getLocalHost().getHostName());
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS)).willReturn(Optional.of(globalStateForPartitionCreationItem));
given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty());
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(false);
given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(false);

final Optional<SourcePartition<String>> result = createObjectUnderTest().getNextPartition(partitionCreationSupplier);

Expand Down Expand Up @@ -381,7 +382,7 @@ void getNextPartition_with_no_active_partition_and_successful_tryAcquireAvailabl
verify(partitionManager).setActivePartition(result.get());
verify(sourceCoordinationStore, never()).getSourcePartitionItem(anyString(), anyString());
verify(sourceCoordinationStore, never()).tryUpdateSourcePartitionItem(any(SourcePartitionStoreItem.class));
verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString());
verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString(), eq(false));

verify(partitionsAcquiredCounter).increment();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ void test_createPartition() {
// A normal type.
TestEnhancedSourcePartition partition = new TestEnhancedSourcePartition(false);
coordinator.createPartition(partition);
verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE), anyString(), eq(SourcePartitionStatus.UNASSIGNED), anyLong(), eq(null));
verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE), anyString(), eq(SourcePartitionStatus.UNASSIGNED), anyLong(), eq(null), eq(false));

// GlobalState.
TestEnhancedSourcePartition globalState = new TestEnhancedSourcePartition(true);
coordinator.createPartition(globalState);
verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|GLOBAL"), anyString(), eq(null), anyLong(), eq(null));
verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|GLOBAL"), anyString(), eq(null), anyLong(), eq(null), eq(true));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier,
final String sourcePartitionKey,
final SourcePartitionStatus sourcePartitionStatus,
final Long closedCount,
final String partitionProgressState) {
final String partitionProgressState,
final boolean isReadOnlyItem) {
final DynamoDbSourcePartitionItem newPartitionItem = new DynamoDbSourcePartitionItem();

if (Objects.nonNull(dynamoStoreSettings.getTtl())) {
if (!isReadOnlyItem && Objects.nonNull(dynamoStoreSettings.getTtl())) {
newPartitionItem.setExpirationTime(Instant.now().plus(dynamoStoreSettings.getTtl()).getEpochSecond());
}
newPartitionItem.setSourceIdentifier(sourceIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
Expand All @@ -27,11 +29,13 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -94,8 +98,9 @@ void getSourcePartitionItem_calls_dynamoClientWrapper_correctly() {
assertThat(result.get(), equalTo(sourcePartitionStoreItem));
}

@Test
void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly(final boolean isReadOnlyItem) {
final String sourceIdentifier = UUID.randomUUID().toString();
final String sourcePartitionKey = UUID.randomUUID().toString();
final SourcePartitionStatus sourcePartitionStatus = SourcePartitionStatus.UNASSIGNED;
Expand All @@ -107,9 +112,9 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {

final Duration ttl = Duration.ofSeconds(30);
final Long nowPlusTtl = Instant.now().plus(ttl).getEpochSecond();
given(dynamoStoreSettings.getTtl()).willReturn(ttl);
lenient().when(dynamoStoreSettings.getTtl()).thenReturn(ttl);

final boolean result = createObjectUnderTest().tryCreatePartitionItem(sourceIdentifier, sourcePartitionKey, sourcePartitionStatus, closedCount, partitionProgressState);
final boolean result = createObjectUnderTest().tryCreatePartitionItem(sourceIdentifier, sourcePartitionKey, sourcePartitionStatus, closedCount, partitionProgressState, isReadOnlyItem);

assertThat(result, equalTo(true));

Expand All @@ -122,7 +127,12 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {
assertThat(createdItem.getPartitionProgressState(), equalTo(partitionProgressState));
assertThat(createdItem.getSourceStatusCombinationKey(), equalTo(sourceIdentifier + "|" + sourcePartitionStatus));
assertThat(createdItem.getPartitionPriority(), notNullValue());
assertThat(createdItem.getExpirationTime(), greaterThanOrEqualTo(nowPlusTtl));

if (isReadOnlyItem) {
assertThat(createdItem.getExpirationTime(), nullValue());
} else {
assertThat(createdItem.getExpirationTime(), greaterThanOrEqualTo(nowPlusTtl));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier,
final String partitionKey,
final SourcePartitionStatus sourcePartitionStatus,
final Long closedCount,
final String partitionProgressState) {
final String partitionProgressState,
final boolean isReadOnlyItem) {
synchronized (this) {
if (inMemoryPartitionAccessor.getItem(sourceIdentifier, partitionKey).isEmpty()) {
final InMemorySourcePartitionStoreItem inMemorySourcePartitionStoreItem = new InMemorySourcePartitionStoreItem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void tryCreatePartitionItem_for_item_that_exists_does_not_queuePartition_and_ret
given(inMemoryPartitionAccessor.getItem(sourceIdentifier, partitionKey)).willReturn(Optional.of(mock(SourcePartitionStoreItem.class)));
final InMemorySourceCoordinationStore objectUnderTest = createObjectUnderTest();

final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState);
final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState, false);
assertThat(created, equalTo(false));
verify(inMemoryPartitionAccessor, never()).queuePartition(any());
}
Expand All @@ -129,7 +129,7 @@ void tryCreatePartitionItem_for_item_that_does_not_exist_queues_that_partition()
doNothing().when(inMemoryPartitionAccessor).queuePartition(argumentCaptor.capture());
final InMemorySourceCoordinationStore objectUnderTest = createObjectUnderTest();

final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState);
final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState, false);
assertThat(created, equalTo(true));

final InMemorySourcePartitionStoreItem createdItem = argumentCaptor.getValue();
Expand Down
Loading