KAFKA-9654 ReplicaAlterLogDirsThread can't be created again if the pr… #8223
Conversation
adminClientOpt.get, remainingTimeMs)
Thread.sleep(100)
remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
if (proposedReplicaAssignment.nonEmpty) {
The tests in ReassignPartitionsCommandTest cover this PR. However, ReassignPartitionsCommandTest is still not stable even if this PR fixes the ReplicaAlterLogDirsThread, since ReassignPartitionsCommand does not correctly "wait" for the replica folder change.
Instead of checking the return value of Admin#alterReplicaLogDirs, this PR gets the replica folder via Admin#describeLogDirs and makes sure the target folder exists with isFuture = false.
@chia7712 : Thanks for the PR. Left a comment below.
@@ -278,7 +278,7 @@ class Partition(val topicPartition: TopicPartition,
          if (futureLogDir != logDir)
            throw new IllegalStateException(s"The future log dir $futureLogDir of $topicPartition is " +
              s"different from the requested log dir $logDir")
-         false
+         true
Hmm, not sure about this. A given topic partition is always hashed into the same ReplicaAlterLogDirsThread. So, if that thread is dead, adding the same partition to the same thread won't help.
Do you know why the ReplicaAlterLogDirsThread died? If it's a bug there, we probably should fix the logic in ReplicaAlterLogDirsThread.
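(For reference, a minimal self-contained sketch of the "same partition always maps to the same fetcher thread" point — illustrative Scala only; the names and the exact hashing differ from the real AbstractFetcherManager code.)

// Toy model: the fetcher id is derived deterministically from the partition,
// so re-adding a partition routes it back to the same thread. If that thread
// has died, adding the partition again does not help.
final case class Tp(topic: String, partition: Int)

object FetcherIdSketch extends App {
  def fetcherId(tp: Tp, numFetchers: Int): Int =
    math.abs(31 * tp.topic.hashCode + tp.partition) % numFetchers

  val tp = Tp("test-topic", 0)
  println(fetcherId(tp, numFetchers = 2)) // same id every time
  println(fetcherId(tp, numFetchers = 2))
}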
@junrao thanks for the feedback!
Do you know why the ReplicaAlterLogDirsThread died? If it's a bug there, we probably should fix the logic in ReplicaAlterLogDirsThread.
1. send a request to alter the replica dir
2. the future log is created
3. ReplicaAlterLogDirsThread is created and it gets epoch N
4. send a request to change the partition reassignment (but the new reassignment is the same as the current reassignment)
5. KafkaController assumes the reassignment is completed
6. a LeaderAndIsrRequest is sent
7. ReplicaManager#becomeLeaderOrFollower updates the epoch of the partition (from N to N + 1)
8. ReplicaAlterLogDirsThread encounters FENCED_LEADER_EPOCH since its fetch request is stale
9. ReplicaAlterLogDirsThread moves the partition to failedPartitions
10. ReplicaAlterLogDirsThread becomes idle
11. ReplicaManager#becomeLeaderOrFollower shuts down the idle ReplicaAlterLogDirsThread
Since Partition.futureLog already exists, ReplicaManager#alterReplicaLogDirs does NOT add a ReplicaAlterLogDirsThread for subsequent requests.
if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
val futureLog = futureLocalLogOrException(topicPartition)
logManager.abortAndPauseCleaning(topicPartition)
val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
partition.getLeaderEpoch, futureLog.highWatermark)
replicaAlterLogDirsManager.addFetcherForPartitions(Map(topicPartition -> initialFetchState))
}
@chia7712 : Thanks for the reply. Great finding! This seems to be a bug. So, in step 7, whenever we bump up the leader epoch in ReplicaManager#becomeLeaderOrFollower(), it seems that we should update the leader epoch for existing partitions in ReplicaAlterLogDirsThread. One way to do that is to first call replicaAlterLogDirsManager.removeFetcherForPartitions() and then addFetcherForPartitions() on that partition.
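(A hedged sketch of the remove-then-add idea, using a toy fetcher-manager model; the names below are illustrative, not the actual ReplicaAlterLogDirsManager API.)

// Toy model: when the leader epoch is bumped, refresh the fetcher's view of the
// partition by removing it and re-adding it with the new epoch.
final case class ToyFetchState(leaderEpoch: Int, fetchOffset: Long)

class ToyFetcherManager {
  private var states = Map.empty[String, ToyFetchState]
  def addFetcherForPartitions(partitions: Map[String, ToyFetchState]): Unit = states ++= partitions
  def removeFetcherForPartitions(partitions: Set[String]): Unit = states --= partitions
  def stateOf(partition: String): Option[ToyFetchState] = states.get(partition)
}

object RemoveThenAddSketch extends App {
  val mgr = new ToyFetcherManager
  mgr.addFetcherForPartitions(Map("test-topic-0" -> ToyFetchState(leaderEpoch = 0, fetchOffset = 0L)))
  // becomeLeaderOrFollower bumps the epoch from 0 to 1: remove first, then re-add.
  mgr.removeFetcherForPartitions(Set("test-topic-0"))
  mgr.addFetcherForPartitions(Map("test-topic-0" -> ToyFetchState(leaderEpoch = 1, fetchOffset = 0L)))
  println(mgr.stateOf("test-topic-0")) // Some(ToyFetchState(1,0))
}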
@junrao thanks for the suggestions!
This issue still exists even if we update the partitions in ReplicaManager#becomeLeaderOrFollower. For example, the following execution order causes the updated partition to be removed.
- ReplicaManager updates the leader epoch of the partition
- ReplicaAlterLogDirsThread gets an epoch error due to the (local) stale leader epoch
- ReplicaManager updates the leader epoch of ReplicaAlterLogDirsThread
- ReplicaAlterLogDirsThread removes the fenced partition and then becomes idle
It seems to me the root cause is that we do not change both the partition epoch and the ReplicaAlterLogDirsThread epoch atomically. Hence, just removing the fenced partition from ReplicaAlterLogDirsThread is risky for this issue. If we want to make ReplicaAlterLogDirsThread recoverable from the fenced error, the fenced partition should be added back as a "delayed" partition. Also, ReplicaManager#becomeLeaderOrFollower should update the epoch of ReplicaAlterLogDirsThread.
@chia7712 : Thanks for the reply. If a partition is removed (or re-inserted) from AbstractFetcherThread (which ReplicaAlterLogDirsThread inherits from), processFetchRequest() will ignore any pending fetch response for that partition. So, the scenario that you described should be handled even though we don't have the logic to change epoch atomically in multiple places.
If a partition is removed (or re-inserted) from AbstractFetcherThread (which ReplicaAlterLogDirsThread inherits from), processFetchRequest() will ignore any pending fetch response for that partition
Thanks for the feedback! This comment is totally right, but the key point for this issue is that the partition is removed by AbstractFetcherThread#processFetchRequest().
case Errors.FENCED_LEADER_EPOCH =>
onPartitionFenced(topicPartition)
After the partition is removed, the thread becomes idle and then ReplicaManager#becomeLeaderOrFollower removes it.
replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads() // this line
Also note that futureReplicasAndInitialOffset is NOT empty only if there is a "new" partition. In this case there is no new partition, so nothing is re-added to replicaAlterLogDirsManager.
In short, the thread is shut down and it leaves a future log folder behind.
Another issue is that ReplicaManager#alterReplicaLogDirs adds a new partition to ReplicaAlterLogDirsManager only if the future folder does NOT already exist (and it is created successfully):
if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
...
replicaAlterLogDirsManager.addFetcherForPartitions(Map(topicPartition -> initialFetchState))
}
So subsequent requests to alter the replica folder don't work until the future replica is removed.
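(A toy sketch of why this guard blocks subsequent alter requests once the thread has been shut down — an illustrative model, not the actual ReplicaManager code.)

// Toy model: the fetcher is only (re-)added when a future log is newly created,
// so once the future log exists and the fetcher thread has been shut down,
// later alter requests become no-ops.
object AlterGuardSketch extends App {
  var futureLogExists = false
  var fetcherAdded = false

  def maybeCreateFutureReplica(): Boolean =
    if (futureLogExists) false            // future log already there -> nothing created
    else { futureLogExists = true; true } // newly created

  def alterReplicaLogDirs(): Unit =
    if (maybeCreateFutureReplica()) fetcherAdded = true

  alterReplicaLogDirs()  // first request: future log created and fetcher added
  fetcherAdded = false   // simulate the idle ReplicaAlterLogDirsThread being shut down
  alterReplicaLogDirs()  // second request: guard returns false, fetcher is NOT re-added
  println(s"futureLogExists=$futureLogExists fetcherAdded=$fetcherAdded") // true false
}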
def shouldMoveSinglePartitionToSameFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(true)

@Test
def shouldMoveSinglePartitionToDifferentFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(false)
Moving the replica folder to a different location can trigger this issue, so I split the original test case into two cases. The first case always moves the folder to the same location. The other case moves the folder to a different location.
retest this please
The flaky test is tracked by #8255.
retest this please
@junrao I uploaded a patch including a test case to show the scenario I described before. Please take a look :)
replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
// handle the partitions having new epoch
replicaAlterLogDirsManager.updateEpoch(updatedPartitions)
For the normal fetcher, we force the replica into a truncating phase after each epoch change. I'm trying to convince myself whether we need to do this for the log dir fetcher or not. Although we have similar log reconciliation logic implemented for the log dir fetcher, it seems we only exercise it when the log dir fetcher is first initialized for a partition. Outside of that, we have an asynchronous truncation path which gets executed after the current log is truncated upon becoming follower. If we think this is sufficient to guarantee consistency, then I'm wondering whether the epoch validation when the log dir fetcher is fetching against the active log is actually buying us anything. An alternative would be that we leave the epoch uninitialized which would cause us to skip epoch validation.
@junrao Any thoughts?
@hachikuji : Another thing is that epoch could potentially help in the case of unclean leader election.
I gave this a bit more thought. I think it would be safer to have the log dir fetcher mimic the regular fetcher when it comes to the truncation protocol. That said, I don't think we need to address this here. The fix in this patch addresses the main problem at the moment.
@chia7712 Thanks for the patch. Left a couple more comments.
@@ -255,7 +255,7 @@ abstract class AbstractFetcherThread(name: String,
              fetchOffsets.put(tp, offsetTruncationState)

            case Errors.FENCED_LEADER_EPOCH =>
-             onPartitionFenced(tp)
+             partitionsWithError += onPartitionFenced(tp)
I'm not sure about this. If we know the partition has been fenced, why retry until the epoch has been updated? Alternatively, we could let it be marked failed and remove it when the epoch gets updated. The downside of the approach here is that a partition which is in a permanently failed state due to a stale epoch will not get reflected in the metric.
If the partition with the fenced epoch is "removed", the thread becomes idle and can then be "shut down" by ReplicaManager#becomeLeaderOrFollower.
replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
And we can't create/update a thread for a partition that already has a future log...
if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
val futureLog = futureLocalLogOrException(topicPartition)
logManager.abortAndPauseCleaning(topicPartition)
val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
partition.getLeaderEpoch, futureLog.highWatermark)
replicaAlterLogDirsManager.addFetcherForPartitions(Map(topicPartition -> initialFetchState))
}
I uploaded a patch to demonstrate the above case (see https://github.com/apache/kafka/files/4322767/test_kafka_9654.txt).
It seems to me that it is hard to be sure whether the failed state is permanent or not. Instead of making the alter thread recoverable from the failed state, my previous patch makes sure the alter thread can be updated even if the target partition already has a future log.
def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
// The writeLock is needed to make sure that while the caller checks the log directory of the
// current replica and the existence of the future replica, no other thread can update the log directory of the
// current replica or remove the future replica.
inWriteLock(leaderIsrUpdateLock) {
val currentLogDir = localLogOrException.dir.getParent
if (currentLogDir == logDir) {
info(s"Current log directory $currentLogDir is same as requested log dir $logDir. " +
s"Skipping future replica creation.")
false
} else {
futureLog match {
case Some(partitionFutureLog) =>
val futureLogDir = partitionFutureLog.dir.getParent
if (futureLogDir != logDir)
throw new IllegalStateException(s"The future log dir $futureLogDir of $topicPartition is " +
s"different from the requested log dir $logDir")
false // change this to true
case None =>
createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
true
}
}
}
}
I understand the complication, but it does not seem like a big problem. When we update the epoch in ReplicaAlterLogDirsManager, we can first make a call to addFetcherForPartitions, which creates the fetcher thread if needed and removes the partition from the failed state.
@hachikuji thanks for the feedback.
we can first make a call to addFetcherForPartitions, which creates the fetcher thread if needed and removes the partition from the failed state.
Do you mean it should re-add the updated partition to the fetcher thread instead of updating only the epoch?
replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset ++ updatedPartitions)
Yes. That is how it works for the replica fetcher, for example. If a follower's epoch gets fenced, the partition is temporarily marked as failed. After we receive the LeaderAndIsr request and update the epoch, the partition is re-added to the fetcher manager.
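(A small self-contained sketch of the "mark failed, then re-add after the epoch update" flow described above; the names are illustrative, not the AbstractFetcherManager internals.)

// Toy model: a fenced partition goes into a failed set; re-adding it with the
// new epoch clears the failed state, mirroring how the replica fetcher recovers.
object FailedPartitionSketch extends App {
  var failedPartitions = Set.empty[String]
  var fetchEpochs = Map.empty[String, Int]

  def markFailed(tp: String): Unit = {
    fetchEpochs -= tp
    failedPartitions += tp
  }

  def addFetcherForPartitions(tp: String, leaderEpoch: Int): Unit = {
    failedPartitions -= tp            // re-adding removes the partition from the failed state
    fetchEpochs += (tp -> leaderEpoch)
  }

  addFetcherForPartitions("test-topic-0", leaderEpoch = 0)
  markFailed("test-topic-0")                               // FENCED_LEADER_EPOCH observed
  addFetcherForPartitions("test-topic-0", leaderEpoch = 1) // after LeaderAndIsr bumps the epoch
  println(failedPartitions.isEmpty)    // true
  println(fetchEpochs("test-topic-0")) // 1
}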
// Send AlterReplicaLogDirsRequest again to make sure broker will start to move replica to the specified log directory.
// It may take some time for controller to create replica in the broker. Retry if the replica has not been created.
var remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
def nonFutureReplicas() = adminClientOpt.get
Would it be reasonable to pull this change into a separate PR? I think the fix in the log dir fetcher is more important and a more isolated change will be easier to backport to older versions.
Got it. Tracked by https://issues.apache.org/jira/browse/KAFKA-9721
Updated this PR with the following changes.
Thanks for the updates. Left a few more comments.
info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " + | ||
s"the new LeaderAndIsr state before resuming fetching.") | ||
markPartitionFailed(tp) | ||
if (requestEpoch.forall(_ == currentLeaderEpoch)) { |
Could this be requestEpoch.contains(currentLeaderEpoch)? If we got this error and we didn't provide an epoch, then it suggests an error on the leader. Maybe retrying is best in that case?
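(Plain Scala illustration of the forall-vs-contains distinction under discussion; not Kafka code.)

// With forall, a missing request epoch (None) is treated as if it matched the
// current epoch, so the partition would be marked failed; with contains, None
// falls through to the retry path instead.
object EpochCheckSketch extends App {
  val currentLeaderEpoch = 5
  val noEpoch: Option[Int] = None
  val sentEpoch: Option[Int] = Some(5)

  println(noEpoch.forall(_ == currentLeaderEpoch))   // true  -> would mark failed without an epoch
  println(noEpoch.contains(currentLeaderEpoch))      // false -> retry instead
  println(sentEpoch.forall(_ == currentLeaderEpoch)) // true
  println(sentEpoch.contains(currentLeaderEpoch))    // true
}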
you are right !
@@ -531,7 +548,7 @@ abstract class AbstractFetcherThread(name: String,
            true
          } catch {
            case _: FencedLeaderEpochException =>
-             onPartitionFenced(topicPartition)
+             onPartitionFenced(topicPartition, requestEpoch)
              true
Shouldn't we return the result of onPartitionFenced?
you are right again :)
TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
  s"the partition=$topicPartition should be removed from pending state")
// the partition is added to failedPartitions if fenced error happens
if (replicaManager.replicaAlterLogDirsManager.failedPartitions.size != 0) {
Should this be an assertion? Is the test not deterministic for some reason?
If the thread is done before ReplicaManager#becomeLeaderOrFollower, the fenced error does not happen.
…evious ReplicaAlterLogDirsThread encounters leader epoch error
LGTM. Thanks for the patch!
…derAndIsr (#8223) Currently when there is a leader change with a log dir reassignment in progress, we do not update the leader epoch in the partition state maintained by `ReplicaAlterLogDirsThread`. This can lead to a FENCED_LEADER_EPOCH error, which results in the partition being marked as failed, which is a permanent failure until the broker is restarted. This patch fixes the problem by updating the epoch in `ReplicaAlterLogDirsThread` after receiving a new LeaderAndIsr request from the controller. Reviewers: Jun Rao <[email protected]>, Jason Gustafson <[email protected]>
// change the epoch from 0 to 1 in order to make fenced error
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
  s"the partition=$topicPartition should be removed from pending state")
@chia7712 In our CI we are consistently getting a failure in this check. Do you have any suggestion on what is happening and how to fix it?
Error Message
org.scalatest.exceptions.TestFailedException: the partition=test-topic-0 should be removed from pending state
Stacktrace
org.scalatest.exceptions.TestFailedException: the partition=test-topic-0 should be removed from pending state
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
at org.scalatest.Assertions.fail(Assertions.scala:1091)
at org.scalatest.Assertions.fail$(Assertions.scala:1087)
at org.scalatest.Assertions$.fail(Assertions.scala:1389)
at kafka.server.ReplicaManagerTest.testFencedErrorCausedByBecomeLeader(ReplicaManagerTest.scala:248)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor112.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
at sun.reflect.GeneratedMethodAccessor111.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
at java.lang.Thread.run(Thread.java:748)
Standard Output
[2020-03-24 17:39:19,377] ERROR [ReplicaAlterLogDirsThread-0]: Error due to (kafka.server.ReplicaAlterLogDirsThread:76)
org.apache.kafka.common.errors.ReplicaNotAvailableException: Future log for partition test-topic-0 is not available on broker 0
[2020-03-24 17:39:38,537] ERROR [ReplicaManager broker=0] Error processing append operation on partition test-topic-0 (kafka.server.ReplicaManager:76)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producerId 234 at offset 3 in partition test-topic-0: 13 (incoming seq. number), 2 (current end sequence number)
@jsancio Thanks for this report. Let me dig into it :)
Seems there is already a ticket for this (https://issues.apache.org/jira/browse/KAFKA-9750)
Thanks. I updated the Jira with the same information as above.
ReplicaManager creates a ReplicaAlterLogDirsThread only if a new future log is created. If the previous ReplicaAlterLogDirsThread encounters an error when moving data, the target partition is moved to "failedPartitions" and the ReplicaAlterLogDirsThread becomes idle because it has no partitions left. The future log still exists, so we CAN'T either create another ReplicaAlterLogDirsThread to handle the partition or update the partitions of the idle ReplicaAlterLogDirsThread.
ReplicaManager should call ReplicaAlterLogDirsManager#addFetcherForPartitions even if there is already a future log, since we can create a new ReplicaAlterLogDirsThread to handle the new partitions or update the partitions of the existing ReplicaAlterLogDirsThread to make it busy again.
https://issues.apache.org/jira/browse/KAFKA-9654
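(A hedged sketch of the committed approach as summarized in the merge message above — collect the partitions whose leader epoch changed and that have a future log, then hand them back to the alter-log-dirs fetcher. All names are illustrative; this is a toy model, not the ReplicaManager code.)

// Toy model: becomeLeaderOrFollower bumps leader epochs and returns the partitions
// that also have a future log, so their alter-log-dirs fetch state can be refreshed.
object UpdateEpochOnLeaderAndIsrSketch extends App {
  final case class PartitionState(leaderEpoch: Int, hasFutureLog: Boolean)

  var partitions = Map(
    "test-topic-0" -> PartitionState(leaderEpoch = 0, hasFutureLog = true),
    "test-topic-1" -> PartitionState(leaderEpoch = 0, hasFutureLog = false))

  def becomeLeaderOrFollower(newEpochs: Map[String, Int]): Set[String] = {
    partitions = partitions.map { case (tp, st) =>
      tp -> st.copy(leaderEpoch = newEpochs.getOrElse(tp, st.leaderEpoch))
    }
    newEpochs.keySet.filter(tp => partitions(tp).hasFutureLog)
  }

  val updatedPartitions = becomeLeaderOrFollower(Map("test-topic-0" -> 1, "test-topic-1" -> 1))
  // These would be re-added to the alter-log-dirs fetcher so its epoch is refreshed.
  println(updatedPartitions) // Set(test-topic-0)
}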