Commit

KAFKA-9654; Update epoch in ReplicaAlterLogDirsThread after new LeaderAndIsr (#8223)

Currently, when there is a leader change while a log dir reassignment is in progress, we do not update the leader epoch in the partition state maintained by `ReplicaAlterLogDirsThread`. This can lead to a FENCED_LEADER_EPOCH error, which causes the partition to be marked as failed, a state that persists until the broker is restarted. This patch fixes the problem by updating the epoch in `ReplicaAlterLogDirsThread` after receiving a new LeaderAndIsr request from the controller.
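
For context, here is a minimal sketch (hypothetical names, not the patched code) of the decision this change introduces when a FENCED_LEADER_EPOCH error arrives: the fetcher compares the epoch that was sent in the fenced request with the epoch currently held in its partition state, and only drops the partition when nothing newer has been received.

```scala
// Hypothetical, simplified model of the fenced-epoch handling added by this patch.
// FetchState, FencedAction and onFenced are illustrative names, not Kafka APIs.
object FencedEpochSketch {
  final case class FetchState(currentLeaderEpoch: Int)

  sealed trait FencedAction
  case object RemoveAndAwaitLeaderAndIsr extends FencedAction // state is stale; drop the partition
  case object RetryLater extends FencedAction                 // epoch already bumped; keep the partition

  // requestEpoch is the leader epoch that was sent in the request the leader fenced.
  def onFenced(state: FetchState, requestEpoch: Option[Int]): FencedAction =
    if (requestEpoch.contains(state.currentLeaderEpoch))
      RemoveAndAwaitLeaderAndIsr // nothing newer has arrived; wait for the next LeaderAndIsr
    else
      RetryLater // the fetcher already holds a newer epoch, so the next attempt can succeed

  def main(args: Array[String]): Unit = {
    println(onFenced(FetchState(currentLeaderEpoch = 5), Some(5))) // RemoveAndAwaitLeaderAndIsr
    println(onFenced(FetchState(currentLeaderEpoch = 6), Some(5))) // RetryLater
  }
}
```

With a check along these lines, a LeaderAndIsr request that has already bumped the epoch simply causes the partition to be retried instead of being marked as permanently failed.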

Reviewers: Jun Rao <[email protected]>, Jason Gustafson <[email protected]>
chia7712 authored and hachikuji committed Apr 10, 2020
1 parent bfc7a7f commit 66ae1dd
Showing 4 changed files with 118 additions and 34 deletions.
58 changes: 40 additions & 18 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -198,7 +198,8 @@ abstract class AbstractFetcherThread(name: String,
curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
}

val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets)
val ResultWithPartitions(fetchOffsets, partitionsWithError) =
maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
handlePartitionsWithErrors(partitionsWithError)
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
@@ -232,7 +233,8 @@ abstract class AbstractFetcherThread(name: String,
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}

private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset],
latestEpochsForPartitions: Map[TopicPartition, EpochData]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
val partitionsWithError = mutable.HashSet.empty[TopicPartition]

@@ -245,7 +247,11 @@ abstract class AbstractFetcherThread(name: String,
fetchOffsets.put(tp, offsetTruncationState)

case Errors.FENCED_LEADER_EPOCH =>
onPartitionFenced(tp)
if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
p =>
if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
else None
})) partitionsWithError += tp

case error =>
info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error")
@@ -261,12 +267,22 @@ abstract class AbstractFetcherThread(name: String,
ResultWithPartitions(fetchOffsets, partitionsWithError)
}

private def onPartitionFenced(tp: TopicPartition): Unit = inLock(partitionMapLock) {
Option(partitionStates.stateValue(tp)).foreach { currentFetchState =>
/**
* Remove the partition if its state has not been updated since the fenced request was sent; otherwise keep the partition active so it can be retried.
* @return true if the epoch in this thread has already been updated (retry later); false otherwise
*/
private def onPartitionFenced(tp: TopicPartition, requestEpoch: Option[Int]): Boolean = inLock(partitionMapLock) {
Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
s"the new LeaderAndIsr state before resuming fetching.")
partitionStates.remove(tp)
if (requestEpoch.contains(currentLeaderEpoch)) {
info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
s"the new LeaderAndIsr state before resuming fetching.")
partitionStates.remove(tp)
false
} else {
info(s"Partition $tp has an new epoch ($currentLeaderEpoch) than the current leader. retry the partition later")
true
}
}
}

@@ -303,6 +319,10 @@ abstract class AbstractFetcherThread(name: String,
// the current offset is the same as the offset requested.
val fetchState = fetchStates(topicPartition)
if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
val requestEpoch = if (fetchState.currentLeaderEpoch >= 0)
Some(fetchState.currentLeaderEpoch)
else
None
partitionData.error match {
case Errors.NONE =>
try {
@@ -342,7 +362,7 @@ abstract class AbstractFetcherThread(name: String,
s"offset ${currentFetchState.fetchOffset}", e)
}
case Errors.OFFSET_OUT_OF_RANGE =>
if (!handleOutOfRangeError(topicPartition, currentFetchState))
if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
partitionsWithError += topicPartition

case Errors.UNKNOWN_LEADER_EPOCH =>
@@ -351,7 +371,7 @@ abstract class AbstractFetcherThread(name: String,
partitionsWithError += topicPartition

case Errors.FENCED_LEADER_EPOCH =>
onPartitionFenced(topicPartition)
if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition

case Errors.NOT_LEADER_FOR_PARTITION =>
debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
@@ -504,32 +524,34 @@ abstract class AbstractFetcherThread(name: String,
}

/**
* Handle the out of range error. Return true if the request succeeded or was fenced, which means we need
* not backoff and retry. False if there was a retriable error.
* Handle the out of range error. Return false if
* 1) the request succeeded, or
* 2) it was fenced and this thread has not received a new epoch,
* which means we need not backoff and retry. Return true if there was a retriable error.
*/
private def handleOutOfRangeError(topicPartition: TopicPartition,
fetchState: PartitionFetchState): Boolean = {
fetchState: PartitionFetchState,
requestEpoch: Option[Int]): Boolean = {
try {
val newOffset = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
val newFetchState = PartitionFetchState(newOffset, fetchState.currentLeaderEpoch, state = Fetching)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
true
false
} catch {
case _: FencedLeaderEpochException =>
onPartitionFenced(topicPartition)
true
onPartitionFenced(topicPartition, requestEpoch)

case e @ (_ : UnknownTopicOrPartitionException |
_ : UnknownLeaderEpochException |
_ : NotLeaderForPartitionException) =>
info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
false
true

case e: Throwable =>
error(s"Error getting offset for partition $topicPartition", e)
false
true
}
}

15 changes: 6 additions & 9 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1057,14 +1057,10 @@ class ReplicaManager(val config: KafkaConfig,

// First check partition's leader epoch
val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
val newPartitions = new mutable.HashSet[Partition]
val updatedPartitions = new mutable.HashSet[Partition]

leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
val partition = getPartition(topicPartition).getOrElse {
val createdPartition = getOrCreatePartition(topicPartition)
newPartitions.add(createdPartition)
createdPartition
}
val partition = getOrCreatePartition(topicPartition)
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
if (partition eq ReplicaManager.OfflinePartition) {
@@ -1076,9 +1072,10 @@ class ReplicaManager(val config: KafkaConfig,
} else if (requestLeaderEpoch > currentLeaderEpoch) {
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
// This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
if (stateInfo.basePartitionState.replicas.contains(localBrokerId)) {
updatedPartitions.add(partition)
partitionState.put(partition, stateInfo)
else {
} else {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
@@ -1128,7 +1125,7 @@ class ReplicaManager(val config: KafkaConfig,
}

val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
for (partition <- newPartitions) {
for (partition <- updatedPartitions) {
val topicPartition = partition.topicPartition
if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
partition.localReplica.foreach { replica =>
27 changes: 21 additions & 6 deletions core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -66,9 +66,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
JAdminClient.create(props)
}

def getRandomLogDirAssignment(brokerId: Int): String = {
def getRandomLogDirAssignment(brokerId: Int, excluded: Option[String] = None): String = {
val server = servers.find(_.config.brokerId == brokerId).get
val logDirs = server.config.logDirs
val logDirs = server.config.logDirs.filterNot(excluded.contains)
new File(logDirs(Random.nextInt(logDirs.size))).getAbsolutePath
}

@@ -134,18 +134,33 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
}

@Test
def shouldMoveSinglePartitionWithinBroker() {
def shouldMoveSinglePartitionToSameFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(true)

@Test
def shouldMoveSinglePartitionToDifferentFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(false)

def shouldMoveSinglePartitionWithinBroker(moveToSameFolder: Boolean): Unit = {
// Given a single replica on server 100
startBrokers(Seq(100, 101))
adminClient = createAdminClient(servers)
val expectedLogDir = getRandomLogDirAssignment(100)
createTopic(zkClient, topicName, Map(0 -> Seq(100)), servers = servers)

val replica = new TopicPartitionReplica(topicName, 0, 100)
val currentLogDir = adminClient.describeReplicaLogDirs(java.util.Collections.singleton(replica))
.all()
.get()
.get(replica)
.getCurrentReplicaLogDir

val expectedLogDir = if (moveToSameFolder)
currentLogDir
else
getRandomLogDirAssignment(100, excluded = Some(currentLogDir))

// When we execute an assignment that moves an existing replica to another log directory on the same broker
val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[100],"log_dirs":["$expectedLogDir"]}]}"""
ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
val replica = new TopicPartitionReplica(topicName, 0, 100)
TestUtils.waitUntilTrue(() => {
waitUntilTrue(() => {
expectedLogDir == adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir
}, "Partition should have been moved to the expected log directory", 1000)
}
52 changes: 51 additions & 1 deletion core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -194,6 +194,55 @@ class ReplicaManagerTest {
}
}

@Test
def testFencedErrorCausedByBecomeLeader(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
replicaManager.getOrCreatePartition(topicPartition)
.getOrCreateReplica(0, isNew = false)

def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion,
0,
0,
brokerEpoch,
Map(topicPartition -> new LeaderAndIsrRequest.PartitionState(0, 0,
epoch, brokerList, 0, brokerList, true)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
).build()

replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
.localReplica.flatMap(_.log).get
assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).size)

// find a live log dir different from the one currently hosting the replica
val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).head
assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
assertTrue(replicaManager.futureLocalReplica(topicPartition).flatMap(_.log).isDefined)

assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
// change the epoch from 0 to 1 in order to trigger a fenced error
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
s"the partition=$topicPartition should be removed from pending state")
// the partition is added to failedPartitions if a fenced error happens
// if the thread finishes before ReplicaManager#becomeLeaderOrFollower updates the epoch, the fenced error does
// not happen and failedPartitions is empty.
if (replicaManager.replicaAlterLogDirsManager.failedPartitions.size != 0) {
replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
// send request again
replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
// the future folder exists so it fails to invoke thread
assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
}
} finally replicaManager.shutdown(checkpointHW = false)
}

@Test
def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
val timer = new MockTimer
@@ -636,6 +685,7 @@ class ReplicaManagerTest {
EasyMock.expect(mockLogMgr.truncateTo(Map(new TopicPartition(topic, topicPartition) -> offsetFromLeader),
isFuture = false)).once
}
EasyMock.expect(mockLogMgr.getLog(new TopicPartition(topic, topicPartition), isFuture = true)).andReturn(None)
EasyMock.replay(mockLogMgr)

val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
@@ -798,7 +848,7 @@ class ReplicaManagerTest {

private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
