Commit

KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678) (#5749)

This patch ensures that when a broker becomes leader, the leader epoch cache is updated with the latest epoch and the log end offset as its start offset. This guarantees that the leader can provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back-to-back leader elections.
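As a rough illustration of that flow, the sketch below condenses the becoming-leader path into a standalone snippet. `PartitionSketch`, `EpochCache`, and `makeLeader` are hypothetical stand-ins for illustration only, not the actual `Partition` and `LeaderEpochFileCache` classes touched by this patch.

```scala
// Hypothetical stand-in for kafka.server.epoch.LeaderEpochFileCache.
trait EpochCache {
  def assign(epoch: Int, startOffset: Long): Unit
}

// Simplified becoming-leader path: cache the new epoch at the current log end
// offset before answering any follower truncation (OffsetsForLeaderEpoch) requests.
class PartitionSketch(localBrokerId: Int) {
  private var leaderReplicaIdOpt: Option[Int] = None

  def makeLeader(leaderEpoch: Int, logEndOffset: Long, epochCacheOpt: Option[EpochCache]): Boolean = {
    // Caching (leaderEpoch, logEndOffset) lets the leader answer queries for any
    // earlier epoch, including epochs it never wrote itself, with a safe truncation point.
    epochCacheOpt.foreach(_.assign(leaderEpoch, logEndOffset))

    // The replica is a "new" leader only if it was not already the leader.
    val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
    leaderReplicaIdOpt = Some(localBrokerId)
    isNewLeader
  }
}
```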

Additionally, we have made the following changes:

1. The leader epoch cache enforces monotonically increasing epochs and start offsets among its entries. Whenever a new entry is appended that violates this requirement, the conflicting entries are removed from the cache.
2. Previously we returned an unknown epoch and offset if an epoch was queried which comes before the first entry in the cache. Now we return the requested epoch with the start offset of the earliest cached entry as its end offset (see the sketch below). For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine the correct starting point of the active log range on the leader.
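To make the two rules concrete, here is a minimal in-memory sketch assuming a simple sorted buffer of (epoch, startOffset) entries. `EpochCacheSketch` and `endOffsetFor` are hypothetical names; the real `LeaderEpochFileCache` additionally checkpoints its entries to disk and synchronizes access.

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal in-memory model of the two cache rules above (hypothetical sketch).
case class EpochEntry(epoch: Int, startOffset: Long)

class EpochCacheSketch(logEndOffset: () => Long) {
  private val epochs = ArrayBuffer.empty[EpochEntry]

  // Rule 1: epochs and start offsets are monotonically increasing. Entries that
  // conflict with a new assignment are dropped from the tail before appending.
  def assign(epoch: Int, startOffset: Long): Unit = {
    while (epochs.nonEmpty &&
           (epochs.last.epoch >= epoch || epochs.last.startOffset >= startOffset)) {
      epochs.remove(epochs.size - 1)
    }
    epochs += EpochEntry(epoch, startOffset)
  }

  // Rule 2: a query for an epoch earlier than the first cached entry returns the
  // requested epoch with the earliest cached start offset as its end offset, e.g.
  // with earliest entry (epoch=5, startOffset=10), a query for epoch 4 yields (4, 10).
  def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
    if (epochs.isEmpty)
      (-1, -1L)                                   // undefined epoch / offset
    else if (requestedEpoch < epochs.head.epoch)
      (requestedEpoch, epochs.head.startOffset)
    else {
      // The end offset of an epoch is the start offset of the next higher cached
      // epoch, or the log end offset if the requested epoch is the latest one.
      val floor = epochs.takeWhile(_.epoch <= requestedEpoch).last
      val end = epochs.find(_.epoch > requestedEpoch).map(_.startOffset).getOrElse(logEndOffset())
      (floor.epoch, end)
    }
  }
}
```

With this sketch, after assign(5, 10) and assign(7, 20), endOffsetFor(4) returns (4, 10), endOffsetFor(5) returns (5, 20), and endOffsetFor(7) returns (7, logEndOffset()).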

Reviewers: Jun Rao <[email protected]>
Jason Gustafson authored Oct 5, 2018
1 parent f93d5bd commit 68b1d49
Showing 17 changed files with 397 additions and 470 deletions.
11 changes: 10 additions & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
@@ -280,8 +280,17 @@ class Partition(val topic: String,
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
zkVersion = partitionStateInfo.basePartitionState.zkVersion
- val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)

+ // In the case of successive leader elections in a short time period, a follower may have
+ // entries in its log from a later epoch than any entry in the new leader's log. In order
+ // to ensure that these followers can truncate to the right offset, we must cache the new
+ // leader epoch and the start offset since it should be larger than any epoch that a follower
+ // would try to query.
+ leaderReplica.epochs.foreach { epochCache =>
+   epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+ }

+ val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
val curTimeMs = time.milliseconds
// initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,6 +18,7 @@
package kafka.cluster

import kafka.log.Log
+ import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.Logging
import kafka.server.{LogOffsetMetadata, LogReadResult}
import kafka.common.KafkaException
@@ -55,7 +56,7 @@ class Replica(val brokerId: Int,

def lastCaughtUpTimeMs = _lastCaughtUpTimeMs

- val epochs = log.map(_.leaderEpochCache)
+ val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)

info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
20 changes: 10 additions & 10 deletions core/src/main/scala/kafka/log/Log.scala
@@ -39,7 +39,7 @@ import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.{Time, Utils}
import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
- import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+ import kafka.server.epoch.LeaderEpochFileCache
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import java.util.Map.{Entry => JEntry}
@@ -208,7 +208,7 @@ class Log(@volatile var dir: File,
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

- @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
+ @volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache()

locally {
val startMs = time.milliseconds
@@ -218,12 +218,12 @@
/* Calculate the offset of the next message */
nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

- _leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
+ _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset)

logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)

// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
- _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+ _leaderEpochCache.truncateFromStart(logStartOffset)

loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)

@@ -271,11 +271,11 @@ class Log(@volatile var dir: File,

def leaderEpochCache = _leaderEpochCache

- private def initializeLeaderEpochCache(): LeaderEpochCache = {
+ private def initializeLeaderEpochCache(): LeaderEpochFileCache = {
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
- new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
-   new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel))
+ val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)
+ new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
}

private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
@@ -352,7 +352,7 @@
}
}

- private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
+ private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized {
val stateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds)
logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment =>
@@ -830,7 +830,7 @@ class Log(@volatile var dir: File,
if (newLogStartOffset > logStartOffset) {
info(s"Incrementing log start offset to $newLogStartOffset")
logStartOffset = newLogStartOffset
- _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+ _leaderEpochCache.truncateFromStart(logStartOffset)
producerStateManager.truncateHead(logStartOffset)
updateFirstUnstableOffset()
}
@@ -1513,7 +1513,7 @@ class Log(@volatile var dir: File,
updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
this.logStartOffset = math.min(targetOffset, this.logStartOffset)
- _leaderEpochCache.clearAndFlushLatest(targetOffset)
+ _leaderEpochCache.truncateFromEnd(targetOffset)
loadProducerState(targetOffset, reloadFromCleanShutdown = false)
}
true
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LogSegment.scala
@@ -22,7 +22,7 @@ import java.nio.file.attribute.FileTime
import java.util.concurrent.TimeUnit

import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
- import kafka.server.epoch.LeaderEpochCache
+ import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.{FetchDataInfo, LogOffsetMetadata}
import kafka.utils._
import org.apache.kafka.common.errors.CorruptRecordException
@@ -265,7 +265,7 @@ class LogSegment private[log] (val log: FileRecords,
* @return The number of bytes truncated from the log
*/
@nonthreadsafe
- def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+ def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
offsetIndex.reset()
timeIndex.reset()
txnIndex.reset()
@@ -293,7 +293,7 @@

if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
- if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
+ if (batch.partitionLeaderEpoch > cache.latestEpoch) // this is to avoid unnecessary warning in cache.assign()
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -29,7 +29,7 @@ import ReplicaAlterLogDirsThread.FetchRequest
import ReplicaAlterLogDirsThread.PartitionData
import kafka.api.Request
import kafka.server.QuotaFactory.UnboundedQuota
- import kafka.server.epoch.LeaderEpochCache
+ import kafka.server.epoch.LeaderEpochFileCache
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
@@ -58,7 +58,7 @@ class ReplicaAlterLogDirsThread(name: String,
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes

- private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get)
+ private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochFileCache] = replicaMgr.getReplica(tp).map(_.epochs.get)

def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData)] = null
@@ -148,7 +148,7 @@ class ReplicaAlterLogDirsThread(name: String,

val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }

- val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+ val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
}

6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -23,7 +23,7 @@ import AbstractFetcherThread.ResultWithPartitions
import kafka.api.{FetchRequest => _, _}
import kafka.cluster.{BrokerEndPoint, Replica}
import kafka.server.ReplicaFetcherThread._
- import kafka.server.epoch.LeaderEpochCache
+ import kafka.server.epoch.LeaderEpochFileCache
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.TopicPartition
@@ -79,7 +79,7 @@ class ReplicaFetcherThread(name: String,
private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)

- private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get)
+ private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochFileCache] = replicaMgr.getReplica(tp).map(_.epochs.get)

override def initiateShutdown(): Boolean = {
val justShutdown = super.initiateShutdown()
@@ -324,7 +324,7 @@ class ReplicaFetcherThread(name: String,
val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }

debug(s"Build leaderEpoch request $partitionsWithEpoch")
- val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+ val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
}
