
KAFKA-16541 Fix potential leader-epoch checkpoint file corruption #15993

Merged (13 commits) on Jun 6, 2024
core/src/main/java/kafka/log/remote/RemoteLogManager.java (2 additions, 2 deletions)
@@ -575,9 +575,9 @@ InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long star
if (log.leaderEpochCache().isDefined()) {
LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint);
if (startOffset >= 0) {
- cache.truncateFromStart(startOffset);
+ cache.truncateFromStart(startOffset, true);
Contributor (@junrao):

This makes the code a bit hard to understand. I am wondering if we could improve it. The existing usage of InMemoryLeaderEpochCheckpoint is kind of awkward. Its only purpose is to get a list of epoch entries from LeaderEpochCache within a specified offset range. But the way that it achieves the goal is indirect and complicated.

Instead of using InMemoryLeaderEpochCheckpoint, perhaps we could add a new method like List<EpochEntry> epochEntriesInRange(long startOffset, long endOffset) in LeaderEpochCache that returns a list of epoch entries between startOffset and endOffset. Then, we could pull the logic in InMemoryLeaderEpochCheckpoint.readAsByteBuffer() into a static method. This way, we don't need to add the flushSync option to truncateFromStart() and can get rid of InMemoryLeaderEpochCheckpoint. What do you think? cc @satishd
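
For illustration, a minimal standalone sketch of what the suggested epochEntriesInRange(startOffset, endOffset) accessor could look like. The class, the EpochEntryExample record, and the backing map are stand-ins rather than the real LeaderEpochFileCache, and the clipping of the entry that straddles startOffset is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Standalone sketch of the proposed accessor; not the code merged in this PR.
public class EpochRangeSketch {
    // Stand-in for Kafka's EpochEntry (epoch, startOffset).
    record EpochEntryExample(int epoch, long startOffset) {}

    // Epoch entries keyed by epoch number, mirroring the cache's in-memory map.
    private final NavigableMap<Integer, EpochEntryExample> epochs = new TreeMap<>();

    // Return the entries whose start offset falls in [startOffset, endOffset).
    public List<EpochEntryExample> epochEntriesInRange(long startOffset, long endOffset) {
        List<EpochEntryExample> result = new ArrayList<>();
        for (EpochEntryExample entry : epochs.values()) {
            if (entry.startOffset() >= startOffset && entry.startOffset() < endOffset) {
                result.add(entry);
            }
        }
        return result;
    }
}
```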

Contributor Author:

> But the way that it achieves the goal is indirect and complicated.

Agreed, +1 for adding a method to directly retrieve the necessary epoch entries for the RLM.

Contributor Author:

@junrao Thank you for pointing that out. I removed InMemoryLeaderEpochCheckpoint (and the LeaderEpochCheckpoint interface as well) and refactored the PR accordingly.
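
As a rough illustration of the direction described above, here is a hedged sketch of a static helper that serializes epoch entries into a ByteBuffer using the checkpoint file's line-oriented layout (version line, entry count, then one "epoch offset" line per entry). The class and method names are assumptions for this sketch, not the exact helper added to RemoteLogManager.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Illustrative sketch only; epoch entries are passed as {epoch, startOffset} pairs.
final class EpochEntrySerializerSketch {
    static ByteBuffer epochEntriesAsByteBuffer(List<long[]> epochAndStartOffset) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
            writer.write("0");                                         // checkpoint format version
            writer.newLine();
            writer.write(String.valueOf(epochAndStartOffset.size()));  // number of entries
            writer.newLine();
            for (long[] pair : epochAndStartOffset) {
                writer.write(pair[0] + " " + pair[1]);                 // "<epoch> <startOffset>"
                writer.newLine();
            }
            writer.flush();                                            // push buffered lines into `out`
            return ByteBuffer.wrap(out.toByteArray());
        }
    }
}
```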

}
- cache.truncateFromEnd(endOffset);
+ cache.truncateFromEnd(endOffset, true);
}
return checkpoint;
}
core/src/main/scala/kafka/log/LogLoader.scala (2 additions, 2 deletions)
@@ -173,14 +173,14 @@ class LogLoader(
}
}

- leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset))
+ leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset, false))
val newLogStartOffset = if (isRemoteLogEnabled) {
logStartOffsetCheckpoint
} else {
math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
}
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
- leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint))
+ leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint, false))

// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
core/src/main/scala/kafka/log/UnifiedLog.scala (47 additions, 20 deletions)
@@ -521,7 +521,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

private def initializeLeaderEpochCache(): Unit = lock synchronized {
- leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
+ leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+   dir, topicPartition, logDirFailureChannel, recordVersion, logIdent, leaderEpochCache, scheduler)
}

private def updateHighWatermarkWithLogEndOffset(): Unit = {
@@ -1015,7 +1016,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
updatedLogStartOffset = true
updateLogStartOffset(newLogStartOffset)
info(s"Incremented log start offset to $newLogStartOffset due to $reason")
- leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
+ // We flush the change to the device in the background because:
+ // - To avoid fsync latency
+ //   * fsync latency could be huge on a disk glitch, which is not rare on spinning drives
+ //   * This method is called as part of deleteRecords while holding UnifiedLog#lock.
+ //     - Meanwhile, all produces against the partition will be blocked, which could exhaust the request handler threads
+ // - We still flush the change in #assign synchronously, so the checkpoint file is guaranteed to never miss entries.
+ //   * Even if stale epochs are restored from the checkpoint file after an unclean shutdown, they will be handled by
+ //     another truncateFromStart call during log loading, so this is not a problem
+ leaderEpochCache.foreach(_.truncateFromStart(logStartOffset, false))
producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
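
The comment block in the hunk above motivates flushing the checkpoint asynchronously while keeping #assign synchronous. Below is a minimal standalone sketch of that sync-vs-async split, assuming a JDK executor in place of Kafka's Scheduler and a placeholder checkpoint writer; it is illustrative only and does not reproduce LeaderEpochFileCache.

```java
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Standalone sketch of truncateFromStart(offset, flushSync): truncate in memory under the
// write lock, then flush the checkpoint either inline or on a background thread.
final class AsyncFlushSketch {
    private final TreeMap<Integer, Long> epochToStartOffset = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void truncateFromStart(long startOffset, boolean flushSync) {
        lock.writeLock().lock();
        try {
            // Simplified truncation: drop entries starting before startOffset
            // (the real cache also clips the entry that straddles startOffset).
            epochToStartOffset.values().removeIf(offset -> offset < startOffset);
            if (flushSync) {
                writeCheckpoint();                        // flush on the caller's thread
            } else {
                scheduler.execute(this::writeCheckpoint); // flush in the background
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    private void writeCheckpoint() {
        // Placeholder for writing and fsync-ing the checkpoint file.
    }
}
```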
@@ -1806,7 +1815,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// and inserted the first start offset entry, but then failed to append any entries
// before another leader was elected.
lock synchronized {
- leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
+ // We flush the change to the device in the background because:
+ // - To avoid fsync latency
+ //   * fsync latency could be huge on a disk glitch, which is not rare on spinning drives
+ //   * This method is called by ReplicaFetcher threads, so high fsync latency could block replica fetching,
+ //     causing ISR shrink or degraded produce response times in remote scope.
+ // - We still flush the change in #assign synchronously, so the checkpoint file is guaranteed to never miss entries.
+ //   * Even if stale epochs are restored from the checkpoint file after an unclean shutdown, they will be handled by
+ //     another truncateFromEnd call during log loading, so this is not a problem
+ leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset, false))
}

false
@@ -1819,7 +1836,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
- leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
+ // We flush the change to the device in the background because:
+ // - To avoid fsync latency
+ //   * fsync latency could be huge on a disk glitch, which is not rare on spinning drives
+ //   * This method is called by ReplicaFetcher threads, so high fsync latency could block replica fetching,
+ //     causing ISR shrink or degraded produce response times in remote scope.
+ // - We still flush the change in #assign synchronously, so the checkpoint file is guaranteed to never miss entries.
+ //   * Even if stale epochs are restored from the checkpoint file after an unclean shutdown, they will be handled by
+ //     another truncateFromEnd call during log loading, so this is not a problem
+ leaderEpochCache.foreach(_.truncateFromEnd(targetOffset, false))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
if (highWatermark >= localLog.logEndOffset)
@@ -2004,12 +2029,17 @@ object UnifiedLog extends Logging {
Files.createDirectories(dir.toPath)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val segments = new LogSegments(topicPartition)
+ // The created leaderEpochCache will be truncated by LogLoader if necessary,
+ // so the epoch entries are guaranteed to be correct even when the on-disk
+ // checkpoint was stale (due to the async nature of LeaderEpochFileCache#truncateFromStart/End).
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir,
topicPartition,
logDirFailureChannel,
config.recordVersion,
- s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ")
+ s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ",
+ None,
+ scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
@@ -2096,7 +2126,8 @@ object UnifiedLog extends Logging {
}

/**
- * If the recordVersion is >= RecordVersion.V2, then create and return a LeaderEpochFileCache.
+ * If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance.
+ * The epoch entries are loaded from the backing checkpoint file, or from the provided currentCache if it is non-empty.
* Otherwise, the message format is considered incompatible and the existing LeaderEpoch file
* is deleted.
*
@@ -2105,33 +2136,29 @@ object UnifiedLog extends Logging {
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param recordVersion The record version
* @param logPrefix The logging prefix
+ * @param currentCache The current LeaderEpochFileCache instance (if any)
+ * @param scheduler The scheduler for executing asynchronous tasks
* @return The new LeaderEpochFileCache instance (if created), none otherwise
*/
def maybeCreateLeaderEpochCache(dir: File,
topicPartition: TopicPartition,
logDirFailureChannel: LogDirFailureChannel,
recordVersion: RecordVersion,
- logPrefix: String): Option[LeaderEpochFileCache] = {
+ logPrefix: String,
+ currentCache: Option[LeaderEpochFileCache],
+ scheduler: Scheduler): Option[LeaderEpochFileCache] = {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)

- def newLeaderEpochFileCache(): LeaderEpochFileCache = {
-   val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
-   new LeaderEpochFileCache(topicPartition, checkpointFile)
- }

if (recordVersion.precedes(RecordVersion.V2)) {
- val currentCache = if (leaderEpochFile.exists())
-   Some(newLeaderEpochFileCache())
- else
-   None
-
- if (currentCache.exists(_.nonEmpty))
+ if (leaderEpochFile.exists()) {
warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")

+ }
Files.deleteIfExists(leaderEpochFile.toPath)
None
} else {
- Some(newLeaderEpochFileCache())
+ val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
+ currentCache.map(_.withCheckpoint(checkpointFile))
+   .orElse(Some(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler)))
}
}
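
To make the currentCache handling above concrete, here is a minimal standalone sketch of the "reuse the current cache with a new checkpoint" idea: carry the in-memory entries over instead of re-reading a possibly not-yet-flushed file. EpochCacheSketch and its members are hypothetical and do not reproduce LeaderEpochFileCache.withCheckpoint.

```java
import java.io.File;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative sketch only.
final class EpochCacheSketch {
    record Entry(int epoch, long startOffset) {}

    private final List<Entry> entries;
    private final File checkpointFile;

    EpochCacheSketch(List<Entry> entries, File checkpointFile) {
        this.entries = new CopyOnWriteArrayList<>(entries);
        this.checkpointFile = checkpointFile;
    }

    // Keep the current in-memory entries but point the new instance at a different backing file,
    // so reinitialization does not depend on stale on-disk state.
    EpochCacheSketch withCheckpoint(File newCheckpointFile) {
        return new EpochCacheSketch(entries, newCheckpointFile);
    }
}
```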

@@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
def write(offsets: Map[TopicPartition, Long]): Unit = {
val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
offsets.foreach(x => list.add(x))
- checkpoint.write(list, true)
+ checkpoint.write(list)
}

def read(): Map[TopicPartition, Long] = {