
KAFKA-16709: abortAndPauseCleaning only when future log is not existed #15951

Merged 6 commits on May 28, 2024

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
@@ -427,7 +427,7 @@ class Partition(val topicPartition: TopicPartition,
* @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
* @return true iff the future replica is created
*/
def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = topicId): Boolean = {
Contributor:

Could we add the new param to the javadoc?
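
For illustration, the added parameter could be documented along these lines (a sketch only; the description of topicId is an assumed wording, not the author's):

   * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
   * @param topicId                  topic ID to associate with the future log, defaulting to this Partition's current topicId
   * @return true iff the future replica is created
   */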

// The writeLock is needed to make sure that while the caller checks the log directory of the
// current replica and the existence of the future replica, no other thread can update the log directory of the
// current replica or remove the future replica.
21 changes: 10 additions & 11 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
partition.log.foreach { _ =>
val leader = BrokerEndPoint(config.brokerId, "localhost", -1)

// Add future replica log to partition's map
partition.createLogIfNotExists(
isNew = false,
isFutureReplica = true,
offsetCheckpoints,
topicIds(partition.topic))

// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
// replica from source dir to destination dir
logManager.abortAndPauseCleaning(topicPartition)
// Add future replica log to partition's map if it's not existed
Member:

If maybeCreateFutureReplica returns false, we assume another thread has already added the future log to the partition and invoked the alter thread. Hence, we don't need to abort cleaning, since that other thread does it.

However, adding the alter thread (replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)) is not inside this check. Is it possible that the alter thread, which was started by another thread, has just removed the future log, and then this thread adds the topic partition to replicaAlterLogDirsManager? It seems to me that the alter thread will fail because the future log of the partition is gone.

Contributor Author:

However, adding the alter thread (replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)) is not inside this check. Is it possible that the alter thread, which was started by another thread, has just removed the future log, and then this thread adds the topic partition to replicaAlterLogDirsManager? It seems to me that the alter thread will fail because the future log of the partition is gone.

That's possible. But I think that's fine, because the removal of the future log could happen because:

  1. The logDir alteration completes. In this case, a new leaderAndIsr request or topic partition update will arrive, and this fetcher will then be removed in ReplicaManager#makeLeader or makeFollower.
  2. Another log failure happened. In this case createLogIfNotExists will fail, too.

Member:

Oh, my previous comment is incorrect. Both alterReplicaLogDirs and maybeAddLogDirFetchers run under replicaStateChangeLock, so the race condition I described should not happen.

However, I'm wondering whether it is fine for maybeAddLogDirFetchers to add the alter thread even though the future log of the partition was already created by another thread. Admittedly, no new alter thread will be created, as BrokerIdAndFetcherId is identical.

In short, alterReplicaLogDirs adds the alter thread [0] only if it succeeds in creating the future log of the partition. Maybe maybeAddLogDirFetchers should follow the same rule? Or we can add a comment saying "this is fine, as replicaAlterLogDirsManager.addFetcherForPartitions will be a no-op in this case"?

[0]

if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
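
For readers following along, the surrounding pattern in ReplicaManager#alterReplicaLogDirs looks roughly like this. This is a paraphrased sketch mirroring the maybeAddLogDirFetchers code in this diff, not the verbatim source; helper names such as futureLocalLogOrException and the exact InitialFetchState arguments are recalled from memory and may differ:

    // Paraphrased sketch of the alterReplicaLogDirs pattern referenced as [0]:
    // only the thread that actually created the future replica pauses cleaning
    // and registers the alter-dir fetcher.
    if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
      logManager.abortAndPauseCleaning(topicPartition)
      val initialFetchState = InitialFetchState(topicIds(topicPartition.topic),
        BrokerEndPoint(config.brokerId, "localhost", -1),
        partition.getLeaderEpoch, partition.futureLocalLogOrException.highWatermark)
      replicaAlterLogDirsManager.addFetcherForPartitions(Map(topicPartition -> initialFetchState))
    }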

Contributor Author:

@chia7712, thanks for the comment.
For this:

In short, alterReplicaLogDirs adds the alter thread [0] only if it succeeds in creating the future log of the partition. Maybe maybeAddLogDirFetchers should follow the same rule? Or we can add a comment saying "this is fine, as replicaAlterLogDirsManager.addFetcherForPartitions will be a no-op in this case"?

I chose the latter option, because if we only created the fetcher when the future log does not exist, a potential side effect is that the fetcher could be removed on a leadership change but never added back later. I've added the comment in this commit: 0d78e49 . Thanks.

Contributor:

if it's not existed => if it doesn't exist

if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
Member:

partition.maybeCreateFutureReplica() only calls partition.createLogIfNotExists() if the future replica doesn't yet exist. If it skips that call, partition.futureLog won't be set, so don't we need to call partition.setLog(futureLog, true)?

Member:

Since this section is inside a block of logManager.getLog(topicPartition, isFuture = true).foreach { futureLog =>, doesn't that mean this only runs if the future replica exists?

Contributor Author:

partition.maybeCreateFutureReplica() only calls partition.createLogIfNotExists() if the future replica doesn't yet exist. If it skips that call, partition.futureLog won't be set, so don't we need to call partition.setLog(futureLog, true)?

partition.setLog(futureLog, true) is not necessary, because partition.createLogIfNotExists() also sets futureLog:

  if (isFutureReplica) {
      this.futureLog = Some(maybeCreate(this.futureLog))

Since this section is inside a block of logManager.getLog(topicPartition, isFuture = true).foreach { futureLog =>, doesn't that mean this only runs if the future replica exists?

Yes. Normally, the future log is created via this path:

  1. ReplicaManager#alterReplicaLogDirs
  2. partition.maybeCreateFutureReplica
  3. partition.createLogIfNotExists
  4. partition.createLog
  5. logManager.getOrCreateLog

So in the end, the future log is added to both the logManager#futureLogs map and the partition#futureLog map.

But it's possible that the future log exists only in the logManager#futureLogs map and not in the partition#futureLog map: if the broker restarts after the future log is created but before the logDir alteration completes, then after the restart the future log is added to the logManager map during loadLog, while the partition#futureLog map is not updated until a leadership update arrives and ReplicaManager#maybeAddLogDirFetchers is called. In that case, partition.maybeCreateFutureReplica will not create the future log on disk, since it already exists, but partition#futureLog will be populated with the log from logManager.

That means we have to do the partition.createLogIfNotExists() here.

Member:

That means we have to do the partition.createLogIfNotExists() here.

Given that partition.maybeCreateFutureReplica() only calls partition.createLogIfNotExists() if the future replica doesn't yet exist, and that it's possible that the future log only exists in the logManager#futureLogs map but not in the partition#futureLog map – do we need to call partition.createLogIfNotExists() directly instead of partition.maybeCreateFutureReplica()?

Contributor Author:

We need a way to determine whether the future log already exists in the partition#futureLog map, and if we call partition.createLogIfNotExists() directly, we can't know whether it was created. partition.maybeCreateFutureReplica() returns a boolean that tells us exactly that.

That is, partition.maybeCreateFutureReplica() only calls partition.createLogIfNotExists() when the partition#futureLog map doesn't contain the partition. That's fine because there is no chance that the futureLog exists in partition#futureLog but not in the logManager#futureLogs map.

Sorry, maybe I didn't get your question here. Could you explain again if I misunderstood it?

Member:

That's fine because there is no chance that the futureLog exists in partition#futureLog but not in the logManager#futureLogs map.

The scenario I was thinking of is when the broker starts up: logManager loads the future log, so the futureLog exists in the logManager#futureLogs map but not in partition#futureLog yet. Only later, when the broker catches up with metadata (ReplicaManager#applyDelta) or receives a LeaderAndIsr request (becomeLeaderOrFollower) and this method maybeAddLogDirFetchers is called, do we need to make sure partition#futureLog is populated too.

But you're right: my confusion was about where maybeCreateFutureReplica checks whether the futureLog already exists. It checks within itself (Partition), not in LogManager, so this makes sense.
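
To recap the contract this thread converged on, here is a simplified sketch of the decision in the shape of Partition's method. It is not the real implementation, which, per the comment shown in Partition.scala above, also holds the write lock and compares the current replica's log directory:

  // Simplified sketch of the contract discussed above (not the real Partition code):
  // the boolean tells the caller whether *this* call attached the future log to the Partition.
  def maybeCreateFutureReplica(logDir: String,
                               highWatermarkCheckpoints: OffsetCheckpoints,
                               topicId: Option[Uuid] = topicId): Boolean = {
    futureLog match {
      case Some(_) =>
        false // already attached: the caller must not pause cleaning or re-register the fetcher
      case None =>
        // may simply adopt the log that LogManager loaded on startup instead of creating a new one on disk
        createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints, topicId)
        true  // the caller should pause cleaning and register the alter-dir fetcher
    }
  }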

// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
// replica from source dir to destination dir
logManager.abortAndPauseCleaning(topicPartition)
}

futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
partition.getLeaderEpoch, futureLog.highWatermark))
}
}
}

if (futureReplicasAndInitialOffset.nonEmpty)
if (futureReplicasAndInitialOffset.nonEmpty) {
// Even though it's possible that there is another thread adding fetcher for this future log partition,
Contributor:

Hmm, becomeLeaderOrFollower(), alterReplicaLogDirs(), and applyDelta() are all done under the replicaStateChangeLock. Is it really possible for another thread to add a fetcher for the future log?

// but it's fine because `BrokerIdAndFetcherId` will be identical and the operation will be no-op.
Contributor:

When the leader changes, we need to propagate the new leader epoch to ReplicaAlterLogDirsThread (see #8223). So, the operation is not a no-op?
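
As background for both comments above: fetcher threads are keyed by BrokerIdAndFetcherId, so a repeated addFetcherForPartitions call reuses the existing ReplicaAlterLogDirsThread rather than starting a new one, while per-partition state can still be refreshed. The following is a minimal, self-contained model of that keying behaviour only; the class names are invented for illustration and are not Kafka's AbstractFetcherManager, and whether the real call refreshes enough state (e.g. the leader epoch) in this path is exactly the open question raised here:

  import scala.collection.mutable

  // Minimal model (not the real AbstractFetcherManager) of the keying behaviour discussed above.
  case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)
  case class FetchState(fetchOffset: Long, leaderEpoch: Int)

  class FetcherModel {
    private val partitionStates = mutable.Map.empty[String, FetchState]
    // Repeated calls overwrite the stored state (e.g. with a new leader epoch),
    // so "no-op" would only apply to thread creation, not to partition state.
    def addPartitions(states: Map[String, FetchState]): Unit = partitionStates ++= states
  }

  class FetcherManagerModel {
    private val fetcherThreadMap = mutable.Map.empty[BrokerIdAndFetcherId, FetcherModel]

    def addFetcherForPartitions(key: BrokerIdAndFetcherId, states: Map[String, FetchState]): Unit = {
      // An identical key reuses the existing fetcher; no second thread is created.
      val fetcher = fetcherThreadMap.getOrElseUpdate(key, new FetcherModel)
      fetcher.addPartitions(states)
    }
  }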

replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
}
}

/*
75 changes: 75 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -315,6 +315,81 @@ class ReplicaManagerTest {
}
}

  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
  @ValueSource(booleans = Array(true, false))
  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
    val dir1 = TestUtils.tempDir()
    val dir2 = TestUtils.tempDir()
    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
    val config = KafkaConfig.fromProps(props)
    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
    val spyLogManager = spy(logManager)
    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
    val tp0 = new TopicPartition(topic, 0)
    val uuid = Uuid.randomUuid()
    val rm = new ReplicaManager(
      metrics = metrics,
      config = config,
      time = time,
      scheduler = new MockScheduler(time),
      logManager = spyLogManager,
      quotaManagers = quotaManager,
      metadataCache = metadataCache,
      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
      alterPartitionManager = alterPartitionManager)

    try {
      val partition = rm.createPartition(tp0)
      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), Option.apply(uuid))

      val response = rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
        Seq(new LeaderAndIsrPartitionState()
          .setTopicName(topic)
          .setPartitionIndex(0)
          .setControllerEpoch(0)
          .setLeader(0)
          .setLeaderEpoch(0)
          .setIsr(Seq[Integer](0).asJava)
          .setPartitionEpoch(0)
          .setReplicas(Seq[Integer](0).asJava)
          .setIsNew(false)).asJava,
        Collections.singletonMap(topic, uuid),
        Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
      // expect the errorCounts only has 1 entry with Errors.NONE
      val errorCounts = response.errorCounts()
      assertEquals(1, response.errorCounts().size())
      assertNotNull(errorCounts.get(Errors.NONE))
      spyLogManager.maybeUpdatePreferredLogDir(tp0, dir2.getAbsolutePath)

      if (futureLogCreated) {
        // create future log before maybeAddLogDirFetchers invoked
        partition.createLogIfNotExists(isNew = false, isFutureReplica = true,
          new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
      } else {
        val mockLog = mock(classOf[UnifiedLog])
        when(spyLogManager.getLog(tp0, isFuture = true)).thenReturn(Option.apply(mockLog))
        when(mockLog.topicId).thenReturn(Option.apply(uuid))
        when(mockLog.parentDir).thenReturn(dir2.getAbsolutePath)
      }

      val topicIdMap: Map[String, Option[Uuid]] = Map(topic -> Option.apply(uuid))
      rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), topicIdMap)
      if (futureLogCreated) {
        // since the futureLog is already created, we don't have to abort and pause the cleaning
        verify(spyLogManager, never).abortAndPauseCleaning(any[TopicPartition])
      } else {
        verify(spyLogManager, times(1)).abortAndPauseCleaning(any[TopicPartition])
      }
      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, s.fetchOffset)))
    } finally {
      rm.shutdown(checkpointHW = false)
    }
  }

@Test
def testClearPurgatoryOnBecomingFollower(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)