From 49bbf4d50dccecba612930740f5fccf0b87dfa0b Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Tue, 28 May 2024 12:23:34 +0800 Subject: [PATCH] KAFKA-16709: abortAndPauseCleaning only when future log is not existed (#15951) When doing alter replica logDirs, we'll create a future log and pause log cleaning for the partition( here). And this log cleaning pausing will resume after alter replica logDirs completes (here). And when in the resuming log cleaning, we'll decrement 1 for the LogCleaningPaused count. Once the count reached 0, the cleaning pause is really resuming. (here). For more explanation about the logCleaningPaused state can check here. But, there's still one factor that could increase the LogCleaningPaused count: leadership change (here). When there's a leadership change, we'll check if there's a future log in this partition, if so, we'll create future log and pauseCleaning (LogCleaningPaused count + 1). So, if during the alter replica logDirs: 1. alter replica logDirs for tp0 triggered (LogCleaningPaused count = 1) 2. tp0 leadership changed (LogCleaningPaused count = 2) 3. alter replica logDirs completes, resuming logCleaning (LogCleaningPaused count = 1) 4. LogCleaning keeps paused because the count is always > 0 This PR fixes this issue by only abortAndPauseCleaning when future log is not existed. We did the same check in alterReplicaLogDirs. So this change can make sure there's only 1 abortAndPauseCleaning for either abortAndPauseCleaning or maybeAddLogDirFetchers. Tests also added. Reviewers: Chia-Ping Tsai , Igor Soarez --- .../main/scala/kafka/cluster/Partition.scala | 2 +- .../scala/kafka/server/ReplicaManager.scala | 21 +++--- .../kafka/server/ReplicaManagerTest.scala | 75 +++++++++++++++++++ 3 files changed, 86 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 980c6ccb258fe..e6783b7d190c1 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -427,7 +427,7 @@ class Partition(val topicPartition: TopicPartition, * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from * @return true iff the future replica is created */ - def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = { + def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = topicId): Boolean = { // The writeLock is needed to make sure that while the caller checks the log directory of the // current replica and the existence of the future replica, no other thread can update the log directory of the // current replica or remove the future replica. diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index e69fd60385f93..595932b4291f0 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig, partition.log.foreach { _ => val leader = BrokerEndPoint(config.brokerId, "localhost", -1) - // Add future replica log to partition's map - partition.createLogIfNotExists( - isNew = false, - isFutureReplica = true, - offsetCheckpoints, - topicIds(partition.topic)) - - // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move - // replica from source dir to destination dir - logManager.abortAndPauseCleaning(topicPartition) + // Add future replica log to partition's map if it's not existed + if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) { + // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move + // replica from source dir to destination dir + logManager.abortAndPauseCleaning(topicPartition) + } futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader, partition.getLeaderEpoch, futureLog.highWatermark)) @@ -2131,8 +2127,11 @@ class ReplicaManager(val config: KafkaConfig, } } - if (futureReplicasAndInitialOffset.nonEmpty) + if (futureReplicasAndInitialOffset.nonEmpty) { + // Even though it's possible that there is another thread adding fetcher for this future log partition, + // but it's fine because `BrokerIdAndFetcherId` will be identical and the operation will be no-op. replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset) + } } /* diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 5f1c41ba3f1c6..8b02c72a2b00a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -315,6 +315,81 @@ class ReplicaManagerTest { } } + @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}") + @ValueSource(booleans = Array(true, false)) + def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = { + val dir1 = TestUtils.tempDir() + val dir2 = TestUtils.tempDir() + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath) + val config = KafkaConfig.fromProps(props) + val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties())) + val spyLogManager = spy(logManager) + val metadataCache: MetadataCache = mock(classOf[MetadataCache]) + mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0))) + when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) + val tp0 = new TopicPartition(topic, 0) + val uuid = Uuid.randomUuid() + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = spyLogManager, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + + try { + val partition = rm.createPartition(tp0) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), Option.apply(uuid)) + + val response = rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(Seq[Integer](0).asJava) + .setPartitionEpoch(0) + .setReplicas(Seq[Integer](0).asJava) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, uuid), + Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ()) + // expect the errorCounts only has 1 entry with Errors.NONE + val errorCounts = response.errorCounts() + assertEquals(1, response.errorCounts().size()) + assertNotNull(errorCounts.get(Errors.NONE)) + spyLogManager.maybeUpdatePreferredLogDir(tp0, dir2.getAbsolutePath) + + if (futureLogCreated) { + // create future log before maybeAddLogDirFetchers invoked + partition.createLogIfNotExists(isNew = false, isFutureReplica = true, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + } else { + val mockLog = mock(classOf[UnifiedLog]) + when(spyLogManager.getLog(tp0, isFuture = true)).thenReturn(Option.apply(mockLog)) + when(mockLog.topicId).thenReturn(Option.apply(uuid)) + when(mockLog.parentDir).thenReturn(dir2.getAbsolutePath) + } + + val topicIdMap: Map[String, Option[Uuid]] = Map(topic -> Option.apply(uuid)) + rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), topicIdMap) + if (futureLogCreated) { + // since the futureLog is already created, we don't have to abort and pause the cleaning + verify(spyLogManager, never).abortAndPauseCleaning(any[TopicPartition]) + } else { + verify(spyLogManager, times(1)).abortAndPauseCleaning(any[TopicPartition]) + } + rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, s.fetchOffset))) + } finally { + rm.shutdown(checkpointHW = false) + } + } + @Test def testClearPurgatoryOnBecomingFollower(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)