KAFKA-7415; Persist leader epoch and start offset on becoming a leader #5678

Merged 4 commits on Oct 4, 2018
11 changes: 10 additions & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
@@ -301,8 +301,17 @@ class Partition(val topic: String,
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
-      val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
+
+      // In the case of successive leader elections in a short time period, a follower may have
+      // entries in its log from a later epoch than any entry in the new leader's log. In order
+      // to ensure that these followers can truncate to the right offset, we must cache the new
+      // leader epoch and the start offset since it should be larger than any epoch that a follower
+      // would try to query.
+      leaderReplica.epochs.foreach { epochCache =>
+        epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+      }
+
+      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
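The comment block above carries the core reasoning of this PR: the newly elected leader must cache its own (epoch, start offset) entry so that a follower whose log contains entries from a later epoch still gets back a usable truncation point. A rough sketch of the mechanism, using simplified illustrative types rather than the real `LeaderEpochFileCache` / `OffsetsForLeaderEpoch` API:

```scala
// Illustrative sketch only: a simplified stand-in for LeaderEpochFileCache.
case class EpochEntry(epoch: Int, startOffset: Long)

class EpochCacheSketch {
  private var entries = Vector.empty[EpochEntry] // ascending by epoch

  // What the new code in makeLeader does: record the elected epoch with the
  // leader's current log end offset as that epoch's start offset.
  def assign(epoch: Int, startOffset: Long): Unit =
    if (!entries.lastOption.exists(_.epoch >= epoch))
      entries = entries :+ EpochEntry(epoch, startOffset)

  // Simplified form of the lookup behind OffsetsForLeaderEpoch: the end
  // offset of a requested epoch is the start offset of the next higher
  // cached epoch, or the log end offset if no higher epoch exists. Because
  // the new leader cached its own epoch, a follower querying an older epoch
  // gets back leaderEpochStartOffset and truncates there.
  def endOffsetFor(requestedEpoch: Int, logEndOffset: Long): Long =
    entries.find(_.epoch > requestedEpoch)
      .map(_.startOffset)
      .getOrElse(logEndOffset)
}
```

In the real code path the follower issues an OffsetsForLeaderEpoch request and truncates its log to the returned end offset before resuming fetches from the new leader.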
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,7 +18,7 @@
 package kafka.cluster

 import kafka.log.Log
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -55,7 +55,7 @@ class Replica(val brokerId: Int,

   def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs

-  val epochs: Option[LeaderEpochCache] = log.map(_.leaderEpochCache)
+  val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)

   info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
24 changes: 12 additions & 12 deletions core/src/main/scala/kafka/log/Log.scala
@@ -32,15 +32,15 @@ import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrd
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.errors.{CorruptRecordException, InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}

 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -229,7 +229,7 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

-  @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
+  @volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache()

   locally {
     val startMs = time.milliseconds
@@ -239,12 +239,12 @@
     /* Calculate the offset of the next message */
     nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

-    _leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
+    _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset)

     logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)

     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
-    _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+    _leaderEpochCache.truncateFromStart(logStartOffset)

     // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
     // from scratch.
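The renames in this hunk (`clearAndFlushLatest` → `truncateFromEnd`, `clearAndFlushEarliest` → `truncateFromStart`) make the direction of the trim explicit. A hedged sketch of the intended semantics, using a simplified `EpochEntry` model in place of the real cache internals:

```scala
// Hedged sketch of the two renamed operations, assuming entries are kept in
// ascending startOffset order (simplified from LeaderEpochFileCache).
object EpochTruncationSketch {
  case class EpochEntry(epoch: Int, startOffset: Long)

  // truncateFromEnd (was clearAndFlushLatest): after the log is truncated
  // back to endOffset, drop every epoch that started at or beyond it.
  def truncateFromEnd(entries: Vector[EpochEntry], endOffset: Long): Vector[EpochEntry] =
    entries.filter(_.startOffset < endOffset)

  // truncateFromStart (was clearAndFlushEarliest): once the log start offset
  // advances, drop epochs that began before it, but keep the most recent of
  // them clamped to the new start offset, so a lookup at the new log start
  // still resolves to the correct epoch.
  def truncateFromStart(entries: Vector[EpochEntry], startOffset: Long): Vector[EpochEntry] = {
    val (before, after) = entries.partition(_.startOffset <= startOffset)
    before.lastOption match {
      case Some(last) => last.copy(startOffset = startOffset) +: after
      case None       => after
    }
  }
}
```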
@@ -296,11 +296,11 @@ class Log(@volatile var dir: File,

   def leaderEpochCache = _leaderEpochCache

-  private def initializeLeaderEpochCache(): LeaderEpochCache = {
+  private def initializeLeaderEpochCache(): LeaderEpochFileCache = {
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
-    new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
-      new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel))
+    val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)
+    new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
   }
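Besides the return-type change, the cache constructor now receives `logEndOffset _`, the eta-expansion of the parameterless method into a `() => Long`, rather than a thunk returning the full `LogOffsetMetadata`. A standalone illustration of why passing the method as a function keeps the cache's view of the end offset current (all names here are invented for the demo, not Kafka code):

```scala
// Invented demo types; not Kafka code.
class LogSketch {
  @volatile private var endOffset: Long = 0L
  def logEndOffset: Long = endOffset
  def advance(n: Long): Unit = endOffset += n
}

object EtaExpansionDemo extends App {
  val log = new LogSketch
  val fetchLEO: () => Long = log.logEndOffset _ // eta-expansion of the method
  log.advance(42)
  println(fetchLEO()) // prints 42: each call re-reads the current end offset
}
```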
@@ -422,7 +422,7 @@ class Log(@volatile var dir: File,
    * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
    */
   private def recoverSegment(segment: LogSegment,
-                             leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
+                             leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized {
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager)
     val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)
@@ -941,7 +941,7 @@ class Log(@volatile var dir: File,
     if (newLogStartOffset > logStartOffset) {
       info(s"Incrementing log start offset to $newLogStartOffset")
       logStartOffset = newLogStartOffset
-      _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+      _leaderEpochCache.truncateFromStart(logStartOffset)
       producerStateManager.truncateHead(logStartOffset)
       updateFirstUnstableOffset()
     }
@@ -1645,7 +1645,7 @@ class Log(@volatile var dir: File,
       updateLogEndOffset(targetOffset)
       this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
       this.logStartOffset = math.min(targetOffset, this.logStartOffset)
-      _leaderEpochCache.clearAndFlushLatest(targetOffset)
+      _leaderEpochCache.truncateFromEnd(targetOffset)
       loadProducerState(targetOffset, reloadFromCleanShutdown = false)
     }
     true
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogSegment.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit

 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
@@ -330,7 +330,7 @@ class LogSegment private[log] (val log: FileRecords,
    * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
    */
   @nonthreadsafe
-  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
     offsetIndex.reset()
     timeIndex.reset()
     txnIndex.reset()
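With the concrete `LeaderEpochFileCache` threaded through `recover`, segment recovery can re-populate epoch entries from the record batches it scans. A hedged sketch of that part of the recovery loop (`reassignEpochs` is an illustrative stand-in; the real `recover` also rebuilds the offset, time, and transaction indexes and producer state as it scans each batch):

```scala
// Hedged sketch; simplified from LogSegment.recover.
import kafka.server.epoch.LeaderEpochFileCache
import org.apache.kafka.common.record.{FileRecords, RecordBatch}
import scala.collection.JavaConverters._

object RecoverySketch {
  def reassignEpochs(records: FileRecords, cache: Option[LeaderEpochFileCache]): Unit =
    records.batches.asScala.foreach { batch =>
      // Only v2 batches carry a partition leader epoch; re-cache each one
      // found, using the batch's base offset as the epoch's start offset.
      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 &&
          batch.partitionLeaderEpoch > RecordBatch.NO_PARTITION_LEADER_EPOCH)
        cache.foreach(_.assign(batch.partitionLeaderEpoch, batch.baseOffset))
    }
}
```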