KAFKA-16082 Avoid resuming future replica if current replica is in the same directory (apache#15136)

It is observed that for scenario (3), i.e. when a broker crashes while it
waits for the future replica to catch up for the second time and `dir1`
is unavailable when the broker is restarted, the broker tries to create
the partition in `dir2` according to the metadata in the controller.
However, ReplicaManager also tries to resume the stale future replica
that was abandoned when the broker crashed. The rename of the future
replica therefore eventually fails, because a directory for the topic
partition already exists in `dir2`, and the broker then marks `dir2` as
offline.

This PR fixes the behaviour by ignoring any future replica that is in
the same directory as the one where the current log exists. It further
marks the stale future replica for deletion.
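
For intuition, the detection step can be modelled with a minimal, self-contained Scala sketch. Everything here is a hypothetical simplification for illustration: `DirId`, `FutureLog`, and `PartitionAssignment` stand in for Kafka's directory IDs, log metadata, and the controller's TopicsImage; `findAbandoned` is not the actual LogManager API.

object AbandonedFutureLogCheck {
  // Hypothetical stand-ins for Kafka's directory IDs and metadata types.
  final case class DirId(value: String)
  final case class FutureLog(topicId: String, partition: Int, parentDir: DirId)
  final case class PartitionAssignment(assignedDir: DirId)

  // A future log is "abandoned" when it already sits in the directory that
  // the controller assigned to the partition: there is no pending move to
  // finish, and resuming it would collide with a current replica created in
  // the same directory, so the broker must recover it in place instead.
  def findAbandoned(futureLogs: Seq[FutureLog],
                    assignments: Map[(String, Int), PartitionAssignment]): Seq[FutureLog] =
    futureLogs.filter { fl =>
      assignments.get((fl.topicId, fl.partition)).exists(_.assignedDir == fl.parentDir)
    }

  def main(args: Array[String]): Unit = {
    val dir2 = DirId("dir2")
    val futureLogs = Seq(FutureLog("foo", 0, dir2))
    val assignments = Map(("foo", 0) -> PartitionAssignment(dir2))
    // foo-0's future replica already lives in the assigned dir2 => abandoned.
    println(findAbandoned(futureLogs, assignments))
  }
}

The real implementation (LogManager.findAbandonedFutureLogs in the diff below) additionally pairs each abandoned future log with the current log, if any, so the stale copy can be deleted.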

Reviewers: Omnia Ibrahim <[email protected]>, Igor Soarez <[email protected]>, Proven Provenzano <[email protected]>, Chia-Ping Tsai <[email protected]>
gaurav-narula authored Apr 19, 2024
1 parent ecb2dd4 commit 0308543
Showing 3 changed files with 205 additions and 29 deletions.
101 changes: 72 additions & 29 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -1178,6 +1178,36 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      // We invoke abortAndPauseCleaning here because log cleaner runs asynchronously and replaceCurrentWithFutureLog
+      // invokes resumeCleaning which requires log cleaner's internal state to have a key for the given topic partition.
+      abortAndPauseCleaning(tp)
+
+      if (currentLog.isDefined)
+        info(s"Attempting to recover abandoned future log for $tp at $futureLog and removing ${currentLog.get}")
+      else
+        info(s"Attempting to recover abandoned future log for $tp at $futureLog")
+      replaceCurrentWithFutureLog(currentLog, futureLog)
+      info(s"Successfully recovered abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[(UnifiedLog, Option[UnifiedLog])] = {
+    futureLogs.values.flatMap { futureLog =>
+      val topicId = futureLog.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $futureLog does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = futureLog.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(futureLog.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => (futureLog, Option(currentLogs.get(futureLog.topicPartition)).filter(currentLog => currentLog.topicId.contains(topicId))))
+    }
+  }
+
   /**
    * Mark the partition directory in the source log directory for deletion and
    * rename the future log of this partition in the destination log directory to be the current log
@@ -1189,49 +1219,62 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+    }
+  }
 
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
+    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
+    // the metrics tags still contain "future", so we have to remove it.
+    // we will add metrics back after sourceLog remove the metrics
+    destLog.removeLogMetrics()
+    if (updateHighWatermark && sourceLog.isDefined) {
+      destLog.updateHighWatermark(sourceLog.get.highWatermark)
+    }
+
+    // Now that future replica has been successfully renamed to be the current replica
+    // Update the cached map and log cleaner as appropriate.
+    futureLogs.remove(topicPartition)
+    currentLogs.put(topicPartition, destLog)
+    if (cleaner != null) {
+      sourceLog.foreach { srcLog =>
+        cleaner.alterCheckpointDir(topicPartition, srcLog.parentDirFile, destLog.parentDirFile)
       }
+      resumeCleaning(topicPartition)
+    }
 
-      try {
-        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
+    try {
+      sourceLog.foreach { srcLog =>
+        srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
-        sourceLog.close()
-        val logDir = sourceLog.parentDirFile
+        srcLog.close()
+        val logDir = srcLog.parentDirFile
         val logsToCheckpoint = logsInDir(logDir)
         checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
         checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
-        sourceLog.removeLogMetrics()
-        destLog.newMetrics()
-        addLogToBeDeleted(sourceLog)
-      } catch {
-        case e: KafkaStorageException =>
-          // If sourceLog's log directory is offline, we need close its handlers here.
-          // handleLogDirFailure() will not close handlers of sourceLog because it has been removed from currentLogs map
-          sourceLog.closeHandlers()
-          sourceLog.removeLogMetrics()
-          throw e
+        srcLog.removeLogMetrics()
+        addLogToBeDeleted(srcLog)
       }
 
-      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+      destLog.newMetrics()
+    } catch {
+      case e: KafkaStorageException =>
+        // If sourceLog's log directory is offline, we need close its handlers here.
+        // handleLogDirFailure() will not close handlers of sourceLog because it has been removed from currentLogs map
+        sourceLog.foreach { srcLog =>
+          srcLog.closeHandlers()
+          srcLog.removeLogMetrics()
+        }
+        throw e
+    }
   }
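
One design choice in the refactor above is worth spelling out: the source log parameter became an Option[UnifiedLog] because, during startup recovery, the current replica's directory may be offline and there may be nothing to rename for deletion. A minimal, runnable sketch of that call shape, using a hypothetical promote function rather than the real LogManager API:

object OptionalSourceSketch {
  final case class Log(dir: String)

  // Mirrors the shape of the new replaceCurrentWithFutureLog overload: the
  // destination log is always promoted, while source-side cleanup (rename for
  // deletion, checkpointing, metrics removal) runs only when a source log
  // actually exists.
  def promote(source: Option[Log], dest: Log): Unit = {
    println(s"promoting ${dest.dir} to current")
    source.foreach(src => println(s"marking ${src.dir} for deletion"))
  }

  def main(args: Array[String]): Unit = {
    promote(Some(Log("dir1/foo-0")), Log("dir2/foo-0.future")) // normal inter-directory move
    promote(None, Log("dir2/foo-0.future"))                    // startup recovery: dir1 offline, nothing to delete
  }
}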

8 changes: 8 additions & 0 deletions core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -294,6 +294,14 @@ class BrokerMetadataPublisher(
         isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
       )
 
+      // Rename all future replicas which are in the same directory as the
+      // one assigned by the controller. This can only happen due to a disk
+      // failure and broker shutdown after the directory assignment has been
+      // updated in the controller but before the future replica could be
+      // promoted.
+      // See KAFKA-16082 for details.
+      logManager.recoverAbandonedFutureLogs(brokerId, newImage.topics())
+
       // Make the LogCleaner available for reconfiguration. We can't do this prior to this
       // point because LogManager#startup creates the LogCleaner object, if
       // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
125 changes: 125 additions & 0 deletions core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -17,10 +17,12 @@
 
 package kafka.server
 
+import kafka.log.UnifiedLog
 import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
+import org.apache.commons.io.FileUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
@@ -1402,6 +1404,129 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setBrokerNodes(3, 2).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(60000) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        // Modify foo-0 so that it refers to a future replica.
+        // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
+        // the main replica being offline
+        val log = broker0.logManager.getLog(foo0).get
+        log.renameDir(UnifiedLog.logFutureDirName(foo0), shouldReinitialize = false)
+
+        // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+        broker0.startup()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
+          assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
+        }
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setBrokerNodes(3, 2).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(60000) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        val log = broker0.logManager.getLog(foo0).get
+
+        // Copy foo-0 to targetParentDir
+        // This is so that we can rename the main replica to a future down below
+        val parentDir = log.parentDir
+        val targetParentDir = broker0.config.logDirs.filter(_ != parentDir).head
+        val targetDirFile = new File(targetParentDir, log.dir.getName)
+        FileUtils.copyDirectory(log.dir, targetDirFile)
+        assertTrue(targetDirFile.exists())
+
+        // Rename original log to a future
+        // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
+        // the main replica being online
+        val originalLogFile = log.dir
+        log.renameDir(UnifiedLog.logFutureDirName(foo0), shouldReinitialize = false)
+        assertFalse(originalLogFile.exists())
+
+        // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+        broker0.startup()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
+          assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
+          assertFalse(targetDirFile.exists())
+          assertTrue(originalLogFile.exists())
+        }
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
 }
 
 class BadAuthorizer() extends Authorizer {
