From 0308543d596d564b68dcac371a8e3af69e06caea Mon Sep 17 00:00:00 2001 From: Gaurav Narula Date: Fri, 19 Apr 2024 21:18:51 +0100 Subject: [PATCH] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory (#15136) It is observed that for scenario (3), i.e. a broker crashes while it waits for the future replica to catch up for the second time and the `dir1` is unavailable when the broker is restarted, the broker tries to create the partition in `dir2` according to the metadata in the controller. However, ReplicaManager also tries to resume the stale future replica which was abandoned when the broker crashed. This results in the renaming of the future replica to fail eventually because the directory for the topic partition already exists in `dir2` and the broker then marks `dir2` as offline. This PR attempts to fix this behaviour by ignoring any future replicas which are in the same directory as where the log exists. It further marks the stale future replica for deletion. Reviewers: Omnia Ibrahim , Igor Soarez , Proven Provenzano , Chia-Ping Tsai --- .../src/main/scala/kafka/log/LogManager.scala | 101 ++++++++++---- .../metadata/BrokerMetadataPublisher.scala | 8 ++ .../kafka/server/KRaftClusterTest.scala | 125 ++++++++++++++++++ 3 files changed, 205 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4039bd1295123..3519f72448dd8 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1178,6 +1178,36 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { + val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) + abandonedFutureLogs.foreach { case (futureLog, currentLog) => + val tp = futureLog.topicPartition + // We invoke abortAndPauseCleaning here because log cleaner runs asynchronously and replaceCurrentWithFutureLog + // invokes resumeCleaning which requires log cleaner's internal state to have a key for the given topic partition. + abortAndPauseCleaning(tp) + + if (currentLog.isDefined) + info(s"Attempting to recover abandoned future log for $tp at $futureLog and removing ${currentLog.get}") + else + info(s"Attempting to recover abandoned future log for $tp at $futureLog") + replaceCurrentWithFutureLog(currentLog, futureLog) + info(s"Successfully recovered abandoned future log for $tp") + } + } + + private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[(UnifiedLog, Option[UnifiedLog])] = { + futureLogs.values.flatMap { futureLog => + val topicId = futureLog.topicId.getOrElse { + throw new RuntimeException(s"The log dir $futureLog does not have a topic ID, " + + "which is not allowed when running in KRaft mode.") + } + val partitionId = futureLog.topicPartition.partition() + Option(newTopicsImage.getPartition(topicId, partitionId)) + .filter(pr => directoryId(futureLog.parentDir).contains(pr.directory(brokerId))) + .map(_ => (futureLog, Option(currentLogs.get(futureLog.topicPartition)).filter(currentLog => currentLog.topicId.contains(topicId)))) + } + } + /** * Mark the partition directory in the source log directory for deletion and * rename the future log of this partition in the destination log directory to be the current log @@ -1189,49 +1219,62 @@ class LogManager(logDirs: Seq[File], val sourceLog = currentLogs.get(topicPartition) val destLog = futureLogs.get(topicPartition) - info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") if (sourceLog == null) throw new KafkaStorageException(s"The current replica for $topicPartition is offline") if (destLog == null) throw new KafkaStorageException(s"The future replica for $topicPartition is offline") - destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) - // the metrics tags still contain "future", so we have to remove it. - // we will add metrics back after sourceLog remove the metrics - destLog.removeLogMetrics() - destLog.updateHighWatermark(sourceLog.highWatermark) + info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") + replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true) + info(s"The current replica is successfully replaced with the future replica for $topicPartition") + } + } + + def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = { + val topicPartition = destLog.topicPartition - // Now that future replica has been successfully renamed to be the current replica - // Update the cached map and log cleaner as appropriate. - futureLogs.remove(topicPartition) - currentLogs.put(topicPartition, destLog) - if (cleaner != null) { - cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile) - resumeCleaning(topicPartition) + destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) + // the metrics tags still contain "future", so we have to remove it. + // we will add metrics back after sourceLog remove the metrics + destLog.removeLogMetrics() + if (updateHighWatermark && sourceLog.isDefined) { + destLog.updateHighWatermark(sourceLog.get.highWatermark) + } + + // Now that future replica has been successfully renamed to be the current replica + // Update the cached map and log cleaner as appropriate. + futureLogs.remove(topicPartition) + currentLogs.put(topicPartition, destLog) + if (cleaner != null) { + sourceLog.foreach { srcLog => + cleaner.alterCheckpointDir(topicPartition, srcLog.parentDirFile, destLog.parentDirFile) } + resumeCleaning(topicPartition) + } - try { - sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true) + try { + sourceLog.foreach { srcLog => + srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true) // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. - sourceLog.close() - val logDir = sourceLog.parentDirFile + srcLog.close() + val logDir = srcLog.parentDirFile val logsToCheckpoint = logsInDir(logDir) checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint) checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint) - sourceLog.removeLogMetrics() - destLog.newMetrics() - addLogToBeDeleted(sourceLog) - } catch { - case e: KafkaStorageException => - // If sourceLog's log directory is offline, we need close its handlers here. - // handleLogDirFailure() will not close handlers of sourceLog because it has been removed from currentLogs map - sourceLog.closeHandlers() - sourceLog.removeLogMetrics() - throw e + srcLog.removeLogMetrics() + addLogToBeDeleted(srcLog) } - - info(s"The current replica is successfully replaced with the future replica for $topicPartition") + destLog.newMetrics() + } catch { + case e: KafkaStorageException => + // If sourceLog's log directory is offline, we need close its handlers here. + // handleLogDirFailure() will not close handlers of sourceLog because it has been removed from currentLogs map + sourceLog.foreach { srcLog => + srcLog.closeHandlers() + srcLog.removeLogMetrics() + } + throw e } } diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 11e3ca7fddce7..048a665757b74 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -294,6 +294,14 @@ class BrokerMetadataPublisher( isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log) ) + // Rename all future replicas which are in the same directory as the + // one assigned by the controller. This can only happen due to a disk + // failure and broker shutdown after the directory assignment has been + // updated in the controller but before the future replica could be + // promoted. + // See KAFKA-16082 for details. + logManager.recoverAbandonedFutureLogs(brokerId, newImage.topics()) + // Make the LogCleaner available for reconfiguration. We can't do this prior to this // point because LogManager#startup creates the LogCleaner object, if // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 50be178f76035..8fc23a075eee4 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -17,10 +17,12 @@ package kafka.server +import kafka.log.UnifiedLog import kafka.network.SocketServer import kafka.server.IntegrationTestUtils.connectAndReceive import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes} import kafka.utils.TestUtils +import org.apache.commons.io.FileUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin._ import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} @@ -1402,6 +1404,129 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2). + setBrokerNodes(3, 2). + setNumControllerNodes(1).build()). + build() + try { + cluster.format() + cluster.startup() + val admin = Admin.create(cluster.clientProperties()) + try { + val broker0 = cluster.brokers().get(0) + val broker1 = cluster.brokers().get(1) + val foo0 = new TopicPartition("foo", 0) + + admin.createTopics(Arrays.asList( + new NewTopic("foo", 3, 3.toShort))).all().get() + + // Wait until foo-0 is created on broker0. + TestUtils.retry(60000) { + assertTrue(broker0.logManager.getLog(foo0).isDefined) + } + + // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2] + broker0.shutdown() + TestUtils.retry(60000) { + val info = broker1.metadataCache.getPartitionInfo("foo", 0) + assertTrue(info.isDefined) + assertEquals(Set(1, 2), info.get.isr().asScala.toSet) + } + + // Modify foo-0 so that it refers to a future replica. + // This is equivalent to a failure during the promotion of the future replica and a restart with directory for + // the main replica being offline + val log = broker0.logManager.getLog(foo0).get + log.renameDir(UnifiedLog.logFutureDirName(foo0), shouldReinitialize = false) + + // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2] + broker0.startup() + TestUtils.retry(60000) { + val info = broker1.metadataCache.getPartitionInfo("foo", 0) + assertTrue(info.isDefined) + assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet) + assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty) + } + } finally { + admin.close() + } + } finally { + cluster.close() + } + } + + @Test + def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2). + setBrokerNodes(3, 2). + setNumControllerNodes(1).build()). + build() + try { + cluster.format() + cluster.startup() + val admin = Admin.create(cluster.clientProperties()) + try { + val broker0 = cluster.brokers().get(0) + val broker1 = cluster.brokers().get(1) + val foo0 = new TopicPartition("foo", 0) + + admin.createTopics(Arrays.asList( + new NewTopic("foo", 3, 3.toShort))).all().get() + + // Wait until foo-0 is created on broker0. + TestUtils.retry(60000) { + assertTrue(broker0.logManager.getLog(foo0).isDefined) + } + + // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2] + broker0.shutdown() + TestUtils.retry(60000) { + val info = broker1.metadataCache.getPartitionInfo("foo", 0) + assertTrue(info.isDefined) + assertEquals(Set(1, 2), info.get.isr().asScala.toSet) + } + + val log = broker0.logManager.getLog(foo0).get + + // Copy foo-0 to targetParentDir + // This is so that we can rename the main replica to a future down below + val parentDir = log.parentDir + val targetParentDir = broker0.config.logDirs.filter(_ != parentDir).head + val targetDirFile = new File(targetParentDir, log.dir.getName) + FileUtils.copyDirectory(log.dir, targetDirFile) + assertTrue(targetDirFile.exists()) + + // Rename original log to a future + // This is equivalent to a failure during the promotion of the future replica and a restart with directory for + // the main replica being online + val originalLogFile = log.dir + log.renameDir(UnifiedLog.logFutureDirName(foo0), shouldReinitialize = false) + assertFalse(originalLogFile.exists()) + + // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2] + broker0.startup() + TestUtils.retry(60000) { + val info = broker1.metadataCache.getPartitionInfo("foo", 0) + assertTrue(info.isDefined) + assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet) + assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty) + assertFalse(targetDirFile.exists()) + assertTrue(originalLogFile.exists()) + } + } finally { + admin.close() + } + } finally { + cluster.close() + } + } } class BadAuthorizer() extends Authorizer {