diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java index 13d8d00a03..26bfb5e8d8 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java @@ -39,7 +39,8 @@ boolean tryCreatePartitionItem(final String sourceIdentifier, final String partitionKey, final SourcePartitionStatus sourcePartitionStatus, final Long closedCount, - final String partitionProgressState); + final String partitionProgressState, + final boolean isReadOnlyItem); /** * The following scenarios should qualify a partition as available to be acquired diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java index be282d0ca5..272cdbb21c 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -134,7 +134,7 @@ public LeaseBasedSourceCoordinator(final Class partitionProgressStateClass, public void initialize() { sourceCoordinationStore.initializeStore(); initialized = true; - sourceCoordinationStore.tryCreatePartitionItem(sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, SourcePartitionStatus.UNASSIGNED, 0L, null); + sourceCoordinationStore.tryCreatePartitionItem(sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, SourcePartitionStatus.UNASSIGNED, 0L, null, false); } @Override @@ -195,7 +195,8 @@ private void createPartitions(final List partitionIdentifie partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, - null + null, + false ); if (partitionCreated) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java index 619d8ca3e5..1dcd2ebe42 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java @@ -101,14 +101,14 @@ public boolean createPartition(EnhancedSourcePartition partition) { // Don't need the status for Global state which is not for lease. SourcePartitionStatus status = partition.getPartitionType() == null ? null : SourcePartitionStatus.UNASSIGNED; - boolean partitionCreated = coordinationStore.tryCreatePartitionItem( + return coordinationStore.tryCreatePartitionItem( this.sourceIdentifier + "|" + partitionType, partition.getPartitionKey(), status, 0L, - partition.convertPartitionProgressStatetoString(partition.getProgressState()) - ); - return partitionCreated; + partition.convertPartitionProgressStatetoString(partition.getProgressState()), + // For now, global items with no partitionType will be considered ReadOnly, but this should be directly in EnhancedSourcePartition in the future + partition.getPartitionType() == null); } @@ -250,7 +250,7 @@ public Optional getPartition(final String partitionKey) // Default to Global State only. final Optional sourceItem = coordinationStore.getSourcePartitionItem(this.sourceIdentifier + "|" + DEFAULT_GLOBAL_STATE_PARTITION_TYPE, partitionKey); if (!sourceItem.isPresent()) { - LOG.error("Global state {} is not found.", partitionKey); + LOG.warn("Global partition item with sourcePartitionKey '{}' could not be found.", partitionKey); return Optional.empty(); } return Optional.of(partitionFactory.apply(sourceItem.get())); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java index 2a95299593..8984f7cfc0 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java @@ -46,6 +46,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; @@ -163,7 +164,7 @@ private SourceCoordinator createObjectUnderTest() { doNothing().when(sourceCoordinationStore).initializeStore(); given(sourceCoordinationStore.tryCreatePartitionItem( fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, - SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true); + SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(true); objectUnderTest.initialize(); return objectUnderTest; } @@ -175,7 +176,7 @@ void initialize_calls_initializeStore() { verify(sourceCoordinationStore).initializeStore(); verify(sourceCoordinationStore).tryCreatePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, - SourcePartitionStatus.UNASSIGNED, 0L, null); + SourcePartitionStatus.UNASSIGNED, 0L, null, false); } @Test @@ -200,7 +201,7 @@ void getNextPartition_calls_supplier_and_creates_partition_with_existing_then_no doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(globalStateForPartitionCreationItem); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifierToSkip.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem)); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty()); - given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true); + given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(true); final Optional> result = createObjectUnderTest().getNextPartition(partitionCreationSupplier); @@ -242,7 +243,7 @@ void getNextPartition_calls_supplier_which_returns_existing_partition_does_not_c assertThat(result.isEmpty(), equalTo(true)); - verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString()); + verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString(), eq(false)); verify(partitionCreationSupplierInvocationsCounter).increment(); verify(noPartitionsAcquiredCounter).increment(); @@ -271,7 +272,7 @@ void getNextPartition_with_non_existing_item_and_create_attempt_fails_will_do_no given(globalStateForPartitionCreationItem.getPartitionOwner()).willReturn(sourceIdentifierWithPartitionPrefix + ":" + InetAddress.getLocalHost().getHostName()); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS)).willReturn(Optional.of(globalStateForPartitionCreationItem)); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty()); - given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(false); + given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(false); final Optional> result = createObjectUnderTest().getNextPartition(partitionCreationSupplier); @@ -381,7 +382,7 @@ void getNextPartition_with_no_active_partition_and_successful_tryAcquireAvailabl verify(partitionManager).setActivePartition(result.get()); verify(sourceCoordinationStore, never()).getSourcePartitionItem(anyString(), anyString()); verify(sourceCoordinationStore, never()).tryUpdateSourcePartitionItem(any(SourcePartitionStoreItem.class)); - verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString()); + verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString(), eq(false)); verify(partitionsAcquiredCounter).increment(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java index 6a1122ee56..6608811945 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java @@ -121,12 +121,12 @@ void test_createPartition() { // A normal type. TestEnhancedSourcePartition partition = new TestEnhancedSourcePartition(false); coordinator.createPartition(partition); - verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE), anyString(), eq(SourcePartitionStatus.UNASSIGNED), anyLong(), eq(null)); + verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE), anyString(), eq(SourcePartitionStatus.UNASSIGNED), anyLong(), eq(null), eq(false)); // GlobalState. TestEnhancedSourcePartition globalState = new TestEnhancedSourcePartition(true); coordinator.createPartition(globalState); - verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|GLOBAL"), anyString(), eq(null), anyLong(), eq(null)); + verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|GLOBAL"), anyString(), eq(null), anyLong(), eq(null), eq(true)); } diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java index d8f6cde81f..2654cf5110 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java @@ -72,10 +72,11 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier, final String sourcePartitionKey, final SourcePartitionStatus sourcePartitionStatus, final Long closedCount, - final String partitionProgressState) { + final String partitionProgressState, + final boolean isReadOnlyItem) { final DynamoDbSourcePartitionItem newPartitionItem = new DynamoDbSourcePartitionItem(); - if (Objects.nonNull(dynamoStoreSettings.getTtl())) { + if (!isReadOnlyItem && Objects.nonNull(dynamoStoreSettings.getTtl())) { newPartitionItem.setExpirationTime(Instant.now().plus(dynamoStoreSettings.getTtl()).getEpochSecond()); } newPartitionItem.setSourceIdentifier(sourceIdentifier); diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java index 5003b17150..3c15376a5d 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java @@ -8,6 +8,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockedStatic; @@ -27,11 +29,13 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -94,8 +98,9 @@ void getSourcePartitionItem_calls_dynamoClientWrapper_correctly() { assertThat(result.get(), equalTo(sourcePartitionStoreItem)); } - @Test - void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly(final boolean isReadOnlyItem) { final String sourceIdentifier = UUID.randomUUID().toString(); final String sourcePartitionKey = UUID.randomUUID().toString(); final SourcePartitionStatus sourcePartitionStatus = SourcePartitionStatus.UNASSIGNED; @@ -107,9 +112,9 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() { final Duration ttl = Duration.ofSeconds(30); final Long nowPlusTtl = Instant.now().plus(ttl).getEpochSecond(); - given(dynamoStoreSettings.getTtl()).willReturn(ttl); + lenient().when(dynamoStoreSettings.getTtl()).thenReturn(ttl); - final boolean result = createObjectUnderTest().tryCreatePartitionItem(sourceIdentifier, sourcePartitionKey, sourcePartitionStatus, closedCount, partitionProgressState); + final boolean result = createObjectUnderTest().tryCreatePartitionItem(sourceIdentifier, sourcePartitionKey, sourcePartitionStatus, closedCount, partitionProgressState, isReadOnlyItem); assertThat(result, equalTo(true)); @@ -122,7 +127,12 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() { assertThat(createdItem.getPartitionProgressState(), equalTo(partitionProgressState)); assertThat(createdItem.getSourceStatusCombinationKey(), equalTo(sourceIdentifier + "|" + sourcePartitionStatus)); assertThat(createdItem.getPartitionPriority(), notNullValue()); - assertThat(createdItem.getExpirationTime(), greaterThanOrEqualTo(nowPlusTtl)); + + if (isReadOnlyItem) { + assertThat(createdItem.getExpirationTime(), nullValue()); + } else { + assertThat(createdItem.getExpirationTime(), greaterThanOrEqualTo(nowPlusTtl)); + } } @Test diff --git a/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStore.java b/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStore.java index 0ea8c601a9..16cfb48a6d 100644 --- a/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStore.java +++ b/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStore.java @@ -68,7 +68,8 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier, final String partitionKey, final SourcePartitionStatus sourcePartitionStatus, final Long closedCount, - final String partitionProgressState) { + final String partitionProgressState, + final boolean isReadOnlyItem) { synchronized (this) { if (inMemoryPartitionAccessor.getItem(sourceIdentifier, partitionKey).isEmpty()) { final InMemorySourcePartitionStoreItem inMemorySourcePartitionStoreItem = new InMemorySourcePartitionStoreItem(); diff --git a/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStoreTest.java b/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStoreTest.java index ac1b19ad25..bbc6a88224 100644 --- a/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStoreTest.java +++ b/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStoreTest.java @@ -109,7 +109,7 @@ void tryCreatePartitionItem_for_item_that_exists_does_not_queuePartition_and_ret given(inMemoryPartitionAccessor.getItem(sourceIdentifier, partitionKey)).willReturn(Optional.of(mock(SourcePartitionStoreItem.class))); final InMemorySourceCoordinationStore objectUnderTest = createObjectUnderTest(); - final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState); + final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState, false); assertThat(created, equalTo(false)); verify(inMemoryPartitionAccessor, never()).queuePartition(any()); } @@ -129,7 +129,7 @@ void tryCreatePartitionItem_for_item_that_does_not_exist_queues_that_partition() doNothing().when(inMemoryPartitionAccessor).queuePartition(argumentCaptor.capture()); final InMemorySourceCoordinationStore objectUnderTest = createObjectUnderTest(); - final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState); + final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState, false); assertThat(created, equalTo(true)); final InMemorySourcePartitionStoreItem createdItem = argumentCaptor.getValue();