KAFKA-9594: Add a separate lock to pause the follower log append while checking if the log dir could be replaced.

This PR adds a new lock that is used to prevent the follower replica from being updated while ReplicaAlterDirThread is executing maybeReplaceCurrentWithFutureReplica() to replace the follower replica with the future replica.

Now doAppendRecordsToFollowerOrFutureReplica() no longer needs to hold leaderIsrUpdateLock for local replica updates, and ongoing log appends on the follower will not delay the makeFollower() call.
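
The pattern in isolation, as a self-contained sketch (not Kafka code; `PartitionLike`, `stateLock` and `appendLock` are illustrative names standing in for `Partition`, `leaderIsrUpdateLock` and `futureLogLock`):

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Self-contained illustration of the locking change, not Kafka code.
// stateLock plays the role of leaderIsrUpdateLock, appendLock the role of futureLogLock.
class PartitionLike {
  private val stateLock = new ReentrantReadWriteLock()
  private val appendLock = new Object()
  private var logEndOffset = 0L
  private var futureLogEndOffset: Option[Long] = Some(0L)

  // Frequent path: follower appends now only synchronize on appendLock,
  // so they no longer hold the read side of stateLock.
  def appendAsFollower(n: Int): Unit = appendLock.synchronized {
    logEndOffset += n
  }

  // Infrequent path: the replace check holds appendLock while comparing offsets,
  // so it cannot race with a concurrent append; the swap itself still takes the write lock.
  def maybeReplaceWithFuture(): Boolean = appendLock.synchronized {
    if (futureLogEndOffset.contains(logEndOffset)) {
      val w = stateLock.writeLock(); w.lock()
      try { futureLogEndOffset = None; true } finally w.unlock()
    } else false
  }

  // State change: takes the write lock, but is no longer delayed by in-flight appends.
  def makeFollower(): Unit = {
    val w = stateLock.writeLock(); w.lock()
    try { /* update leader/ISR state here */ } finally w.unlock()
  }
}
```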

**Benchmark results for Partition.makeFollower()**
Old (follower appends holding the leaderIsrUpdateLock read lock):
```
Benchmark                                        Mode  Cnt     Score    Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    15  2046.967 ± 22.842  ns/op
```

New (with the separate futureLogLock; roughly 37% lower average time per call):
```
Benchmark                                        Mode  Cnt     Score   Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    15  1278.525 ± 5.354  ns/op
```
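
For reference, the new test in PartitionTest (in the diff below) parks a follower append inside a SlowLog on a Semaphore and then asserts that makeFollower() still completes. A loose, self-contained sketch of the same property, using the illustrative PartitionLike class above (the real test coordinates the threads explicitly; this demo does not):

```scala
// Illustrative usage only, mirroring the shape of testMakeFollowerWithWithFollowerAppendRecords:
// a concurrent follower append does not delay makeFollower(), and the replace check
// still sees a consistent log end offset.
object MakeFollowerConcurrencyDemo extends App {
  val p = new PartitionLike
  val appender = new Thread(() => (1 to 1000).foreach(_ => p.appendAsFollower(1)))
  appender.start()
  p.makeFollower()                     // only needs the write lock, not appendLock
  appender.join()
  println(p.maybeReplaceWithFuture())  // false: the fake future log never caught up
}
```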

Author: Manikumar Reddy <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #8153 from omkreddy/KAFKA-9594-LAISR
omkreddy committed Feb 26, 2020
1 parent c178494 commit 922a95a
Showing 3 changed files with 293 additions and 34 deletions.
75 changes: 42 additions & 33 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -186,6 +186,8 @@ case class SimpleAssignmentState(replicas: Seq[Int]) extends AssignmentState
* infrequent.
* 4) HW updates are synchronized using ISR read lock. @Log lock is acquired during the update with
* locking order Partition lock -> Log lock.
* 5) lock is used to prevent the follower replica from being updated while ReplicaAlterDirThread is
* executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
*/
class Partition(val topicPartition: TopicPartition,
val replicaLagTimeMaxMs: Long,
@@ -203,6 +205,8 @@ class Partition(val topicPartition: TopicPartition,
private val remoteReplicasMap = new Pool[Int, Replica]
// The read lock is only required when multiple reads are executed and needs to be in a consistent manner
private val leaderIsrUpdateLock = new ReentrantReadWriteLock
// lock to prevent the follower replica log update while checking if the log dir could be replaced with future log.
private val futureLogLock = new Object()
private var zkVersion: Int = LeaderAndIsr.initialZKVersion
@volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
// start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
@@ -418,33 +422,36 @@ class Partition(val topicPartition: TopicPartition,
}
}

// Return true iff the future replica exists and it has caught up with the current replica for this partition
// Return true if the future replica exists and it has caught up with the current replica for this partition
// Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
// from its partitionStates if this method returns true
def maybeReplaceCurrentWithFutureReplica(): Boolean = {
val localReplicaLEO = localLogOrException.logEndOffset
val futureReplicaLEO = futureLog.map(_.logEndOffset)
if (futureReplicaLEO.contains(localReplicaLEO)) {
// The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
// current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
inWriteLock(leaderIsrUpdateLock) {
futureLog match {
case Some(futurePartitionLog) =>
if (log.exists(_.logEndOffset == futurePartitionLog.logEndOffset)) {
logManager.replaceCurrentWithFutureLog(topicPartition)
log = futureLog
removeFutureLocalReplica(false)
true
} else false
case None =>
// Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
// In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
// Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the
// state again to avoid race condition
false
// lock to prevent the log append by followers while checking if the log dir could be replaced with future log.
futureLogLock.synchronized {
val localReplicaLEO = localLogOrException.logEndOffset
val futureReplicaLEO = futureLog.map(_.logEndOffset)
if (futureReplicaLEO.contains(localReplicaLEO)) {
// The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
// current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
inWriteLock(leaderIsrUpdateLock) {
futureLog match {
case Some(futurePartitionLog) =>
if (log.exists(_.logEndOffset == futurePartitionLog.logEndOffset)) {
logManager.replaceCurrentWithFutureLog(topicPartition)
log = futureLog
removeFutureLocalReplica(false)
true
} else false
case None =>
// Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
// In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
// Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the
// state again to avoid race condition
false
}
}
}
} else false
} else false
}
}

def delete(): Unit = {
@@ -540,7 +547,7 @@ class Partition(val topicPartition: TopicPartition,
* Make the local replica the follower by setting the new leader and ISR to empty
* If the leader replica id does not change and the new epoch is equal or one
* greater (that is, no updates have been missed), return false to indicate to the
* replica manager that state is already correct and the become-follower steps can be skipped
* replica manager that state is already correct and the become-follower steps can be skipped
*/
def makeFollower(controllerId: Int,
partitionState: LeaderAndIsrPartitionState,
@@ -911,16 +918,18 @@ class Partition(val topicPartition: TopicPartition,
}

private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Option[LogAppendInfo] = {
// The read lock is needed to handle race condition if request handler thread tries to
// remove future replica after receiving AlterReplicaLogDirsRequest.
inReadLock(leaderIsrUpdateLock) {
if (isFuture) {
if (isFuture) {
// The read lock is needed to handle race condition if request handler thread tries to
// remove future replica after receiving AlterReplicaLogDirsRequest.
inReadLock(leaderIsrUpdateLock) {
// Note the replica may be undefined if it is removed by a non-ReplicaAlterLogDirsThread before
// this method is called
futureLog.map { _.appendAsFollower(records) }
} else {
// The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
}
} else {
// The lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
futureLogLock.synchronized {
Some(localLogOrException.appendAsFollower(records))
}
}
@@ -1127,7 +1136,7 @@ class Partition(val topicPartition: TopicPartition,
*/
def truncateTo(offset: Long, isFuture: Boolean): Unit = {
// The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread
// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
inReadLock(leaderIsrUpdateLock) {
logManager.truncateTo(Map(topicPartition -> offset), isFuture = isFuture)
}
@@ -1141,7 +1150,7 @@ class Partition(val topicPartition: TopicPartition,
*/
def truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit = {
// The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread
// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
inReadLock(leaderIsrUpdateLock) {
logManager.truncateFullyAndStartAt(topicPartition, newOffset, isFuture = isFuture)
}
77 changes: 76 additions & 1 deletion core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -18,7 +18,7 @@ package kafka.cluster

import java.nio.ByteBuffer
import java.util.{Optional, Properties}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException}
import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean

import com.yammer.metrics.core.Metric
@@ -27,6 +27,7 @@ import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{Defaults => _, _}
import kafka.metrics.KafkaYammerMetrics
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException, ReplicaNotAvailableException}
@@ -135,6 +136,59 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(None, partition.futureLog)
}

// Verify that partition.makeFollower() and partition.appendRecordsToFollowerOrFutureReplica() can run concurrently
@Test
def testMakeFollowerWithWithFollowerAppendRecords(): Unit = {
val appendSemaphore = new Semaphore(0)
val mockTime = new MockTime()

partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
delayedOperations,
metadataCache,
logManager) {

override def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
val log = super.createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
new SlowLog(log, mockTime, appendSemaphore)
}
}

partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)

val appendThread = new Thread {
override def run(): Unit = {
val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)),
baseOffset = 0)
partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
}
}
appendThread.start()
TestUtils.waitUntilTrue(() => appendSemaphore.hasQueuedThreads, "follower log append is not called.")

val partitionState = new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(2)
.setLeaderEpoch(1)
.setIsr(List[Integer](0, 1, 2, brokerId).asJava)
.setZkVersion(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false)
assertTrue(partition.makeFollower(0, partitionState, 0, offsetCheckpoints))

appendSemaphore.release()
appendThread.join()

assertEquals(2L, partition.localLogOrException.logEndOffset)
assertEquals(2L, partition.leaderReplicaIdOpt.get)
}

@Test
// Verify that replacement works when the replicas have the same log end offset but different base offsets in the
// active segment
@@ -1661,4 +1715,25 @@ class PartitionTest extends AbstractPartitionTest {
log.appendAsLeader(records, leaderEpoch)
}
}

private class SlowLog(log: Log, mockTime: MockTime, appendSemaphore: Semaphore) extends Log(
log.dir,
log.config,
log.logStartOffset,
log.recoveryPoint,
mockTime.scheduler,
new BrokerTopicStats,
log.time,
log.maxProducerIdExpirationMs,
log.producerIdExpirationCheckIntervalMs,
log.topicPartition,
log.producerStateManager,
new LogDirFailureChannel(1)) {

override def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
appendSemaphore.acquire()
val appendInfo = super.appendAsFollower(records)
appendInfo
}
}
}
