diff --git a/core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java deleted file mode 100644 index 8561fae01990e..0000000000000 --- a/core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.message.FetchResponseData.PartitionData; - -import java.util.Optional; - -/** - The replica alter log dirs tier state machine is unsupported but is provided to the ReplicaAlterLogDirsThread. - */ -public class ReplicaAlterLogDirsTierStateMachine implements TierStateMachine { - - public PartitionFetchState start(TopicPartition topicPartition, - PartitionFetchState currentFetchState, - PartitionData fetchPartitionData) throws Exception { - // JBOD is not supported with tiered storage. - throw new UnsupportedOperationException("Building remote log aux state is not supported in ReplicaAlterLogDirsThread."); - } - - public Optional maybeAdvanceState(TopicPartition topicPartition, - PartitionFetchState currentFetchState) { - return Optional.empty(); - } -} diff --git a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java deleted file mode 100644 index 0462e12c05bca..0000000000000 --- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.server; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import kafka.cluster.Partition; -import kafka.log.UnifiedLog; -import kafka.log.remote.RemoteLogManager; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.message.FetchResponseData.PartitionData; -import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; -import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.server.common.CheckpointFile; -import org.apache.kafka.server.common.OffsetAndEpoch; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteStorageException; -import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; -import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; -import org.apache.kafka.storage.internals.log.EpochEntry; -import org.apache.kafka.storage.internals.log.LogFileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import scala.Option; -import scala.collection.JavaConverters; - -import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented; - -/** - The replica fetcher tier state machine follows a state machine progression. - - Currently, the tier state machine follows a synchronous execution, and we only need to start the machine. - There is no need to advance the state. - - When started, the tier state machine will fetch the local log start offset of the - leader and then build the follower's remote log aux state until the leader's - local log start offset. - */ -public class ReplicaFetcherTierStateMachine implements TierStateMachine { - private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); - - private final LeaderEndPoint leader; - private final ReplicaManager replicaMgr; - - public ReplicaFetcherTierStateMachine(LeaderEndPoint leader, - ReplicaManager replicaMgr) { - this.leader = leader; - this.replicaMgr = replicaMgr; - } - - - /** - * Start the tier state machine for the provided topic partition. Currently, this start method will build the - * entire remote aux log state synchronously. 
- * - * @param topicPartition the topic partition - * @param currentFetchState the current PartitionFetchState which will - * be used to derive the return value - * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error - * - * @return the new PartitionFetchState after the successful start of the - * tier state machine - */ - public PartitionFetchState start(TopicPartition topicPartition, - PartitionFetchState currentFetchState, - PartitionData fetchPartitionData) throws Exception { - - OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); - int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); - long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset(); - - long offsetToFetch = 0; - replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark(); - replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark(); - - try { - offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset()); - } catch (RemoteStorageException e) { - replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark(); - replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark(); - throw e; - } - - OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch()); - long leaderEndOffset = fetchLatestOffsetResult.offset(); - - long initialLag = leaderEndOffset - offsetToFetch; - - return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(), - Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch()); - } - - /** - * This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560. 
- * - * @param topicPartition the topic partition - * @param currentFetchState the current PartitionFetchState which will - * be used to derive the return value - * - * @return the original PartitionFetchState - */ - public Optional maybeAdvanceState(TopicPartition topicPartition, - PartitionFetchState currentFetchState) { - // No-op for now - return Optional.of(currentFetchState); - } - - private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch, - TopicPartition partition, - Integer currentLeaderEpoch) { - int previousEpoch = epoch - 1; - - // Find the end-offset for the epoch earlier to the given epoch from the leader - Map partitionsWithEpochs = new HashMap<>(); - partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch)); - Option maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition); - if (maybeEpochEndOffset.isEmpty()) { - throw new KafkaException("No response received for partition: " + partition); - } - - EpochEndOffset epochEndOffset = maybeEpochEndOffset.get(); - if (epochEndOffset.errorCode() != Errors.NONE.code()) { - throw Errors.forCode(epochEndOffset.errorCode()).exception(); - } - - return epochEndOffset; - } - - private List readLeaderEpochCheckpoint(RemoteLogManager rlm, - RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException { - InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH); - try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { - CheckpointFile.CheckpointReadBuffer readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER); - return readBuffer.read(); - } - } - - private void buildProducerSnapshotFile(File snapshotFile, - RemoteLogSegmentMetadata remoteLogSegmentMetadata, - RemoteLogManager rlm) throws IOException, RemoteStorageException { - File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp"); - // Copy it to snapshot file in atomic manner. - Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT), - tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false); - } - - /** - * It tries to build the required state for this partition from leader and remote storage so that it can start - * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the - * next offset following the end offset of the remote log portion. - */ - private Long buildRemoteLogAuxState(TopicPartition topicPartition, - Integer currentLeaderEpoch, - Long leaderLocalLogStartOffset, - Integer epochForLeaderLocalLogStartOffset, - Long leaderLogStartOffset) throws IOException, RemoteStorageException { - - UnifiedLog unifiedLog = replicaMgr.localLogOrException(topicPartition); - - long nextOffset; - - if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) { - if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated"); - - RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); - - // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). 
We need to build the leader epoch cache - // until that offset - long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1; - int targetEpoch; - // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) - // will have the same epoch. - if (epochForLeaderLocalLogStartOffset == 0) { - targetEpoch = epochForLeaderLocalLogStartOffset; - } else { - // Fetch the earlier epoch/end-offset(exclusive) from the leader. - EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch); - // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive. - if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) { - // Always use the leader epoch from returned earlierEpochEndOffset. - // This gives the respective leader epoch, that will handle any gaps in epochs. - // For ex, leader epoch cache contains: - // leader-epoch start-offset - // 0 20 - // 1 85 - // <2> - gap no messages were appended in this leader epoch. - // 3 90 - // 4 98 - // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3. - // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90. - // So, for offset 89, we should return leader epoch as 1 like below. - targetEpoch = earlierEpochEndOffset.leaderEpoch(); - } else { - targetEpoch = epochForLeaderLocalLogStartOffset; - } - } - - Optional maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset); - - if (maybeRlsm.isPresent()) { - RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get(); - // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start - // segments from (remoteLogSegmentMetadata.endOffset() + 1) - // Assign nextOffset with the offset from which next fetch should happen. - nextOffset = remoteLogSegmentMetadata.endOffset() + 1; - - // Truncate the existing local log before restoring the leader epoch cache and producer snapshots. - Partition partition = replicaMgr.getPartitionOrException(topicPartition); - partition.truncateFullyAndStartAt(nextOffset, false, Option.apply(leaderLogStartOffset)); - - // Increment start offsets - unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented); - unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented); - - // Build leader epoch cache. - List epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata); - if (unifiedLog.leaderEpochCache().isDefined()) { - unifiedLog.leaderEpochCache().get().assign(epochs); - } - - log.debug("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition); - - // Restore producer snapshot - File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset); - buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm); - - // Reload producer snapshots. 
- unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots(); - unifiedLog.loadProducerState(nextOffset); - log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " + - "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}", - partition, unifiedLog.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset); - } else { - throw new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition + - ", currentLeaderEpoch: " + currentLeaderEpoch + - ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset + - ", leaderLogStartOffset: " + leaderLogStartOffset + - ", epoch: " + targetEpoch + - "as the previous remote log segment metadata was not found"); - } - } else { - // If the tiered storage is not enabled throw an exception back so that it will retry until the tiered storage - // is set as expected. - throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled"); - } - - return nextOffset; - } -} diff --git a/core/src/main/java/kafka/server/TierStateMachine.java b/core/src/main/java/kafka/server/TierStateMachine.java index 58a44cc647232..085e6c025dd4c 100644 --- a/core/src/main/java/kafka/server/TierStateMachine.java +++ b/core/src/main/java/kafka/server/TierStateMachine.java @@ -17,15 +17,65 @@ package kafka.server; -import java.util.Optional; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import kafka.cluster.Partition; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.FetchResponseData.PartitionData; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; +import org.apache.kafka.storage.internals.log.LogFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; + +import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented; /** - * This interface defines the APIs needed to handle any state transitions related to tiering + * This class defines the APIs and implementation needed to handle any state transitions related to tiering + * + * When started, the tier state machine will fetch the local log start offset of the + * leader and then build the follower's remote log aux state until the leader's + * local 
log start offset. */ -public interface TierStateMachine { +public class TierStateMachine { + private static final Logger log = LoggerFactory.getLogger(TierStateMachine.class); + + private final LeaderEndPoint leader; + private final ReplicaManager replicaMgr; + private final boolean useFutureLog; + public TierStateMachine(LeaderEndPoint leader, + ReplicaManager replicaMgr, + boolean useFutureLog) { + this.leader = leader; + this.replicaMgr = replicaMgr; + this.useFutureLog = useFutureLog; + } /** * Start the tier state machine for the provided topic partition. @@ -40,19 +90,176 @@ public interface TierStateMachine { */ PartitionFetchState start(TopicPartition topicPartition, PartitionFetchState currentFetchState, - PartitionData fetchPartitionData) throws Exception; + PartitionData fetchPartitionData) throws Exception { + OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); + int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); + long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset(); + + long offsetToFetch; + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark(); + + UnifiedLog unifiedLog; + if (useFutureLog) { + unifiedLog = replicaMgr.futureLogOrException(topicPartition); + } else { + unifiedLog = replicaMgr.localLogOrException(topicPartition); + } + + try { + offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog); + } catch (RemoteStorageException e) { + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark(); + throw e; + } + + OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch()); + long leaderEndOffset = fetchLatestOffsetResult.offset(); + + long initialLag = leaderEndOffset - offsetToFetch; + + return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(), + Fetching$.MODULE$, unifiedLog.latestEpoch()); + + } + + private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch, + TopicPartition partition, + Integer currentLeaderEpoch) { + int previousEpoch = epoch - 1; + + // Find the end-offset for the epoch earlier to the given epoch from the leader + Map partitionsWithEpochs = new HashMap<>(); + partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch)); + Option maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition); + if (maybeEpochEndOffset.isEmpty()) { + throw new KafkaException("No response received for partition: " + partition); + } + + OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get(); + if (epochEndOffset.errorCode() != Errors.NONE.code()) { + throw Errors.forCode(epochEndOffset.errorCode()).exception(); + } + + return epochEndOffset; + } + + private List readLeaderEpochCheckpoint(RemoteLogManager rlm, + RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) throws IOException, RemoteStorageException { + InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH); + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + CheckpointFile.CheckpointReadBuffer readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER); + return readBuffer.read(); + } + } + + private void buildProducerSnapshotFile(UnifiedLog unifiedLog, + long nextOffset, + RemoteLogSegmentMetadata remoteLogSegmentMetadata, + RemoteLogManager rlm) throws IOException, RemoteStorageException { + // Restore producer snapshot + File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset); + Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp"); + // Copy it to snapshot file in atomic manner. + Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT), + tmpSnapshotFile, StandardCopyOption.REPLACE_EXISTING); + Utils.atomicMoveWithFallback(tmpSnapshotFile, snapshotFile.toPath(), false); + + // Reload producer snapshots. + unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots(); + unifiedLog.loadProducerState(nextOffset); + } /** - * Optionally advance the state of the tier state machine, based on the - * current PartitionFetchState. The decision to advance the tier - * state machine is implementation specific. - * - * @param topicPartition the topic partition - * @param currentFetchState the current PartitionFetchState which will - * be used to derive the return value - * - * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the + * next offset following the end offset of the remote log portion. */ - Optional maybeAdvanceState(TopicPartition topicPartition, - PartitionFetchState currentFetchState); + private Long buildRemoteLogAuxState(TopicPartition topicPartition, + Integer currentLeaderEpoch, + Long leaderLocalLogStartOffset, + Integer epochForLeaderLocalLogStartOffset, + Long leaderLogStartOffset, + UnifiedLog unifiedLog) throws IOException, RemoteStorageException { + + if (!unifiedLog.remoteStorageSystemEnable() || !unifiedLog.config().remoteStorageEnable()) { + // If the tiered storage is not enabled throw an exception back so that it will retry until the tiered storage + // is set as expected. + throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled"); + } + + if (replicaMgr.remoteLogManager().isEmpty()) + throw new IllegalStateException("RemoteLogManager is not yet instantiated"); + + RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); + + // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache + // until that offset + long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1; + int targetEpoch; + // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) + // will have the same epoch. 
+ if (epochForLeaderLocalLogStartOffset == 0) { + targetEpoch = epochForLeaderLocalLogStartOffset; + } else { + // Fetch the earlier epoch/end-offset(exclusive) from the leader. + OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch); + // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive. + if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) { + // Always use the leader epoch from returned earlierEpochEndOffset. + // This gives the respective leader epoch, that will handle any gaps in epochs. + // For ex, leader epoch cache contains: + // leader-epoch start-offset + // 0 20 + // 1 85 + // <2> - gap no messages were appended in this leader epoch. + // 3 90 + // 4 98 + // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3. + // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90. + // So, for offset 89, we should return leader epoch as 1 like below. + targetEpoch = earlierEpochEndOffset.leaderEpoch(); + } else { + targetEpoch = epochForLeaderLocalLogStartOffset; + } + } + + RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset) + .orElseThrow(() -> new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition + + ", currentLeaderEpoch: " + currentLeaderEpoch + + ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset + + ", leaderLogStartOffset: " + leaderLogStartOffset + + ", epoch: " + targetEpoch + + " as the previous remote log segment metadata was not found")); + + + // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start + // segments from (remoteLogSegmentMetadata.endOffset() + 1) + // Assign nextOffset with the offset from which next fetch should happen. + long nextOffset = remoteLogSegmentMetadata.endOffset() + 1; + + // Truncate the existing local log before restoring the leader epoch cache and producer snapshots. + Partition partition = replicaMgr.getPartitionOrException(topicPartition); + partition.truncateFullyAndStartAt(nextOffset, useFutureLog, Option.apply(leaderLogStartOffset)); + // Increment start offsets + unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented); + unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented); + + // Build leader epoch cache.
+ List epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata); + if (unifiedLog.leaderEpochCache().isDefined()) { + unifiedLog.leaderEpochCache().get().assign(epochs); + } + + log.info("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition); + + buildProducerSnapshotFile(unifiedLog, nextOffset, remoteLogSegmentMetadata, rlm); + + log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " + + "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}", + partition, unifiedLog.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset); + + return nextOffset; + } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 66964350569f1..5c2c06cabdf9a 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.{LogContext, Time} -import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition, Uuid} +import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid} import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics} import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde} import org.apache.kafka.image.publisher.MetadataPublisher @@ -614,10 +614,6 @@ class BrokerServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) { - if (config.logDirs.size > 1) { - throw new KafkaException("Tiered storage is not supported with multiple log dirs.") - } - Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3687a7cee3cca..af5d0ae9a26c0 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1240,9 +1240,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0") require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1") require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.") - if (isRemoteLogStorageSystemEnabled && logDirs.size > 1) { - throw new ConfigException(s"Multiple log directories `${logDirs.mkString(",")}` are not supported when remote log storage is enabled") - } require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.") require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" + " to prevent unnecessary socket timeouts") diff 
--git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 4702b7e5a97a3..ce155e92ffd76 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -44,7 +44,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} -import org.apache.kafka.common.{Endpoint, KafkaException, Node, TopicPartition} +import org.apache.kafka.common.{Endpoint, Node, TopicPartition} import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag @@ -690,10 +690,6 @@ class KafkaServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) { - if (config.logDirs.size > 1) { - throw new KafkaException("Tiered storage is not supported with multiple log dirs.") - } - Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 2ea09390b3dcb..95c7a5ac3d4b1 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -40,7 +40,7 @@ class ReplicaAlterLogDirsThread(name: String, clientId = name, leader = leader, failedPartitions, - fetchTierStateMachine = new ReplicaAlterLogDirsTierStateMachine(), + fetchTierStateMachine = new TierStateMachine(leader, replicaMgr, true), fetchBackOffMs = fetchBackOffMs, isInterruptible = false, brokerTopicStats) { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index c45a5d629a69b..bb073682bdfb6 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -37,7 +37,7 @@ class ReplicaFetcherThread(name: String, clientId = name, leader = leader, failedPartitions, - fetchTierStateMachine = new ReplicaFetcherTierStateMachine(leader, replicaMgr), + fetchTierStateMachine = new TierStateMachine(leader, replicaMgr, false), fetchBackOffMs = brokerConfig.replicaFetchBackoffMs, isInterruptible = false, replicaMgr.brokerTopicStats) { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 595932b4291f0..aa56269a2f40d 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -703,6 +703,10 @@ class ReplicaManager(val config: KafkaConfig, getPartitionOrException(topicPartition).futureLog.isDefined } + def futureLogOrException(topicPartition: TopicPartition): UnifiedLog = { + getPartitionOrException(topicPartition).futureLocalLogOrException + } + def localLog(topicPartition: TopicPartition): Option[UnifiedLog] = { onlinePartition(topicPartition).flatMap(_.log) } @@ -1744,8 +1748,8 @@ class ReplicaManager(val 
config: KafkaConfig, val leaderLogStartOffset = log.logStartOffset val leaderLogEndOffset = log.logEndOffset - if (params.isFromFollower) { - // If it is from a follower then send the offset metadata only as the data is already available in remote + if (params.isFromFollower || params.isFromFuture) { + // If it is from a follower or from a future replica, then send the offset metadata only as the data is already available in remote // storage and throw an error saying that this offset is moved to tiered storage. createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset, new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage")) diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala index 11b7ceb2df4b8..f9750618ae0ea 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala @@ -33,7 +33,6 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, Test} import org.mockito.Mockito.{mock, verify, when} -import java.util.Optional import scala.collection.{Map, Set, mutable} import scala.jdk.CollectionConverters._ @@ -313,12 +312,10 @@ class AbstractFetcherManagerTest { override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0) } - private class MockResizeFetcherTierStateMachine extends TierStateMachine { + private class MockResizeFetcherTierStateMachine extends TierStateMachine(null, null, false) { override def start(topicPartition: TopicPartition, currentFetchState: PartitionFetchState, fetchPartitionData: PartitionData): PartitionFetchState = { throw new UnsupportedOperationException("Materializing tier state is not supported in this test.") } - - override def maybeAdvanceState(tp: TopicPartition, currentFetchState: PartitionFetchState): Optional[PartitionFetchState] = Optional.empty[PartitionFetchState] } private class TestResizeFetcherThread(sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions, fetchTierStateMachine: TierStateMachine) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 8d1fb37f13007..482393e2bf236 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1893,21 +1893,14 @@ class KafkaConfigTest { } } - @Test - def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = { - val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) - props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, String.valueOf(true)) - props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a,/tmp/b") - - val caught = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) - assertTrue(caught.getMessage.contains("Multiple log directories `/tmp/a,/tmp/b` are not supported when remote log storage is enabled")) - } - @Test def testSingleLogDirectoryWithRemoteLogStorage(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, String.valueOf(true)) props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a") assertDoesNotThrow(() => KafkaConfig.fromProps(props)) + + 
props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a,/tmp/b") + assertDoesNotThrow(() => KafkaConfig.fromProps(props)) } } diff --git a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala index 9e225607325d5..86df92d77daf9 100644 --- a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala +++ b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala @@ -20,9 +20,7 @@ package kafka.server import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.message.FetchResponseData -import java.util.Optional - -class MockTierStateMachine(leader: LeaderEndPoint) extends ReplicaFetcherTierStateMachine(leader, null) { +class MockTierStateMachine(leader: LeaderEndPoint) extends TierStateMachine(leader, null, false) { var fetcher: MockFetcherThread = _ @@ -37,11 +35,6 @@ class MockTierStateMachine(leader: LeaderEndPoint) extends ReplicaFetcherTierSta Fetching, Some(currentFetchState.currentLeaderEpoch)) } - override def maybeAdvanceState(topicPartition: TopicPartition, - currentFetchState: PartitionFetchState): Optional[PartitionFetchState] = { - Optional.of(currentFetchState) - } - def setFetcher(mockFetcherThread: MockFetcherThread): Unit = { fetcher = mockFetcherThread } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 8b02c72a2b00a..6b3eb31ad9d9f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -3361,12 +3361,8 @@ class ReplicaManagerTest { val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect) val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath - if (enableRemoteStorage) { - props.put("log.dirs", path1) - props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, enableRemoteStorage.toString) - } else { - props.put("log.dirs", path1 + "," + path2) - } + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, enableRemoteStorage.toString) + props.put("log.dirs", path1 + "," + path2) propsModifier.apply(props) val config = KafkaConfig.fromProps(props) val logProps = new Properties() @@ -5057,9 +5053,8 @@ class ReplicaManagerTest { assertEquals(followerPartitions, actualFollowerPartitions) } - // KAFKA-16031: Enabling remote storage after JBOD is supported in tiered storage @ParameterizedTest - @ValueSource(booleans = Array(false)) + @ValueSource(booleans = Array(true, false)) def testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory(enableRemoteStorage: Boolean): Unit = { val localId = 1 val topicPartition0 = new TopicPartition("foo", 0) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala similarity index 93% rename from core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala rename to core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala index af7d1ce633f13..139aeb053ffea 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala @@ -23,20 +23,21 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.record._ import org.apache.kafka.common.{TopicPartition, Uuid} import 
org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.collection.Map -class ReplicaFetcherTierStateMachineTest { +class TierStateMachineTest { - val truncateOnFetch = true val topicIds = Map("topic1" -> Uuid.randomUuid(), "topic2" -> Uuid.randomUuid()) val version = ApiKeys.FETCH.latestVersion() private val failedPartitions = new FailedPartitions - @Test - def testFollowerFetchMovedToTieredStore(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testFollowerFetchMovedToTieredStore(truncateOnFetch: Boolean): Unit = { val partition = new TopicPartition("topic", 0) val replicaLog = Seq( @@ -94,8 +95,9 @@ class ReplicaFetcherTierStateMachineTest { * tiered storage as well. Hence, `X < globalLogStartOffset`. * 4. Follower comes online and tries to fetch X from leader. */ - @Test - def testFollowerFetchOffsetOutOfRangeWithTieredStore(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testFollowerFetchOffsetOutOfRangeWithTieredStore(truncateOnFetch: Boolean): Unit = { val partition = new TopicPartition("topic", 0) val replicaLog = Seq( @@ -105,7 +107,7 @@ class ReplicaFetcherTierStateMachineTest { val replicaState = PartitionState(replicaLog, leaderEpoch = 7, highWatermark = 0L, rlmEnabled = true) - val mockLeaderEndpoint = new MockLeaderEndPoint + val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) @@ -153,8 +155,9 @@ class ReplicaFetcherTierStateMachineTest { assertEquals(11L, replicaState.logEndOffset) } - @Test - def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testFencedOffsetResetAfterMovedToRemoteTier(truncateOnFetch: Boolean): Unit = { val partition = new TopicPartition("topic", 0) var isErrorHandled = false val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) @@ -189,4 +192,5 @@ class ReplicaFetcherTierStateMachineTest { assertTrue(fetcher.fetchState(partition).isEmpty) assertTrue(failedPartitions.contains(partition)) } -} + +} \ No newline at end of file diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java index 700a8e40d5fe0..4af8cd1b39828 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java @@ -22,6 +22,8 @@ import java.util.Objects; import java.util.Optional; +import static org.apache.kafka.common.requests.FetchRequest.FUTURE_LOCAL_REPLICA_ID; + public class FetchParams { public final short requestVersion; public final int replicaId; @@ -56,6 +58,10 @@ public boolean isFromFollower() { return FetchRequest.isValidBrokerId(replicaId); } + public boolean isFromFuture() { + return replicaId == FUTURE_LOCAL_REPLICA_ID; + } + public boolean isFromConsumer() { return FetchRequest.isConsumer(replicaId); } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java index dff637cf85ed6..f3f873b80d6f1 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.tiered.storage; +import org.apache.kafka.tiered.storage.actions.AlterLogDirAction; import org.apache.kafka.tiered.storage.actions.BounceBrokerAction; import org.apache.kafka.tiered.storage.actions.ConsumeAction; import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction; @@ -321,6 +322,14 @@ public TieredStorageTestBuilder reassignReplica(String topic, return this; } + public TieredStorageTestBuilder alterLogDir(String topic, + Integer partition, + int replicaIds) { + TopicPartition topicPartition = new TopicPartition(topic, partition); + actions.add(new AlterLogDirAction(topicPartition, replicaIds)); + return this; + } + public TieredStorageTestBuilder expectUserTopicMappedToMetadataPartitions(String topic, List metadataPartitions) { actions.add(new ExpectUserTopicMappedToMetadataPartitionsAction(topic, metadataPartitions)); diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java index 274bbf893b72a..1183383bea08a 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java @@ -154,7 +154,7 @@ public static List remoteStorageManagers(Seq br @SuppressWarnings("deprecation") public static List localStorages(Seq brokers) { return JavaConverters.seqAsJavaList(brokers).stream() - .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), + .map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.setAsJavaSet(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC)) .collect(Collectors.toList()); } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java new file mode 100644 index 0000000000000..34f4c7d0892aa --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.actions; + +import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.tiered.storage.TieredStorageTestAction; +import org.apache.kafka.tiered.storage.TieredStorageTestContext; +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; + +import java.io.File; +import java.io.PrintStream; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public final class AlterLogDirAction implements TieredStorageTestAction { + + private final TopicPartition topicPartition; + private final int brokerId; + + public AlterLogDirAction(TopicPartition topicPartition, + int brokerId) { + this.topicPartition = topicPartition; + this.brokerId = brokerId; + } + + @Override + public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException, TimeoutException { + Optional localStorage = context.localStorages().stream().filter(storage -> storage.getBrokerId() == brokerId).findFirst(); + if (!localStorage.isPresent()) { + throw new IllegalArgumentException("cannot find local storage for this topic partition:" + topicPartition + " in this broker id:" + brokerId); + } + + Optional sourceDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst(); + if (!sourceDir.isPresent()) { + throw new IllegalArgumentException("No log dir with topic partition:" + topicPartition + " in this broker id:" + brokerId); + } + Optional targetDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> !localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst(); + if (!targetDir.isPresent()) { + throw new IllegalArgumentException("No log dir without topic partition:" + topicPartition + " in this broker id:" + brokerId); + } + + // build alterReplicaLogDirs request content to move from sourceDir to targetDir + TopicPartitionReplica topicPartitionReplica = new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), brokerId); + Map logDirs = Collections.singletonMap(topicPartitionReplica, targetDir.get().getAbsolutePath()); + + AlterReplicaLogDirsResult results = context.admin().alterReplicaLogDirs(logDirs); + results.values().get(topicPartitionReplica).get(30, TimeUnit.SECONDS); + + // wait until the topic partition folder disappears from source dir and appears in the target dir + TestUtils.waitForCondition(() -> localStorage.get().dirContainsTopicPartition(topicPartition, targetDir.get()) && + !localStorage.get().dirContainsTopicPartition(topicPartition, sourceDir.get()), + "Failed to alter dir:" + logDirs); + } + + @Override + public void describe(PrintStream output) { + output.print("alter dir for topic partition:" + topicPartition + " in this broker id:" + brokerId); + } +} diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java index f81cfcb5cf870..27a6818def90b 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java +++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.tiered.storage.actions; +import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult; import org.apache.kafka.tiered.storage.TieredStorageTestAction; import org.apache.kafka.tiered.storage.TieredStorageTestContext; import org.apache.kafka.clients.admin.NewPartitionReassignment; @@ -34,6 +35,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic; @@ -53,7 +56,7 @@ public ExpectLeaderAction(TopicPartition topicPartition, } @Override - public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException { + public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException, TimeoutException { String topic = topicPartition.topic(); int partition = topicPartition.partition(); TestUtils.waitForCondition(() -> { @@ -88,7 +91,7 @@ public void describe(PrintStream output) { topicPartition, replicaId, electLeader); } - private void reassignPartition(TieredStorageTestContext context) throws ExecutionException, InterruptedException { + private void reassignPartition(TieredStorageTestContext context) throws ExecutionException, InterruptedException, TimeoutException { String topic = topicPartition.topic(); TopicPartitionInfo partitionInfo = describeTopic(context, topic) .partitions() @@ -104,6 +107,7 @@ private void reassignPartition(TieredStorageTestContext context) throws Executio Map> proposed = Collections.singletonMap(topicPartition, Optional.of(new NewPartitionReassignment(targetReplicas))); - context.admin().alterPartitionReassignments(proposed); + AlterPartitionReassignmentsResult result = context.admin().alterPartitionReassignments(proposed); + result.all().get(30, TimeUnit.MINUTES); } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java new file mode 100644 index 0000000000000..d73ba53b677be --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Arrays; +import java.util.Collections; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public final class AlterLogDirTest extends TieredStorageTestHarness { + + @Override + public int brokerCount() { + return 2; + } + + @Override + protected void writeTestSpecifications(TieredStorageTestBuilder builder) { + final String topicB = "topicB"; + final int p0 = 0; + final int partitionCount = 1; + final int replicationFactor = 2; + final int maxBatchCountPerSegment = 1; + final boolean enableRemoteLogStorage = true; + final int broker0 = 0; + final int broker1 = 1; + + builder + // create topicB with 1 partition and RF 2 + .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment, + mkMap(mkEntry(p0, Arrays.asList(broker1, broker0))), enableRemoteLogStorage) + // send records to partition 0 + .expectSegmentToBeOffloaded(broker1, topicB, p0, 0, new KeyValueSpec("k0", "v0")) + .expectSegmentToBeOffloaded(broker1, topicB, p0, 1, new KeyValueSpec("k1", "v1")) + .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L) + .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), + new KeyValueSpec("k2", "v2")) + // alter the log dir of the replica on broker0; only a single replica id is expected + .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0)) + // make sure the altered replica can still be elected as the leader + .expectLeader(topicB, p0, broker0, true) + // produce some more events and verify the earliest local offset + .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L) + .produce(topicB, p0, new KeyValueSpec("k3", "v3")) + // consume from the beginning of the topic to read data from local and remote storage + .expectFetchFromTieredStorage(broker0, topicB, p0, 3) + .consume(topicB, p0, 0L, 4, 3); + } +} diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java index 1e9736084aaeb..207f9bd4e837f 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java @@ -74,7 +74,7 @@ public Properties topicConfig() { public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq topicPartitions) { JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition -> { List localStorages = JavaConverters.bufferAsJavaList(brokers()).stream() - .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC)) + .map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.setAsJavaSet(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC)) .collect(Collectors.toList()); localStorages .stream() diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java index e303a25a5d372..639a464f3c910 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java +++
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -39,17 +40,17 @@ public final class BrokerLocalStorage { private final Integer brokerId; - private final File brokerStorageDirectory; + private final Set brokerStorageDirectories; private final Integer storageWaitTimeoutSec; private final int storagePollPeriodSec = 1; private final Time time = Time.SYSTEM; public BrokerLocalStorage(Integer brokerId, - String storageDirname, + Set storageDirnames, Integer storageWaitTimeoutSec) { this.brokerId = brokerId; - this.brokerStorageDirectory = new File(storageDirname); + this.brokerStorageDirectories = storageDirnames.stream().map(File::new).collect(Collectors.toSet()); this.storageWaitTimeoutSec = storageWaitTimeoutSec; } @@ -57,6 +58,10 @@ public Integer getBrokerId() { return brokerId; } + public Set getBrokerStorageDirectories() { + return brokerStorageDirectories; + } + /** * Wait until the first segment offset in Apache Kafka storage for the given topic-partition is * equal to the provided offset. @@ -142,7 +147,12 @@ private boolean isOffsetPresentInFirstLocalSegment(TopicPartition topicPartition if (offsetToSearch.equals(firstLogFileBaseOffset)) { return true; } - File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), topicPartition.toString()); + File logDir = brokerStorageDirectories.stream() + .filter(dir -> dirContainsTopicPartition(topicPartition, dir)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " + + "was not found", brokerId, topicPartition))); + File partitionDir = new File(logDir.getAbsolutePath(), topicPartition.toString()); File firstSegmentFile = new File(partitionDir.getAbsolutePath(), LogFileUtils.filenamePrefixFromOffset(firstLogFileBaseOffset) + LogFileUtils.LOG_FILE_SUFFIX); try (FileRecords fileRecords = FileRecords.open(firstSegmentFile, false)) { @@ -158,13 +168,15 @@ private boolean isOffsetPresentInFirstLocalSegment(TopicPartition topicPartition } public void eraseStorage(FilenameFilter filter) throws IOException { - for (File file : Objects.requireNonNull(brokerStorageDirectory.listFiles(filter))) { - Utils.delete(file); + for (File brokerDir : brokerStorageDirectories) { + for (File file : Objects.requireNonNull(brokerDir.listFiles(filter))) { + Utils.delete(file); + } } } private OffsetHolder getEarliestLocalOffset(TopicPartition topicPartition) { - List partitionFiles = getTopicPartitionFiles(topicPartition); + List partitionFiles = getTopicPartitionFileNames(topicPartition); Optional firstLogFile = partitionFiles.stream() .filter(filename -> filename.endsWith(LogFileUtils.LOG_FILE_SUFFIX)) .sorted() @@ -176,8 +188,29 @@ private OffsetHolder getEarliestLocalOffset(TopicPartition topicPartition) { return new OffsetHolder(LogFileUtils.offsetFromFileName(firstLogFile.get()), partitionFiles); } - private List getTopicPartitionFiles(TopicPartition topicPartition) { - File[] files = brokerStorageDirectory.listFiles((dir, name) -> name.equals(topicPartition.toString())); + public boolean dirContainsTopicPartition(TopicPartition topicPartition, File logDir) { + File[] files = getTopicPartitionFiles(topicPartition, Collections.singleton(logDir)); + return files != null && files.length > 0; + } + + private 
File[] getTopicPartitionFiles(TopicPartition topicPartition) { + return getTopicPartitionFiles(topicPartition, brokerStorageDirectories); + } + + private File[] getTopicPartitionFiles(TopicPartition topicPartition, Set logDirs) { + File[] files = null; + for (File brokerDir : logDirs) { + files = brokerDir.listFiles((dir, name) -> name.equals(topicPartition.toString())); + // currently, we only expect one topic partition in one log dir + if (files != null && files.length != 0) { + break; + } + } + return files; + } + + private List getTopicPartitionFileNames(TopicPartition topicPartition) { + File[] files = getTopicPartitionFiles(topicPartition); if (files == null || files.length == 0) { throw new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " + "was not found", brokerId, topicPartition)); diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java index 1c2aef8a02c66..4db4ece844b99 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.tiered.storage.utils; +import kafka.utils.TestUtils; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.record.Record; +import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; @@ -50,6 +52,7 @@ import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP; import static org.apache.kafka.server.config.ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.storage.internals.log.CleanerConfig.LOG_CLEANER_ENABLE_PROP; public class TieredStorageTestUtils { @@ -149,6 +152,11 @@ public static Properties createPropsForRemoteStorage(String testClassName, overridingProps.setProperty(storageConfigPrefix(testClassName, DELETE_ON_CLOSE_CONFIG), "false"); // Set a small number of retry interval for retrying RemoteLogMetadataManager resources initialization to speed up the test overridingProps.setProperty(metadataConfigPrefix(testClassName, REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP), RLMM_INIT_RETRY_INTERVAL_MS.toString()); + // Set 2 log dirs to make sure JBOD feature is working correctly + overridingProps.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, TestUtils.tempDir().getAbsolutePath() + "," + TestUtils.tempDir().getAbsolutePath()); + // Disable unnecessary log cleaner + overridingProps.setProperty(LOG_CLEANER_ENABLE_PROP, "false"); + return overridingProps; }
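The sketch below is illustrative only and not part of the patch: it shows how a test could point the reworked BrokerLocalStorage at two JBOD log dirs and ask which dir hosts a partition, using only the constructor and helpers introduced above. The class name BrokerLocalStorageSketch, the temp-dir names, the broker id, the timeout value, and the pre-created "topicB-0" directory are assumptions made for this example.

// Minimal usage sketch, assuming BrokerLocalStorage from the test sources is on the classpath.
import java.io.File;
import java.nio.file.Files;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;

public class BrokerLocalStorageSketch {
    public static void main(String[] args) throws Exception {
        // Two throw-away directories standing in for the broker's two configured log dirs.
        File dirA = Files.createTempDirectory("logdir-a").toFile();
        File dirB = Files.createTempDirectory("logdir-b").toFile();
        TopicPartition tp = new TopicPartition("topicB", 0);
        // Pretend the partition lives in the second dir, e.g. after an alter-log-dir move.
        new File(dirB, tp.toString()).mkdirs();

        // The constructor now takes the full set of log dir names instead of a single dir.
        Set<String> logDirs = Stream.of(dirA, dirB).map(File::getAbsolutePath).collect(Collectors.toSet());
        BrokerLocalStorage storage = new BrokerLocalStorage(0, logDirs, 30);

        // dirContainsTopicPartition checks one dir at a time; the storage helpers probe each dir in turn.
        for (File dir : storage.getBrokerStorageDirectories()) {
            System.out.printf("%s contains %s: %b%n", dir, tp, storage.dirContainsTopicPartition(tp, dir));
        }
    }
}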