diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 980c6ccb258fe..e6783b7d190c1 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -427,7 +427,7 @@ class Partition(val topicPartition: TopicPartition,
    * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
    * @return true iff the future replica is created
    */
-  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
+  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = topicId): Boolean = {
     // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
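
The new topicId parameter defaults to the partition's current topicId, so existing call
sites compile unchanged, while a caller that has already resolved an ID can pass it
explicitly. A minimal sketch of the pattern, using a hypothetical PartitionLike class
rather than the real Partition:

    // Sketch only: in Scala, a default argument may refer to a member of the
    // enclosing class, so the parameter falls back to the current topic ID.
    class PartitionLike(@volatile var topicId: Option[String]) {
      private var futureReplicaDir: Option[String] = None

      // Returns true iff this call actually created the future replica.
      def maybeCreateFutureReplica(logDir: String,
                                   topicId: Option[String] = topicId): Boolean = synchronized {
        if (futureReplicaDir.isEmpty) {
          futureReplicaDir = Some(logDir)
          this.topicId = this.topicId.orElse(topicId) // keep an ID if one was supplied
          true
        } else
          false
      }
    }
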
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e69fd60385f93..595932b4291f0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it does not exist yet
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+            // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
+            // replica from source dir to destination dir
+            logManager.abortAndPauseCleaning(topicPartition)
+          }
 
           futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
             partition.getLeaderEpoch, futureLog.highWatermark))
@@ -2131,8 +2127,11 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
 
-    if (futureReplicasAndInitialOffset.nonEmpty)
+    if (futureReplicasAndInitialOffset.nonEmpty) {
+      // Another thread may be adding a fetcher for this future log partition concurrently,
+      // but that is fine: `BrokerIdAndFetcherId` will be identical and the operation is a no-op.
       replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
+    }
   }
 
   /*
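
The gist of the fix: pausing the log cleaner is now tied to whether this thread actually
created the future log, so two threads racing through maybeAddLogDirFetchers can no longer
pause cleaning twice, while registering the fetcher stays safe to repeat. A self-contained
sketch of that shape (Scala 2.12+, hypothetical names in place of ReplicaManager's API):

    import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

    object CreateThenPauseSketch {
      private val futureLogExists = new AtomicBoolean(false)
      private val pauseCount = new AtomicInteger(0) // stands in for abortAndPauseCleaning

      // True only for the single caller that wins the creation race,
      // mirroring maybeCreateFutureReplica's "true iff created" contract.
      def maybeCreateFutureReplica(): Boolean = futureLogExists.compareAndSet(false, true)

      def maybeAddLogDirFetchers(): Unit = {
        if (maybeCreateFutureReplica())
          pauseCount.incrementAndGet() // runs at most once across racing threads
        // adding the fetcher is idempotent, so it may run on every call
      }

      def main(args: Array[String]): Unit = {
        val threads = (1 to 4).map(_ => new Thread(() => maybeAddLogDirFetchers()))
        threads.foreach(_.start())
        threads.foreach(_.join())
        assert(pauseCount.get == 1) // cleaning was paused exactly once
      }
    }
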
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5f1c41ba3f1c6..8b02c72a2b00a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -315,6 +315,81 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), Option.apply(uuid))
+
+      val response = rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, uuid),
+        Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
+      // expect errorCounts to contain a single entry, for Errors.NONE
+      val errorCounts = response.errorCounts()
+      assertEquals(1, errorCounts.size())
+      assertNotNull(errorCounts.get(Errors.NONE))
+      spyLogManager.maybeUpdatePreferredLogDir(tp0, dir2.getAbsolutePath)
+
+      if (futureLogCreated) {
+        // create the future log before maybeAddLogDirFetchers is invoked
+        partition.createLogIfNotExists(isNew = false, isFutureReplica = true,
+          new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+      } else {
+        val mockLog = mock(classOf[UnifiedLog])
+        when(spyLogManager.getLog(tp0, isFuture = true)).thenReturn(Option.apply(mockLog))
+        when(mockLog.topicId).thenReturn(Option.apply(uuid))
+        when(mockLog.parentDir).thenReturn(dir2.getAbsolutePath)
+      }
+
+      val topicIdMap: Map[String, Option[Uuid]] = Map(topic -> Option.apply(uuid))
+      rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), topicIdMap)
+      if (futureLogCreated) {
+        // since the future log already existed, cleaning must not be aborted and paused again
+        verify(spyLogManager, never).abortAndPauseCleaning(any[TopicPartition])
+      } else {
+        verify(spyLogManager, times(1)).abortAndPauseCleaning(any[TopicPartition])
+      }
+      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, s.fetchOffset)))
+    } finally {
+      rm.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testClearPurgatoryOnBecomingFollower(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
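
For reference, the parameterized test covers both sides of the new guard: with
futureLogCreated = true the future log already exists, so abortAndPauseCleaning must never
fire again; with false it must fire exactly once, and in both cases the alter-log-dirs
fetcher must start from offset 0. Assuming a standard Kafka checkout, the usual Gradle
test filter should run it:

    ./gradlew core:test --tests ReplicaManagerTest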