KAFKA-9654 ReplicaAlterLogDirsThread can't be created again if the pr… #8223

Merged (1 commit) on Mar 19, 2020
54 changes: 36 additions & 18 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -218,7 +218,7 @@ abstract class AbstractFetcherThread(name: String,
curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
}

val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets)
val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
@@ -243,7 +243,8 @@ abstract class AbstractFetcherThread(name: String,
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}

private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset],
latestEpochsForPartitions: Map[TopicPartition, EpochData]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
val partitionsWithError = mutable.HashSet.empty[TopicPartition]

@@ -255,7 +256,11 @@ abstract class AbstractFetcherThread(name: String,
fetchOffsets.put(tp, offsetTruncationState)

case Errors.FENCED_LEADER_EPOCH =>
onPartitionFenced(tp)
if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
p =>
if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
else None
})) partitionsWithError += tp

case error =>
info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error")
@@ -266,12 +271,22 @@ abstract class AbstractFetcherThread(name: String,
ResultWithPartitions(fetchOffsets, partitionsWithError)
}

private def onPartitionFenced(tp: TopicPartition): Unit = inLock(partitionMapLock) {
Option(partitionStates.stateValue(tp)).foreach { currentFetchState =>
/**
* Remove the partition if the partition state has NOT been updated with a newer epoch. Otherwise, keep the partition active.
* @return true if the epoch in this thread has been updated (the partition should be retried later); false otherwise.
*/
private def onPartitionFenced(tp: TopicPartition, requestEpoch: Option[Int]): Boolean = inLock(partitionMapLock) {
Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
s"the new LeaderAndIsr state before resuming fetching.")
markPartitionFailed(tp)
if (requestEpoch.contains(currentLeaderEpoch)) {
info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
s"the new LeaderAndIsr state before resuming fetching.")
markPartitionFailed(tp)
false
} else {
info(s"Partition $tp has an new epoch ($currentLeaderEpoch) than the current leader. retry the partition later")
true
}
}
}
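
The heart of the change above is the epoch comparison in onPartitionFenced: a FENCED_LEADER_EPOCH error only fails the partition when the fetcher has not yet observed a newer leader epoch. A minimal standalone sketch of that decision (the helper name is hypothetical and not part of the patch):

def shouldRetryFencedPartition(requestEpoch: Option[Int], currentLeaderEpoch: Int): Boolean =
  // Request carried the fetcher's current epoch: the fence is genuine, so the partition is
  // marked failed and not retried (onPartitionFenced returns false in this case).
  // Request carried an older epoch than the fetcher now has: the error is stale, so the
  // partition is kept and retried later (onPartitionFenced returns true).
  !requestEpoch.contains(currentLeaderEpoch)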

@@ -308,6 +323,7 @@ abstract class AbstractFetcherThread(name: String,
// the current offset is the same as the offset requested.
val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
val requestEpoch = if (fetchPartitionData.currentLeaderEpoch.isPresent) Some(fetchPartitionData.currentLeaderEpoch.get().toInt) else None
partitionData.error match {
case Errors.NONE =>
try {
@@ -350,7 +366,7 @@ abstract class AbstractFetcherThread(name: String,
markPartitionFailed(topicPartition)
}
case Errors.OFFSET_OUT_OF_RANGE =>
if (!handleOutOfRangeError(topicPartition, currentFetchState))
if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
partitionsWithError += topicPartition

case Errors.UNKNOWN_LEADER_EPOCH =>
@@ -359,7 +375,7 @@ abstract class AbstractFetcherThread(name: String,
partitionsWithError += topicPartition

case Errors.FENCED_LEADER_EPOCH =>
onPartitionFenced(topicPartition)
if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition

case Errors.NOT_LEADER_FOR_PARTITION =>
debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
@@ -518,31 +534,33 @@ abstract class AbstractFetcherThread(name: String,
}

/**
* Handle the out of range error. Return true if the request succeeded or was fenced, which means we need
* not backoff and retry. False if there was a retriable error.
* Handle the out of range error. Return false if
* 1) the request succeeded, or
* 2) it was fenced and this thread has not received a new epoch,
* which means we need not back off and retry. Return true if there was a retriable error.
*/
private def handleOutOfRangeError(topicPartition: TopicPartition,
fetchState: PartitionFetchState): Boolean = {
fetchState: PartitionFetchState,
requestEpoch: Option[Int]): Boolean = {
try {
val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
true
false
} catch {
case _: FencedLeaderEpochException =>
onPartitionFenced(topicPartition)
true
onPartitionFenced(topicPartition, requestEpoch)

case e @ (_ : UnknownTopicOrPartitionException |
_ : UnknownLeaderEpochException |
_ : NotLeaderForPartitionException) =>
info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
false
true

case e: Throwable =>
error(s"Error getting offset for partition $topicPartition", e)
false
true
}
}

10 changes: 6 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1182,7 +1182,7 @@ class ReplicaManager(val config: KafkaConfig,

// First check partition's leader epoch
val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
val newPartitions = new mutable.HashSet[Partition]
val updatedPartitions = new mutable.HashSet[Partition]

leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
@@ -1195,12 +1195,14 @@ class ReplicaManager(val config: KafkaConfig,
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
None

case HostedPartition.Online(partition) => Some(partition)
case HostedPartition.Online(partition) =>
updatedPartitions.add(partition)
Some(partition)

case HostedPartition.None =>
val partition = Partition(topicPartition, time, this)
allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
newPartitions.add(partition)
updatedPartitions.add(partition)
Some(partition)
}

@@ -1282,7 +1284,7 @@ class ReplicaManager(val config: KafkaConfig,
startHighWatermarkCheckPointThread()

val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
for (partition <- newPartitions) {
for (partition <- updatedPartitions) {
val topicPartition = partition.topicPartition
if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
partition.log.foreach { log =>
core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -71,9 +71,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
Admin.create(props)
}

def getRandomLogDirAssignment(brokerId: Int): String = {
def getRandomLogDirAssignment(brokerId: Int, excluded: Option[String] = None): String = {
val server = servers.find(_.config.brokerId == brokerId).get
val logDirs = server.config.logDirs
val logDirs = server.config.logDirs.filterNot(excluded.contains)
new File(logDirs(Random.nextInt(logDirs.size))).getAbsolutePath
}

@@ -161,19 +161,31 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
}

@Test
def shouldMoveSinglePartitionWithinBroker(): Unit = {
def shouldMoveSinglePartitionToSameFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(true)

@Test
def shouldMoveSinglePartitionToDifferentFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(false)
Member Author:
Moving the replica folder to another location has a chance of reproducing this issue, so I split the original test case into two cases. The first case always moves the folder to the same location; the other case moves the folder to a different location.


private[this] def shouldMoveSinglePartitionWithinBroker(moveToSameFolder: Boolean): Unit = {
// Given a single replica on server 100
startBrokers(Seq(100, 101))
adminClient = createAdminClient(servers)
val expectedLogDir = getRandomLogDirAssignment(100)
createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100)), servers = servers)

val replica = new TopicPartitionReplica(topicName, 0, 100)
val currentLogDir = adminClient.describeReplicaLogDirs(java.util.Collections.singleton(replica))
.all()
.get()
.get(replica)
.getCurrentReplicaLogDir

val expectedLogDir = if (moveToSameFolder) currentLogDir else getRandomLogDirAssignment(100, excluded = Some(currentLogDir))

// When we execute an assignment that moves an existing replica to another log directory on the same broker
val topicJson = executeAssignmentJson(Seq(
PartitionAssignmentJson(tp0, replicas = Seq(100), logDirectories = Some(Seq(expectedLogDir)))
))
ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
val replica = new TopicPartitionReplica(topicName, 0, 100)
waitUntilTrue(() => {
expectedLogDir == adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir
}, "Partition should have been moved to the expected log directory", 1000)
55 changes: 54 additions & 1 deletion core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -209,6 +209,58 @@ class ReplicaManagerTest {
}
}

@Test
def testFencedErrorCausedByBecomeLeader(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
replicaManager.createPartition(topicPartition)
.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))

def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(epoch)
.setIsr(brokerList)
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()

replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
.localLogOrException
assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).size)

// find a live log dir that is different from the current replica folder
val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).head
assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
replicaManager.futureLocalLogOrException(topicPartition)
assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
// change the epoch from 0 to 1 in order to trigger the fenced error
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
s"the partition=$topicPartition should be removed from pending state")
Member:
@chia7712 In our CI we are consistently getting a failure in this check. Do you have any suggestions on what is happening and how to fix it?

Error Message

org.scalatest.exceptions.TestFailedException: the partition=test-topic-0 should be removed from pending state

Stacktrace

org.scalatest.exceptions.TestFailedException: the partition=test-topic-0 should be removed from pending state
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
	at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
	at org.scalatest.Assertions.fail(Assertions.scala:1091)
	at org.scalatest.Assertions.fail$(Assertions.scala:1087)
	at org.scalatest.Assertions$.fail(Assertions.scala:1389)
	at kafka.server.ReplicaManagerTest.testFencedErrorCausedByBecomeLeader(ReplicaManagerTest.scala:248)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.GeneratedMethodAccessor112.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
	at sun.reflect.GeneratedMethodAccessor111.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
	at java.lang.Thread.run(Thread.java:748)

Standard Output

[2020-03-24 17:39:19,377] ERROR [ReplicaAlterLogDirsThread-0]: Error due to (kafka.server.ReplicaAlterLogDirsThread:76)
org.apache.kafka.common.errors.ReplicaNotAvailableException: Future log for partition test-topic-0 is not available on broker 0
[2020-03-24 17:39:38,537] ERROR [ReplicaManager broker=0] Error processing append operation on partition test-topic-0 (kafka.server.ReplicaManager:76)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producerId 234 at offset 3 in partition test-topic-0: 13 (incoming seq. number), 2 (current end sequence number)

Member Author:
@jsancio Thanks for this report. Let me dig into it :)

Member:
Thanks. I updated the Jira with the same information as above.

// the partition is added to failedPartitions if a fenced error happens
// if the thread is done before ReplicaManager#becomeLeaderOrFollower updates the epoch, the fenced error does
// not happen and failedPartitions is empty
if (replicaManager.replicaAlterLogDirsManager.failedPartitions.size != 0) {
Contributor:
Should this be an assertion? Is the test not deterministic for some reason?

Member Author:
If the thread is done before ReplicaManager#becomeLeaderOrFollower updates the epoch, the fenced error does not happen.

replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
// send request again
replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
// the future log folder already exists, so the fetcher thread should be created again for it
assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
}
} finally replicaManager.shutdown(checkpointHW = false)
}
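
The guarded check above mirrors the race discussed in the review thread: whether the fetcher actually observes FENCED_LEADER_EPOCH depends on whether it is still running when becomeLeaderOrFollower bumps the epoch. A minimal illustrative model of the two interleavings (hypothetical names, not part of the patch):

sealed trait RaceOutcome
// Fetcher still running at the epoch bump: the leader answers FENCED_LEADER_EPOCH and the
// partition lands in failedPartitions.
case object FencedAndFailed extends RaceOutcome
// Fetcher already finished copying before the bump: no fenced error, failedPartitions stays empty.
case object FinishedBeforeBump extends RaceOutcome

def raceOutcome(fetcherStillRunningAtEpochBump: Boolean): RaceOutcome =
  if (fetcherStillRunningAtEpochBump) FencedAndFailed else FinishedBeforeBump

// The test therefore exercises the "create the ReplicaAlterLogDirsThread again" path only when
// the outcome is FencedAndFailed, i.e. when failedPartitions is non-empty.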

@Test
def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
val timer = new MockTimer
@@ -1279,6 +1331,7 @@ class ReplicaManagerTest {
isFuture = false)).once
}
EasyMock.expect(mockLogMgr.initializingLog(topicPartitionObj)).anyTimes
EasyMock.expect(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).andReturn(None)

EasyMock.expect(mockLogMgr.finishedInitializingLog(
EasyMock.eq(topicPartitionObj), EasyMock.anyObject(), EasyMock.anyObject())).anyTimes
@@ -1469,7 +1522,7 @@ class ReplicaManagerTest {

private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))