diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java index fd35167d02a58..3fc0432e7738e 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java @@ -17,17 +17,12 @@ package org.apache.kafka.server.log.remote.metadata.storage; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.Assertions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; @@ -37,10 +32,9 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; public class RemoteLogMetadataManagerTestUtils { - private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataManagerTestUtils.class); - private static final int METADATA_TOPIC_PARTITIONS_COUNT = 3; private static final short METADATA_TOPIC_REPLICATION_FACTOR = 2; private static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L; @@ -53,7 +47,6 @@ public static class Builder { private String bootstrapServers; private boolean startConsumerThread; private Map overrideRemoteLogMetadataManagerProps = Collections.emptyMap(); - private Set topicIdPartitions = Collections.emptySet(); private Supplier remotePartitionMetadataStore = RemotePartitionMetadataStore::new; private Function remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new; @@ -85,11 +78,6 @@ public Builder overrideRemoteLogMetadataManagerProps(Map overrid return this; } - public Builder topicIdPartitions(Set topicIdPartitions) { - this.topicIdPartitions = Objects.requireNonNull(topicIdPartitions); - return this; - } - public TopicBasedRemoteLogMetadataManager build() { Objects.requireNonNull(bootstrapServers); String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); @@ -105,19 +93,12 @@ public TopicBasedRemoteLogMetadataManager build() { configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, METADATA_TOPIC_PARTITIONS_COUNT); configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, METADATA_TOPIC_REPLICATION_FACTOR); configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, METADATA_TOPIC_RETENTION_MS); - - log.debug("TopicBasedRemoteLogMetadataManager configs before adding overridden properties: {}", configs); // Add override properties. configs.putAll(overrideRemoteLogMetadataManagerProps); - log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs); topicBasedRemoteLogMetadataManager.configure(configs); - - Assertions.assertDoesNotThrow(() -> TestUtils.waitForCondition(topicBasedRemoteLogMetadataManager::isInitialized, 60_000L, + assertDoesNotThrow(() -> TestUtils.waitForCondition(topicBasedRemoteLogMetadataManager::isInitialized, 60_000L, "Time out reached before it is initialized successfully")); - - topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions, Collections.emptySet()); - return topicBasedRemoteLogMetadataManager; } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleManager.java deleted file mode 100644 index b887059515295..0000000000000 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleManager.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.server.log.remote.metadata.storage; - -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; -import org.apache.kafka.server.log.remote.storage.RemoteStorageException; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; -import java.util.Optional; - -/** - * This interface defines the lifecycle methods for {@code RemoteLogSegmentMetadata}. {@link RemoteLogSegmentLifecycleTest} tests - * different implementations of this interface. This is responsible for managing all the segments for a given {@code topicIdPartition} - * registered with {@link #initialize(TopicIdPartition)}. - * - * @see org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentLifecycleTest.RemoteLogMetadataCacheWrapper - * @see org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentLifecycleTest.TopicBasedRemoteLogMetadataManagerWrapper - */ -public interface RemoteLogSegmentLifecycleManager extends Closeable { - - /** - * Initialize the resources for this instance and register the given {@code topicIdPartition}. - * - * @param topicIdPartition topic partition to be registered with this instance. - */ - default void initialize(TopicIdPartition topicIdPartition) { - } - - @Override - default void close() throws IOException { - } - - void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) throws RemoteStorageException; - - void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) throws RemoteStorageException; - - Optional highestOffsetForEpoch(int epoch) throws RemoteStorageException; - - Optional remoteLogSegmentMetadata(int leaderEpoch, - long offset) throws RemoteStorageException; - - Iterator listRemoteLogSegments(int leaderEpoch) throws RemoteStorageException; - - Iterator listAllRemoteLogSegments() throws RemoteStorageException; -} diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java index 95fbed72b17cd..7dbf924653e9e 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java @@ -16,116 +16,132 @@ */ package org.apache.kafka.server.log.remote.metadata.storage; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.junit.ClusterTestExtensions; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; -import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.storage.internals.log.EpochEntry; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; + import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.Set; -import java.util.stream.Stream; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_STARTED; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_FINISHED; +@ClusterTestDefaults(brokers = 3) +@ExtendWith(value = ClusterTestExtensions.class) +@Tag("integration") public class RemoteLogSegmentLifecycleTest { - private static final Logger log = LoggerFactory.getLogger(RemoteLogSegmentLifecycleTest.class); - private static final int SEG_SIZE = 1024 * 1024; - private static final int BROKER_ID_0 = 0; - private static final int BROKER_ID_1 = 1; + private final int segSize = 1048576; + private final int brokerId0 = 0; + private final int brokerId1 = 1; + private final Uuid topicId = Uuid.randomUuid(); + private final TopicPartition tp = new TopicPartition("foo", 0); + private final TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, tp); + private final Time time = new SystemTime(); + private final RemotePartitionMetadataStore spyRemotePartitionMetadataStore = spy(new RemotePartitionMetadataStore()); + private final ClusterInstance clusterInstance; + + RemoteLogSegmentLifecycleTest(ClusterInstance clusterInstance) { // Constructor injections + this.clusterInstance = clusterInstance; + } - private final TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); - private final Time time = new MockTime(1); + private RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() { + return RemoteLogMetadataManagerTestUtils.builder() + .bootstrapServers(clusterInstance.bootstrapServers()) + .startConsumerThread(true) + .remotePartitionMetadataStore(() -> spyRemotePartitionMetadataStore) + .build(); + } - @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}") - @MethodSource("remoteLogSegmentLifecycleManagers") - public void testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception { - try { - remoteLogSegmentLifecycleManager.initialize(topicIdPartition); + @ClusterTest + public void testRemoteLogSegmentLifeCycle() throws Exception { + try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) { + metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet()); // segment 0 // offsets: [0-100] // leader epochs (0,0), (1,20), (2,80) - Map segment0LeaderEpochs = new HashMap<>(); - segment0LeaderEpochs.put(0, 0L); - segment0LeaderEpochs.put(1, 20L); - segment0LeaderEpochs.put(2, 80L); - RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()); - RemoteLogSegmentMetadata segment0Metadata = new RemoteLogSegmentMetadata(segment0Id, 0L, 100L, - -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, - segment0LeaderEpochs); - remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segment0Metadata); + Map leaderEpochSegment0 = new HashMap<>(); + leaderEpochSegment0.put(0, 0L); + leaderEpochSegment0.put(1, 20L); + leaderEpochSegment0.put(2, 80L); + RemoteLogSegmentId segmentId0 = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()); + RemoteLogSegmentMetadata metadataSegment0 = new RemoteLogSegmentMetadata(segmentId0, 0L, + 100L, -1L, brokerId0, time.milliseconds(), segSize, leaderEpochSegment0); + metadataManager.addRemoteLogSegmentMetadata(metadataSegment0).get(); + verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(metadataSegment0); // We should not get this as the segment is still getting copied and it is not yet considered successful until - // it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED. - Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(40, 1).isPresent()); + // it reaches COPY_SEGMENT_FINISHED. + assertFalse(metadataManager.remoteLogSegmentMetadata(topicIdPartition, 1, 40).isPresent()); // Check that these leader epochs are not to be considered for highestOffsetForEpoch API as they are still getting copied. - Stream.of(0, 1, 2).forEach(epoch -> { - try { - Assertions.assertFalse(remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch).isPresent()); - } catch (RemoteStorageException e) { - Assertions.fail(e); - } - }); + for (int leaderEpoch = 0; leaderEpoch <= 2; leaderEpoch++) { + assertFalse(metadataManager.highestOffsetForEpoch(topicIdPartition, leaderEpoch).isPresent()); + } - RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate( - segment0Id, time.milliseconds(), Optional.empty(), - RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); - remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segment0Update); - RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update); + RemoteLogSegmentMetadataUpdate metadataUpdateSegment0 = new RemoteLogSegmentMetadataUpdate( + segmentId0, time.milliseconds(), Optional.empty(), + COPY_SEGMENT_FINISHED, brokerId1); + metadataManager.updateRemoteLogSegmentMetadata(metadataUpdateSegment0).get(); + verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(metadataUpdateSegment0); + metadataSegment0 = metadataSegment0.createWithUpdates(metadataUpdateSegment0); // segment 1 // offsets: [101 - 200] // no changes in leadership with in this segment // leader epochs (2, 101) - Map segment1LeaderEpochs = Collections.singletonMap(2, 101L); - RemoteLogSegmentMetadata segment1Metadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, segment1LeaderEpochs, 101L, - 200L, - RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + Map leaderEpochSegment1 = Collections.singletonMap(2, 101L); + RemoteLogSegmentMetadata metadataSegment1 = upsertSegmentState(metadataManager, leaderEpochSegment1, + 101L, 200L, COPY_SEGMENT_FINISHED); // segment 2 // offsets: [201 - 300] // moved to epoch 3 in between // leader epochs (2, 201), (3, 240) - Map segment2LeaderEpochs = new HashMap<>(); - segment2LeaderEpochs.put(2, 201L); - segment2LeaderEpochs.put(3, 240L); - RemoteLogSegmentMetadata segment2Metadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, segment2LeaderEpochs, 201L, - 300L, - RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + Map leaderEpochSegment2 = new HashMap<>(); + leaderEpochSegment2.put(2, 201L); + leaderEpochSegment2.put(3, 240L); + RemoteLogSegmentMetadata metadataSegment2 = upsertSegmentState(metadataManager, leaderEpochSegment2, + 201L, 300L, COPY_SEGMENT_FINISHED); // segment 3 // offsets: [250 - 400] // leader epochs (3, 250), (4, 370) - Map segment3LeaderEpochs = new HashMap<>(); - segment3LeaderEpochs.put(3, 250L); - segment3LeaderEpochs.put(4, 370L); - RemoteLogSegmentMetadata segment3Metadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, segment3LeaderEpochs, 250L, - 400L, - RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + Map leaderEpochSegment3 = new HashMap<>(); + leaderEpochSegment3.put(3, 250L); + leaderEpochSegment3.put(4, 370L); + RemoteLogSegmentMetadata metadataSegment3 = upsertSegmentState(metadataManager, leaderEpochSegment3, + 250L, 400L, COPY_SEGMENT_FINISHED); ////////////////////////////////////////////////////////////////////////////////////////// // Four segments are added with different boundaries and leader epochs. @@ -133,55 +149,49 @@ public void testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot // epochs and offsets ////////////////////////////////////////////////////////////////////////////////////////// - HashMap expectedEpochOffsetToSegmentMetadata = new HashMap<>(); + Map expectedEpochEntryToMetadata = new HashMap<>(); // Existing metadata entries. - expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), expectedSegment0Metadata); - expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), segment1Metadata); - expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), segment2Metadata); - expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), segment3Metadata); - expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), segment3Metadata); + expectedEpochEntryToMetadata.put(new EpochEntry(1, 40), metadataSegment0); + expectedEpochEntryToMetadata.put(new EpochEntry(2, 110), metadataSegment1); + expectedEpochEntryToMetadata.put(new EpochEntry(3, 240), metadataSegment2); + expectedEpochEntryToMetadata.put(new EpochEntry(3, 250), metadataSegment3); + expectedEpochEntryToMetadata.put(new EpochEntry(4, 375), metadataSegment3); // Non existing metadata entries. // Search for offset 110, epoch 1, and it should not exist. - expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), null); + expectedEpochEntryToMetadata.put(new EpochEntry(1, 110), null); // Search for non existing offset 401, epoch 4. - expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), null); + expectedEpochEntryToMetadata.put(new EpochEntry(4, 401), null); // Search for non existing epoch 5. - expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), null); + expectedEpochEntryToMetadata.put(new EpochEntry(5, 301), null); - for (Map.Entry entry : expectedEpochOffsetToSegmentMetadata.entrySet()) { - EpochOffset epochOffset = entry.getKey(); - Optional segmentMetadata = remoteLogSegmentLifecycleManager - .remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset); + for (Map.Entry entry : expectedEpochEntryToMetadata.entrySet()) { + EpochEntry epochEntry = entry.getKey(); + Optional actualMetadataOpt = metadataManager + .remoteLogSegmentMetadata(topicIdPartition, epochEntry.epoch, epochEntry.startOffset); RemoteLogSegmentMetadata expectedSegmentMetadata = entry.getValue(); - log.debug("Searching for {} , result: {}, expected: {} ", epochOffset, segmentMetadata, - expectedSegmentMetadata); if (expectedSegmentMetadata != null) { - Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadata); + assertEquals(Optional.of(expectedSegmentMetadata), actualMetadataOpt); } else { - Assertions.assertFalse(segmentMetadata.isPresent()); + assertFalse(actualMetadataOpt.isPresent()); } } // Update segment with state as DELETE_SEGMENT_STARTED. // It should not be available when we search for that segment. - remoteLogSegmentLifecycleManager - .updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(), - time.milliseconds(), - Optional.empty(), - RemoteLogSegmentState.DELETE_SEGMENT_STARTED, - BROKER_ID_1)); - Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent()); + RemoteLogSegmentMetadataUpdate metadataDeleteStartedSegment0 = + new RemoteLogSegmentMetadataUpdate(metadataSegment0.remoteLogSegmentId(), time.milliseconds(), + Optional.empty(), DELETE_SEGMENT_STARTED, brokerId1); + metadataManager.updateRemoteLogSegmentMetadata(metadataDeleteStartedSegment0).get(); + assertFalse(metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 10).isPresent()); // Update segment with state as DELETE_SEGMENT_FINISHED. // It should not be available when we search for that segment. - remoteLogSegmentLifecycleManager - .updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(), - time.milliseconds(), - Optional.empty(), - RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, - BROKER_ID_1)); - Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent()); + RemoteLogSegmentMetadataUpdate metadataDeleteFinishedSegment0 = + new RemoteLogSegmentMetadataUpdate(metadataSegment0.remoteLogSegmentId(), time.milliseconds(), + Optional.empty(), DELETE_SEGMENT_FINISHED, brokerId1); + metadataManager.updateRemoteLogSegmentMetadata(metadataDeleteFinishedSegment0).get(); + assertFalse(metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 10).isPresent()); ////////////////////////////////////////////////////////////////////////////////////////// // Search for cache.highestLogOffset(leaderEpoch) for all the leader epochs @@ -197,333 +207,159 @@ public void testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot for (Map.Entry entry : expectedEpochToHighestOffset.entrySet()) { Integer epoch = entry.getKey(); Long expectedOffset = entry.getValue(); - Optional offset = remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch); - log.debug("Fetching highest offset for epoch: {} , returned: {} , expected: {}", epoch, offset, expectedOffset); - Assertions.assertEquals(Optional.of(expectedOffset), offset); + Optional offset = metadataManager.highestOffsetForEpoch(topicIdPartition, epoch); + assertEquals(Optional.of(expectedOffset), offset); } // Search for non existing leader epoch - Optional highestOffsetForEpoch5 = remoteLogSegmentLifecycleManager.highestOffsetForEpoch(5); - Assertions.assertFalse(highestOffsetForEpoch5.isPresent()); - } finally { - Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager"); + Optional highestOffsetForEpoch5 = metadataManager.highestOffsetForEpoch(topicIdPartition, 5); + assertFalse(highestOffsetForEpoch5.isPresent()); } } - private RemoteLogSegmentMetadata createSegmentUpdateWithState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager, - Map segmentLeaderEpochs, - long startOffset, - long endOffset, - RemoteLogSegmentState state) - throws RemoteStorageException { + private RemoteLogSegmentMetadata upsertSegmentState(RemoteLogMetadataManager metadataManager, + Map segmentLeaderEpochs, + long startOffset, + long endOffset, + RemoteLogSegmentState state) + throws RemoteStorageException, ExecutionException, InterruptedException { RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()); - RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, BROKER_ID_0, - time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); - remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata); - - RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), - Optional.empty(), - state, BROKER_ID_1); - remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate); - + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, + -1L, brokerId0, time.milliseconds(), segSize, segmentLeaderEpochs); + metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get(); + verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata); + + RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, + time.milliseconds(), Optional.empty(), state, brokerId1); + metadataManager.updateRemoteLogSegmentMetadata(segMetadataUpdate).get(); + verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segMetadataUpdate); return segmentMetadata.createWithUpdates(segMetadataUpdate); } - private static class EpochOffset { - final int epoch; - final long offset; - - private EpochOffset(int epoch, - long offset) { - this.epoch = epoch; - this.offset = offset; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - EpochOffset that = (EpochOffset) o; - return epoch == that.epoch && offset == that.offset; - } - - @Override - public int hashCode() { - return Objects.hash(epoch, offset); - } - - @Override - public String toString() { - return "EpochOffset{" + - "epoch=" + epoch + - ", offset=" + offset + - '}'; - } - } - - private static Collection remoteLogSegmentLifecycleManagers() { - return Arrays.asList(Arguments.of(new RemoteLogMetadataCacheWrapper()), - Arguments.of(new TopicBasedRemoteLogMetadataManagerWrapper())); - } - - private void checkListSegments(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager, + private void checkListSegments(RemoteLogMetadataManager metadataManager, int leaderEpoch, - RemoteLogSegmentMetadata expectedSegment) + RemoteLogSegmentMetadata expectedMetadata) throws RemoteStorageException { // cache.listRemoteLogSegments(leaderEpoch) should contain the above segment. - Iterator segmentsIter = remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch); - Assertions.assertTrue(segmentsIter.hasNext()); - Assertions.assertEquals(expectedSegment, segmentsIter.next()); + Iterator metadataIter = + metadataManager.listRemoteLogSegments(topicIdPartition, leaderEpoch); + assertTrue(metadataIter.hasNext()); + assertEquals(expectedMetadata, metadataIter.next()); // cache.listAllRemoteLogSegments() should contain the above segment. - Iterator allSegmentsIter = remoteLogSegmentLifecycleManager.listAllRemoteLogSegments(); - Assertions.assertTrue(allSegmentsIter.hasNext()); - Assertions.assertEquals(expectedSegment, allSegmentsIter.next()); + Iterator allMetadataIter = metadataManager.listRemoteLogSegments(topicIdPartition); + assertTrue(allMetadataIter.hasNext()); + assertEquals(expectedMetadata, allMetadataIter.next()); } - @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}") - @MethodSource("remoteLogSegmentLifecycleManagers") - public void testCacheSegmentWithCopySegmentStartedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception { - - try { - remoteLogSegmentLifecycleManager.initialize(topicIdPartition); - + @ClusterTest + public void testCacheSegmentWithCopySegmentStartedState() throws Exception { + try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) { + metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet()); // Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the // segments. RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()); - RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, + -1L, brokerId0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get(); + verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata); // This segment should not be available as the state is not reached to COPY_SEGMENT_FINISHED. - Optional segMetadataForOffset0Epoch0 = remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 0); - Assertions.assertFalse(segMetadataForOffset0Epoch0.isPresent()); + Optional segMetadataForOffset0Epoch0 = + metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 0); + assertFalse(segMetadataForOffset0Epoch0.isPresent()); // cache.listRemoteLogSegments APIs should contain the above segment. - checkListSegments(remoteLogSegmentLifecycleManager, 0, segmentMetadata); - } finally { - Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager"); + checkListSegments(metadataManager, 0, segmentMetadata); } } - @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}") - @MethodSource("remoteLogSegmentLifecycleManagers") - public void testCacheSegmentWithCopySegmentFinishedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception { - try { - remoteLogSegmentLifecycleManager.initialize(topicIdPartition); - + @ClusterTest + public void testCacheSegmentWithCopySegmentFinishedState() throws Exception { + try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) { + metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet()); // Create a segment and move it to state COPY_SEGMENT_FINISHED. and check for searching that segment and // listing the segments. - RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, - Collections.singletonMap(0, 101L), - 101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState( + metadataManager, Collections.singletonMap(0, 101L), 101L, 200L, COPY_SEGMENT_FINISHED); // Search should return the above segment. - Optional segMetadataForOffset150 = remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 150); - Assertions.assertEquals(Optional.of(segmentMetadata), segMetadataForOffset150); + Optional segMetadataForOffset150 = + metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 150); + assertEquals(Optional.of(segmentMetadata), segMetadataForOffset150); // cache.listRemoteLogSegments should contain the above segments. - checkListSegments(remoteLogSegmentLifecycleManager, 0, segmentMetadata); - } finally { - Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager"); + checkListSegments(metadataManager, 0, segmentMetadata); } } - @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}") - @MethodSource("remoteLogSegmentLifecycleManagers") - public void testCacheSegmentWithDeleteSegmentStartedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception { - try { - remoteLogSegmentLifecycleManager.initialize(topicIdPartition); - + @ClusterTest + public void testCacheSegmentWithDeleteSegmentStartedState() throws Exception { + try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) { + metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet()); // Create a segment and move it to state DELETE_SEGMENT_STARTED, and check for searching that segment and // listing the segments. - RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, - Collections.singletonMap(0, 201L), - 201L, 300L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED); + RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState( + metadataManager, Collections.singletonMap(0, 201L), 201L, 300L, DELETE_SEGMENT_STARTED); // Search should not return the above segment as their leader epoch state is cleared. - Optional segmentMetadataForOffset250Epoch0 = remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 250); - Assertions.assertFalse(segmentMetadataForOffset250Epoch0.isPresent()); - - checkListSegments(remoteLogSegmentLifecycleManager, 0, segmentMetadata); - } finally { - Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager"); + Optional segmentMetadataForOffset250Epoch0 = + metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 250); + assertFalse(segmentMetadataForOffset250Epoch0.isPresent()); + checkListSegments(metadataManager, 0, segmentMetadata); } } - @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}") - @MethodSource("remoteLogSegmentLifecycleManagers") - public void testCacheSegmentsWithDeleteSegmentFinishedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception { - try { - remoteLogSegmentLifecycleManager.initialize(topicIdPartition); - + @ClusterTest + public void testCacheSegmentsWithDeleteSegmentFinishedState() throws Exception { + try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) { + metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet()); // Create a segment and move it to state DELETE_SEGMENT_FINISHED, and check for searching that segment and // listing the segments. - RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, - Collections.singletonMap(0, 301L), - 301L, 400L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED); + RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState( + metadataManager, Collections.singletonMap(0, 301L), 301L, 400L, DELETE_SEGMENT_STARTED); // Search should not return the above segment as their leader epoch state is cleared. - Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 350).isPresent()); + assertFalse(metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 350).isPresent()); - RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), - time.milliseconds(), - Optional.empty(), - RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, - BROKER_ID_1); - remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate); + RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate( + segmentMetadata.remoteLogSegmentId(), time.milliseconds(), Optional.empty(), + DELETE_SEGMENT_FINISHED, brokerId1); + metadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get(); + verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segmentMetadataUpdate); // listRemoteLogSegments(0) and listRemoteLogSegments() should not contain the above segment. - Assertions.assertFalse(remoteLogSegmentLifecycleManager.listRemoteLogSegments(0).hasNext()); - Assertions.assertFalse(remoteLogSegmentLifecycleManager.listAllRemoteLogSegments().hasNext()); - } finally { - Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager"); + assertFalse(metadataManager.listRemoteLogSegments(topicIdPartition, 0).hasNext()); + assertFalse(metadataManager.listRemoteLogSegments(topicIdPartition).hasNext()); } } - @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}") - @MethodSource("remoteLogSegmentLifecycleManagers") - public void testCacheListSegments(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception { - try { - remoteLogSegmentLifecycleManager.initialize(topicIdPartition); - + @ClusterTest + public void testCacheListSegments() throws Exception { + try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) { + metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet()); // Create a few segments and add them to the cache. - RemoteLogSegmentMetadata segment0 = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, Collections.singletonMap(0, 0L), 0, - 100, - RemoteLogSegmentState.COPY_SEGMENT_FINISHED); - RemoteLogSegmentMetadata segment1 = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, Collections.singletonMap(0, 101L), 101, - 200, - RemoteLogSegmentState.COPY_SEGMENT_FINISHED); - Map segment2LeaderEpochs = new HashMap<>(); - segment2LeaderEpochs.put(0, 201L); - segment2LeaderEpochs.put(1, 301L); - RemoteLogSegmentMetadata segment2 = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, segment2LeaderEpochs, 201, 400, - RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + RemoteLogSegmentMetadata segment0 = upsertSegmentState(metadataManager, Collections.singletonMap(0, 0L), + 0, 100, COPY_SEGMENT_FINISHED); + RemoteLogSegmentMetadata segment1 = upsertSegmentState(metadataManager, Collections.singletonMap(0, 101L), + 101, 200, COPY_SEGMENT_FINISHED); + Map leaderEpochSegment2 = new HashMap<>(); + leaderEpochSegment2.put(0, 201L); + leaderEpochSegment2.put(1, 301L); + RemoteLogSegmentMetadata segment2 = upsertSegmentState(metadataManager, leaderEpochSegment2, + 201, 400, COPY_SEGMENT_FINISHED); // listRemoteLogSegments(0) and listAllRemoteLogSegments() should contain all the above segments. List expectedSegmentsForEpoch0 = Arrays.asList(segment0, segment1, segment2); - Assertions.assertTrue(TestUtils.sameElementsWithOrder(remoteLogSegmentLifecycleManager.listRemoteLogSegments(0), - expectedSegmentsForEpoch0.iterator())); - Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(remoteLogSegmentLifecycleManager.listAllRemoteLogSegments(), - expectedSegmentsForEpoch0.iterator())); + assertTrue(TestUtils.sameElementsWithOrder( + expectedSegmentsForEpoch0.iterator(), metadataManager.listRemoteLogSegments(topicIdPartition, 0))); + assertTrue(TestUtils.sameElementsWithoutOrder( + expectedSegmentsForEpoch0.iterator(), metadataManager.listRemoteLogSegments(topicIdPartition))); // listRemoteLogSegments(1) should contain only segment2. List expectedSegmentsForEpoch1 = Collections.singletonList(segment2); - Assertions.assertTrue(TestUtils.sameElementsWithOrder(remoteLogSegmentLifecycleManager.listRemoteLogSegments(1), - expectedSegmentsForEpoch1.iterator())); - } finally { - Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager"); - } - } - - /** - * This is a wrapper with {@link TopicBasedRemoteLogMetadataManager} implementing {@link RemoteLogSegmentLifecycleManager}. - * This is passed to {@link #testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test - * {@code RemoteLogMetadataCache} for several lifecycle operations. - *

- * This starts a Kafka cluster with {@link #initialize(Set, boolean)} )} with {@link #brokerCount()} no of servers. It also - * creates the remote log metadata topic required for {@code TopicBasedRemoteLogMetadataManager}. This cluster will - * be stopped by invoking {@link #close()}. - */ - static class TopicBasedRemoteLogMetadataManagerWrapper extends TopicBasedRemoteLogMetadataManagerHarness implements RemoteLogSegmentLifecycleManager { - - private TopicIdPartition topicIdPartition; - - @Override - public synchronized void initialize(TopicIdPartition topicIdPartition) { - this.topicIdPartition = topicIdPartition; - super.initialize(Collections.singleton(topicIdPartition), true); - } - - @Override - public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) throws RemoteStorageException { - try { - // Wait until the segment is added successfully. - remoteLogMetadataManager().addRemoteLogSegmentMetadata(segmentMetadata).get(); - } catch (Exception e) { - throw new RemoteStorageException(e); - } - } - - @Override - public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) throws RemoteStorageException { - try { - // Wait until the segment is updated successfully. - remoteLogMetadataManager().updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get(); - } catch (Exception e) { - throw new RemoteStorageException(e); - } - } - - @Override - public Optional highestOffsetForEpoch(int leaderEpoch) throws RemoteStorageException { - return remoteLogMetadataManager().highestOffsetForEpoch(topicIdPartition, leaderEpoch); - } - - @Override - public Optional remoteLogSegmentMetadata(int leaderEpoch, - long offset) throws RemoteStorageException { - return remoteLogMetadataManager().remoteLogSegmentMetadata(topicIdPartition, leaderEpoch, offset); - } - - @Override - public Iterator listRemoteLogSegments(int leaderEpoch) throws RemoteStorageException { - return remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition, leaderEpoch); - } - - @Override - public Iterator listAllRemoteLogSegments() throws RemoteStorageException { - return remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition); - } - - @Override - public void close() throws IOException { - super.close(); - } - - } - - /** - * This is a wrapper with {@link RemoteLogMetadataCache} implementing {@link RemoteLogSegmentLifecycleManager}. - * This is passed to {@link #testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test - * {@code RemoteLogMetadataCache} for several lifecycle operations. - */ - static class RemoteLogMetadataCacheWrapper implements RemoteLogSegmentLifecycleManager { - - private final RemoteLogMetadataCache metadataCache = new RemoteLogMetadataCache(); - - @Override - public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) throws RemoteStorageException { - metadataCache.updateRemoteLogSegmentMetadata(segmentMetadataUpdate); - } - - @Override - public Optional highestOffsetForEpoch(int epoch) { - return metadataCache.highestOffsetForEpoch(epoch); - } - - @Override - public Optional remoteLogSegmentMetadata(int leaderEpoch, long offset) { - return metadataCache.remoteLogSegmentMetadata(leaderEpoch, offset); - } - - @Override - public Iterator listRemoteLogSegments(int leaderEpoch) throws RemoteResourceNotFoundException { - return metadataCache.listRemoteLogSegments(leaderEpoch); - } - - @Override - public Iterator listAllRemoteLogSegments() { - return metadataCache.listAllRemoteLogSegments(); - } - - @Override - public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) { - metadataCache.addCopyInProgressSegment(segmentMetadata); + assertTrue(TestUtils.sameElementsWithOrder( + expectedSegmentsForEpoch1.iterator(), metadataManager.listRemoteLogSegments(topicIdPartition, 1))); } } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java deleted file mode 100644 index 7af78e750a84f..0000000000000 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.server.log.remote.metadata.storage; - -import kafka.api.IntegrationTestHarness; -import kafka.utils.EmptyTestInfo; -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.common.utils.Utils; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; -import java.util.function.Supplier; - -/** - * A test harness class that brings up 3 brokers and registers {@link TopicBasedRemoteLogMetadataManager} on broker with id as 0. - */ -public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHarness { - - private TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager; - - protected Map overrideRemoteLogMetadataManagerProps() { - return Collections.emptyMap(); - } - - public void initialize(Set topicIdPartitions, - boolean startConsumerThread) { - initialize(topicIdPartitions, startConsumerThread, RemotePartitionMetadataStore::new); - } - - public void initialize(Set topicIdPartitions, - boolean startConsumerThread, - Supplier remotePartitionMetadataStoreSupplier) { - // Call setup to start the cluster. - super.setUp(new EmptyTestInfo()); - - initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, RemoteLogMetadataTopicPartitioner::new, remotePartitionMetadataStoreSupplier); - } - - public void initializeRemoteLogMetadataManager(Set topicIdPartitions, - boolean startConsumerThread, - Function remoteLogMetadataTopicPartitioner, - Supplier remotePartitionMetadataStoreSupplier) { - - topicBasedRemoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder() - .topicIdPartitions(topicIdPartitions) - .bootstrapServers(bootstrapServers(listenerName())) - .startConsumerThread(startConsumerThread) - .remoteLogMetadataTopicPartitioner(remoteLogMetadataTopicPartitioner) - .remotePartitionMetadataStore(remotePartitionMetadataStoreSupplier) - .overrideRemoteLogMetadataManagerProps(overrideRemoteLogMetadataManagerProps()) - .build(); - } - - @Override - public int brokerCount() { - return 3; - } - - protected TopicBasedRemoteLogMetadataManager remoteLogMetadataManager() { - return topicBasedRemoteLogMetadataManager; - } - - public void close() throws IOException { - closeRemoteLogMetadataManager(); - - // Stop the servers and zookeeper. - tearDown(); - } - - public void closeRemoteLogMetadataManager() { - Utils.closeQuietly(topicBasedRemoteLogMetadataManager, "TopicBasedRemoteLogMetadataManager"); - } -} diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java index 84b98dcb5be1d..dee686dc036d1 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java @@ -54,7 +54,6 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest { private TopicBasedRemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() { return RemoteLogMetadataManagerTestUtils.builder() - .topicIdPartitions(Collections.emptySet()) .bootstrapServers(clusterInstance.bootstrapServers()) .startConsumerThread(true) .remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java deleted file mode 100644 index 70ce9190d81c2..0000000000000 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.server.log.remote.metadata.storage; - -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; -import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteStorageException; - -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -public class TopicBasedRemoteLogMetadataManagerWrapperWithHarness implements RemoteLogMetadataManager { - - private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness(); - - @Override - public CompletableFuture addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager().addRemoteLogSegmentMetadata(remoteLogSegmentMetadata); - } - - @Override - public CompletableFuture updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager().updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate); - } - - @Override - public Optional remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, - int epochForOffset, - long offset) throws RemoteStorageException { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager().remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset); - } - - @Override - public Optional highestOffsetForEpoch(TopicIdPartition topicIdPartition, - int leaderEpoch) throws RemoteStorageException { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager().highestOffsetForEpoch(topicIdPartition, leaderEpoch); - } - - @Override - public CompletableFuture putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager().putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata); - } - - @Override - public Iterator listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition); - } - - @Override - public Iterator listRemoteLogSegments(TopicIdPartition topicIdPartition, - int leaderEpoch) throws RemoteStorageException { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition, leaderEpoch); - } - - @Override - public void onPartitionLeadershipChanges(Set leaderPartitions, - Set followerPartitions) { - - remoteLogMetadataManagerHarness.remoteLogMetadataManager().onPartitionLeadershipChanges(leaderPartitions, followerPartitions); - } - - @Override - public void onStopPartitions(Set partitions) { - remoteLogMetadataManagerHarness.remoteLogMetadataManager().onStopPartitions(partitions); - } - - @Override - public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager().remoteLogSize(topicIdPartition, leaderEpoch); - } - - @Override - public void close() throws IOException { - remoteLogMetadataManagerHarness.close(); - } - - @Override - public void configure(Map configs) { - // This will make sure the cluster is up and TopicBasedRemoteLogMetadataManager is initialized. - remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true); - remoteLogMetadataManagerHarness.remoteLogMetadataManager().configure(configs); - } -}