KAFKA-16709: abortAndPauseCleaning only when the future log does not exist (apache#15951)

When handling alterReplicaLogDirs, we create a future log and pause log cleaning for the partition. Log cleaning is resumed after the alterReplicaLogDirs operation completes: resuming decrements the LogCleaningPaused count by 1, and cleaning only truly resumes once the count reaches 0. (The logCleaningPaused state in the log cleaner manager tracks this count.)
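To make the counting concrete, here is a minimal, self-contained sketch of this pause/resume bookkeeping (hypothetical names; this is not Kafka's cleaner-manager code, just the ref-counting idea described above):

// Sketch only (hypothetical class): a ref-counted cleaning pause.
object PauseCountSketch extends App {
  final class CleaningPauseTracker {
    private var pausedCount = 0                       // plays the role of the LogCleaningPaused count

    def abortAndPauseCleaning(): Unit = pausedCount += 1
    def resumeCleaning(): Unit = pausedCount = math.max(0, pausedCount - 1)
    def cleaningAllowed: Boolean = pausedCount == 0   // cleaning only really resumes at 0
  }

  val tracker = new CleaningPauseTracker
  tracker.abortAndPauseCleaning()   // alter replica logDirs starts: count = 1
  tracker.resumeCleaning()          // alter replica logDirs completes: count = 0
  println(tracker.cleaningAllowed)  // true: cleaning can resume
}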

However, there is another path that can increase the LogCleaningPaused count: leadership change. On a leadership change we check whether the partition has a future log; if so, we create the future log and pause cleaning again (LogCleaningPaused count + 1). So the following interleaving is possible during an alterReplicaLogDirs operation (a sketch of this interleaving, with and without the fix, follows the fix description below):

1. alterReplicaLogDirs for tp0 is triggered (LogCleaningPaused count = 1)
2. tp0 leadership changes (LogCleaningPaused count = 2)
3. alterReplicaLogDirs completes and resumes log cleaning (LogCleaningPaused count = 1)
4. Log cleaning stays paused because the count never drops back to 0

This PR fixes the issue by calling abortAndPauseCleaning only when the future log does not already exist. The same check is already performed in alterReplicaLogDirs, so the change guarantees a single abortAndPauseCleaning call whether the future log is created via alterReplicaLogDirs or via maybeAddLogDirFetchers. Tests are also added.
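The following self-contained sketch replays the four steps above with and without the guard this PR adds (hypothetical names; the real code paths are alterReplicaLogDirs and maybeAddLogDirFetchers in ReplicaManager, the latter shown in the diff below):

// Sketch only: the LogCleaningPaused count with and without the "future log already exists" check.
object StuckPauseSketch extends App {
  def replay(checkFutureLogBeforePausing: Boolean): Int = {
    var pausedCount = 0
    var futureLogExists = false

    // 1. alter replica logDirs for tp0: creates the future log and pauses cleaning
    futureLogExists = true
    pausedCount += 1

    // 2. tp0 leadership change: maybeAddLogDirFetchers runs for the same partition again
    if (!checkFutureLogBeforePausing || !futureLogExists) pausedCount += 1

    // 3. alter replica logDirs completes: cleaning is resumed once
    pausedCount -= 1

    pausedCount // 4. cleaning only resumes if this is back to 0
  }

  println(s"without the check: count = ${replay(checkFutureLogBeforePausing = false)}") // 1 -> stays paused
  println(s"with the check:    count = ${replay(checkFutureLogBeforePausing = true)}")  // 0 -> cleaning resumes
}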

Reviewers: Chia-Ping Tsai <[email protected]>, Igor Soarez <[email protected]>
showuon authored and TaiJuWu committed Jun 8, 2024
1 parent e720689 commit 49bbf4d
Showing 3 changed files with 86 additions and 12 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
@@ -427,7 +427,7 @@ class Partition(val topicPartition: TopicPartition,
    * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
    * @return true iff the future replica is created
    */
-  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
+  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = topicId): Boolean = {
     // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
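One detail worth noting in the new signature: the default value `topicId` refers to the Partition class's own topicId member, since a Scala parameter's default expression cannot reference the parameter itself. A minimal, standalone illustration of that resolution (hypothetical class, not Kafka code):

// Hypothetical example: the parameter's default resolves to the class member of the same name,
// mirroring `topicId: Option[Uuid] = topicId` in maybeCreateFutureReplica.
class Holder(val id: Option[String]) {
  def describe(id: Option[String] = id): String = s"id=$id"
}

object DefaultParamSketch extends App {
  val h = new Holder(Some("member-value"))
  println(h.describe())             // id=Some(member-value): default falls back to the member
  println(h.describe(Some("arg")))  // id=Some(arg): an explicit argument takes precedence
}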
21 changes: 10 additions & 11 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+            // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
+            // replica from source dir to destination dir
+            logManager.abortAndPauseCleaning(topicPartition)
+          }
 
           futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
             partition.getLeaderEpoch, futureLog.highWatermark))
         }
       }
     }
 
-    if (futureReplicasAndInitialOffset.nonEmpty)
+    if (futureReplicasAndInitialOffset.nonEmpty) {
+      // Even though it's possible that there is another thread adding fetcher for this future log partition,
+      // but it's fine because `BrokerIdAndFetcherId` will be identical and the operation will be no-op.
       replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
+    }
   }
 
   /*
75 changes: 75 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -315,6 +315,81 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), Option.apply(uuid))
+
+      val response = rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, uuid),
+        Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
+      // expect the errorCounts only has 1 entry with Errors.NONE
+      val errorCounts = response.errorCounts()
+      assertEquals(1, response.errorCounts().size())
+      assertNotNull(errorCounts.get(Errors.NONE))
+      spyLogManager.maybeUpdatePreferredLogDir(tp0, dir2.getAbsolutePath)
+
+      if (futureLogCreated) {
+        // create future log before maybeAddLogDirFetchers invoked
+        partition.createLogIfNotExists(isNew = false, isFutureReplica = true,
+          new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+      } else {
+        val mockLog = mock(classOf[UnifiedLog])
+        when(spyLogManager.getLog(tp0, isFuture = true)).thenReturn(Option.apply(mockLog))
+        when(mockLog.topicId).thenReturn(Option.apply(uuid))
+        when(mockLog.parentDir).thenReturn(dir2.getAbsolutePath)
+      }
+
+      val topicIdMap: Map[String, Option[Uuid]] = Map(topic -> Option.apply(uuid))
+      rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), topicIdMap)
+      if (futureLogCreated) {
+        // since the futureLog is already created, we don't have to abort and pause the cleaning
+        verify(spyLogManager, never).abortAndPauseCleaning(any[TopicPartition])
+      } else {
+        verify(spyLogManager, times(1)).abortAndPauseCleaning(any[TopicPartition])
+      }
+      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, s.fetchOffset)))
+    } finally {
+      rm.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testClearPurgatoryOnBecomingFollower(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
