From 12cfd0e1762b926e7b7961cd74c2e8258b948d57 Mon Sep 17 00:00:00 2001
From: Chia-Ping Tsai
Date: Fri, 20 Mar 2020 07:49:35 +0800
Subject: [PATCH] KAFKA-9654; Update epoch in `ReplicaAlterLogDirsThread` after new LeaderAndIsr (#8223)

Currently, when there is a leader change while a log dir reassignment is in
progress, we do not update the leader epoch in the partition state maintained
by `ReplicaAlterLogDirsThread`. This can lead to a FENCED_LEADER_EPOCH error,
which causes the partition to be marked as failed; the partition then stays
failed until the broker is restarted. This patch fixes the problem by updating
the epoch in `ReplicaAlterLogDirsThread` after receiving a new LeaderAndIsr
request from the controller.

Reviewers: Jun Rao, Jason Gustafson
---
 .../kafka/server/AbstractFetcherThread.scala  | 54 ++++++++++++------
 .../scala/kafka/server/ReplicaManager.scala   | 10 ++--
 .../admin/ReassignPartitionsClusterTest.scala | 22 ++++++--
 .../kafka/server/ReplicaManagerTest.scala     | 55 ++++++++++++++++++-
 4 files changed, 113 insertions(+), 28 deletions(-)
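For readers skimming the diff, the following is a minimal, self-contained sketch of the decision this patch adds to `onPartitionFenced` in `AbstractFetcherThread` (the object and method names below are illustrative only, not part of the patch): a FENCED_LEADER_EPOCH error only marks a partition as failed when the fenced request carried the epoch the fetcher thread still tracks; if the thread has already picked up a newer epoch from a LeaderAndIsr update, the error is treated as stale and the partition is retried.

    // Illustrative sketch only; not the actual Kafka classes.
    object FencedEpochDecision {

      // Returns true if the partition should be retried, false if it should be marked failed.
      def retryAfterFencedError(currentLeaderEpoch: Int, requestEpoch: Option[Int]): Boolean =
        if (requestEpoch.contains(currentLeaderEpoch)) {
          // The error refers to the epoch this thread still tracks: await a new LeaderAndIsr state.
          false
        } else {
          // The thread's epoch changed after the request was sent: the error is stale, retry later.
          true
        }

      def main(args: Array[String]): Unit = {
        assert(!retryAfterFencedError(currentLeaderEpoch = 5, requestEpoch = Some(5))) // mark failed
        assert(retryAfterFencedError(currentLeaderEpoch = 6, requestEpoch = Some(5)))  // retry
        assert(retryAfterFencedError(currentLeaderEpoch = 5, requestEpoch = None))     // retry
      }
    }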
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 5f0d25ba23123..be680744d5492 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -218,7 +218,7 @@ abstract class AbstractFetcherThread(name: String,
       curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
     }
 
-    val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets)
+    val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
     handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
     updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
   }
@@ -243,7 +243,8 @@ abstract class AbstractFetcherThread(name: String,
     updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
   }
 
-  private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
+  private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset],
+                                             latestEpochsForPartitions: Map[TopicPartition, EpochData]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
@@ -255,7 +256,11 @@ abstract class AbstractFetcherThread(name: String,
           fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          onPartitionFenced(tp)
+          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
+            p =>
+              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
+              else None
+          })) partitionsWithError += tp
 
         case error =>
           info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error")
@@ -266,12 +271,22 @@ abstract class AbstractFetcherThread(name: String,
     ResultWithPartitions(fetchOffsets, partitionsWithError)
   }
 
-  private def onPartitionFenced(tp: TopicPartition): Unit = inLock(partitionMapLock) {
-    Option(partitionStates.stateValue(tp)).foreach { currentFetchState =>
+  /**
+   * Remove the partition if the partition state is NOT updated. Otherwise, keep the partition active.
+   * @return true if the epoch in this thread is updated; otherwise, false
+   */
+  private def onPartitionFenced(tp: TopicPartition, requestEpoch: Option[Int]): Boolean = inLock(partitionMapLock) {
+    Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
       val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
-      info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
-        s"the new LeaderAndIsr state before resuming fetching.")
-      markPartitionFailed(tp)
+      if (requestEpoch.contains(currentLeaderEpoch)) {
+        info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
+          s"the new LeaderAndIsr state before resuming fetching.")
+        markPartitionFailed(tp)
+        false
+      } else {
+        info(s"Partition $tp has a newer epoch ($currentLeaderEpoch) than the current leader. Will retry the partition later.")
+        true
+      }
     }
   }
 
@@ -308,6 +323,7 @@ abstract class AbstractFetcherThread(name: String,
             // the current offset is the same as the offset requested.
             val fetchPartitionData = sessionPartitions.get(topicPartition)
             if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
+              val requestEpoch = if (fetchPartitionData.currentLeaderEpoch.isPresent) Some(fetchPartitionData.currentLeaderEpoch.get().toInt) else None
               partitionData.error match {
                 case Errors.NONE =>
                   try {
@@ -350,7 +366,7 @@ abstract class AbstractFetcherThread(name: String,
                       markPartitionFailed(topicPartition)
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
-                  if (!handleOutOfRangeError(topicPartition, currentFetchState))
+                  if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
                     partitionsWithError += topicPartition
 
                 case Errors.UNKNOWN_LEADER_EPOCH =>
@@ -359,7 +375,7 @@ abstract class AbstractFetcherThread(name: String,
                   partitionsWithError += topicPartition
 
                 case Errors.FENCED_LEADER_EPOCH =>
-                  onPartitionFenced(topicPartition)
+                  if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
 
                 case Errors.NOT_LEADER_FOR_PARTITION =>
                   debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
@@ -518,31 +534,33 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-   * Handle the out of range error. Return true if the request succeeded or was fenced, which means we need
-   * not backoff and retry. False if there was a retriable error.
+   * Handle the out of range error. Return false if
+   * 1) the request succeeded or
+   * 2) the request was fenced and this thread has not yet received a new epoch,
+   * which means we need not back off and retry. Return true if there was a retriable error.
    */
   private def handleOutOfRangeError(topicPartition: TopicPartition,
-                                    fetchState: PartitionFetchState): Boolean = {
+                                    fetchState: PartitionFetchState,
+                                    requestEpoch: Option[Int]): Boolean = {
     try {
       val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
       partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
       info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
        s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
-      true
+      false
     } catch {
       case _: FencedLeaderEpochException =>
-        onPartitionFenced(topicPartition)
-        true
+        onPartitionFenced(topicPartition, requestEpoch)
 
       case e @ (_ : UnknownTopicOrPartitionException |
                 _ : UnknownLeaderEpochException |
                 _ : NotLeaderForPartitionException) =>
         info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
-        false
+        true
 
       case e: Throwable =>
         error(s"Error getting offset for partition $topicPartition", e)
-        false
+        true
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index dfd03ec216086..e0994fedb6700 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1178,7 +1178,7 @@ class ReplicaManager(val config: KafkaConfig,
 
       // First check partition's leader epoch
       val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
-      val newPartitions = new mutable.HashSet[Partition]
+      val updatedPartitions = new mutable.HashSet[Partition]
 
       leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
         val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
@@ -1191,12 +1191,14 @@ class ReplicaManager(val config: KafkaConfig,
               responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
               None
 
-            case HostedPartition.Online(partition) => Some(partition)
+            case HostedPartition.Online(partition) =>
+              updatedPartitions.add(partition)
+              Some(partition)
 
             case HostedPartition.None =>
              val partition = Partition(topicPartition, time, this)
              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
-              newPartitions.add(partition)
+              updatedPartitions.add(partition)
              Some(partition)
           }
 
@@ -1278,7 +1280,7 @@ class ReplicaManager(val config: KafkaConfig,
         startHighWatermarkCheckPointThread()
 
         val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
-        for (partition <- newPartitions) {
+        for (partition <- updatedPartitions) {
           val topicPartition = partition.topicPartition
           if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
             partition.log.foreach { log =>
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 1cbf33e0f0b81..6dccdd086df03 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -71,9 +71,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     Admin.create(props)
   }
 
-  def getRandomLogDirAssignment(brokerId: Int): String = {
+  def getRandomLogDirAssignment(brokerId: Int, excluded: Option[String] = None): String = {
     val server = servers.find(_.config.brokerId == brokerId).get
-    val logDirs = server.config.logDirs
+    val logDirs = server.config.logDirs.filterNot(excluded.contains)
     new File(logDirs(Random.nextInt(logDirs.size))).getAbsolutePath
   }
 
@@ -161,19 +161,31 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def shouldMoveSinglePartitionWithinBroker(): Unit = {
+  def shouldMoveSinglePartitionToSameFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(true)
+
+  @Test
+  def shouldMoveSinglePartitionToDifferentFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(false)
+
+  private[this] def shouldMoveSinglePartitionWithinBroker(moveToSameFolder: Boolean): Unit = {
     // Given a single replica on server 100
     startBrokers(Seq(100, 101))
     adminClient = createAdminClient(servers)
-    val expectedLogDir = getRandomLogDirAssignment(100)
     createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100)), servers = servers)
 
+    val replica = new TopicPartitionReplica(topicName, 0, 100)
+    val currentLogDir = adminClient.describeReplicaLogDirs(java.util.Collections.singleton(replica))
+      .all()
+      .get()
+      .get(replica)
+      .getCurrentReplicaLogDir
+
+    val expectedLogDir = if (moveToSameFolder) currentLogDir else getRandomLogDirAssignment(100, excluded = Some(currentLogDir))
+
     // When we execute an assignment that moves an existing replica to another log directory on the same broker
     val topicJson = executeAssignmentJson(Seq(
       PartitionAssignmentJson(tp0, replicas = Seq(100), logDirectories = Some(Seq(expectedLogDir)))
     ))
     ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
-    val replica = new TopicPartitionReplica(topicName, 0, 100)
     waitUntilTrue(() => {
       expectedLogDir == adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir
     }, "Partition should have been moved to the expected log directory", 1000)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a7c59ea03e135..e20ed10ebb698 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -209,6 +209,58 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testFencedErrorCausedByBecomeLeader(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+
+      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+        .localLogOrException
+      assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).size)
+
+      // find a live log dir that is different from the current one
+      val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).head
+      assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
+      replicaManager.futureLocalLogOrException(topicPartition)
+      assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      // change the epoch from 0 to 1 in order to trigger the fenced error
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
+      TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
+        s"the partition=$topicPartition should be removed from pending state")
+      // the partition is added to failedPartitions if the fenced error happens
+      // if the thread finishes before ReplicaManager#becomeLeaderOrFollower updates the epoch, the fenced error does
+      // not happen and failedPartitions is empty.
+      if (replicaManager.replicaAlterLogDirsManager.failedPartitions.size != 0) {
+        replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+        // send the request again
+        replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
+        // the future folder already exists, so a fetcher thread is created again
+        assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      }
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
   @Test
   def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
     val timer = new MockTimer
@@ -1279,6 +1331,7 @@ class ReplicaManagerTest {
         isFuture = false)).once
     }
     EasyMock.expect(mockLogMgr.initializingLog(topicPartitionObj)).anyTimes
+    EasyMock.expect(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).andReturn(None)
     EasyMock.expect(mockLogMgr.finishedInitializingLog(
       EasyMock.eq(topicPartitionObj), EasyMock.anyObject(), EasyMock.anyObject())).anyTimes
 
@@ -1469,7 +1522,7 @@ class ReplicaManagerTest {
   private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))