Skip to content

Commit

Permalink
KAFKA-16541 Fix potential leader-epoch checkpoint file corruption (ap…
Browse files Browse the repository at this point in the history
…ache#15993)

A patch for KAFKA-15046 got rid of fsync on LeaderEpochFileCache#truncateFromStart/End for performance reason, but it turned out this could cause corrupted leader-epoch checkpoint file on ungraceful OS shutdown, i.e. OS shuts down in the middle when kernel is writing dirty pages back to the device.

To address this problem, this PR makes below changes: (1) Revert LeaderEpochCheckpoint#write to always fsync
(2) truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on scheduler thread
(3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from checkpoint file only when current cache is absent

Reviewers: Jun Rao <[email protected]>
  • Loading branch information
ocadaruma authored and TaiJuWu committed Jun 8, 2024
1 parent 7ca26b6 commit 2bdd281
Show file tree
Hide file tree
Showing 22 changed files with 364 additions and 358 deletions.
46 changes: 31 additions & 15 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
Expand All @@ -61,7 +62,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.EpochEntry;
Expand All @@ -83,12 +84,16 @@
import scala.Option;
import scala.collection.JavaConverters;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.PrivilegedAction;
import java.util.ArrayList;
Expand Down Expand Up @@ -612,25 +617,23 @@ public boolean isCancelled() {
}

/**
* Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
* Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset.
* <p>
* Visible for testing.
*
* @param log The actual log from where to take the leader-epoch checkpoint
* @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
* @param startOffset The start offset of the epoch entries (inclusive).
* If start offset is 6, then it will retain an entry at offset 6.
* @param endOffset The end offset of the checkpoint file (inclusive in the truncation)
* @param endOffset The end offset of the epoch entries (exclusive)
* If end offset is 100, then it will remove the entries greater than or equal to 100.
* @return the truncated leader epoch checkpoint
* @return the leader epoch entries
*/
InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long startOffset, long endOffset) {
InMemoryLeaderEpochCheckpoint checkpoint = new InMemoryLeaderEpochCheckpoint();
List<EpochEntry> getLeaderEpochEntries(UnifiedLog log, long startOffset, long endOffset) {
if (log.leaderEpochCache().isDefined()) {
LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint);
if (startOffset >= 0) {
cache.truncateFromStart(startOffset);
}
cache.truncateFromEnd(endOffset);
return log.leaderEpochCache().get().epochEntriesInRange(startOffset, endOffset);
} else {
return Collections.emptyList();
}
return checkpoint;
}

class RLMTask extends CancellableRunnable {
Expand Down Expand Up @@ -788,7 +791,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
long endOffset = nextSegmentBaseOffset - 1;
File producerStateSnapshotFile = log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null);

List<EpochEntry> epochEntries = getLeaderEpochCheckpoint(log, segment.baseOffset(), nextSegmentBaseOffset).read();
List<EpochEntry> epochEntries = getLeaderEpochEntries(log, segment.baseOffset(), nextSegmentBaseOffset);
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>(epochEntries.size());
epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset));

Expand All @@ -798,7 +801,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment

remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();

ByteBuffer leaderEpochsIndex = getLeaderEpochCheckpoint(log, -1, nextSegmentBaseOffset).readAsByteBuffer();
ByteBuffer leaderEpochsIndex = epochEntriesAsByteBuffer(getLeaderEpochEntries(log, -1, nextSegmentBaseOffset));
LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.offsetIndex().file()),
toPathIfExists(segment.timeIndex().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())),
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
Expand Down Expand Up @@ -1751,6 +1754,19 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo
LOGGER.info("Shutting down of thread pool {} is completed", poolName);
}

//Visible for testing
static ByteBuffer epochEntriesAsByteBuffer(List<EpochEntry> epochEntries) throws IOException {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) {
CheckpointFile.CheckpointWriteBuffer<EpochEntry> writeBuffer =
new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER);
writeBuffer.write(epochEntries);
writer.flush();
}

return ByteBuffer.wrap(stream.toByteArray());
}

private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) {
String topic = topicIdPartition.topic();
if (!brokerTopicStats.isTopicStatsExisted(topicIdPartition.topic())) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,14 @@ class LogLoader(
}
}

leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset))
leaderEpochCache.ifPresent(_.truncateFromEndAsyncFlush(nextOffset))
val newLogStartOffset = if (isRemoteLogEnabled) {
logStartOffsetCheckpoint
} else {
math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
}
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint))
leaderEpochCache.ifPresent(_.truncateFromStartAsyncFlush(logStartOffsetCheckpoint))

// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
Expand Down
43 changes: 23 additions & 20 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

private def initializeLeaderEpochCache(): Unit = lock synchronized {
leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, recordVersion, logIdent, leaderEpochCache, scheduler)
}

private def updateHighWatermarkWithLogEndOffset(): Unit = {
Expand Down Expand Up @@ -1015,7 +1016,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
updatedLogStartOffset = true
updateLogStartOffset(newLogStartOffset)
info(s"Incremented log start offset to $newLogStartOffset due to $reason")
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
leaderEpochCache.foreach(_.truncateFromStartAsyncFlush(logStartOffset))
producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
Expand Down Expand Up @@ -1813,7 +1814,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// and inserted the first start offset entry, but then failed to append any entries
// before another leader was elected.
lock synchronized {
leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(logEndOffset))
}

false
Expand All @@ -1826,7 +1827,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
if (highWatermark >= localLog.logEndOffset)
Expand Down Expand Up @@ -2011,12 +2012,17 @@ object UnifiedLog extends Logging {
Files.createDirectories(dir.toPath)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val segments = new LogSegments(topicPartition)
// The created leaderEpochCache will be truncated by LogLoader if necessary
// so it is guaranteed that the epoch entries will be correct even when on-disk
// checkpoint was stale (due to async nature of LeaderEpochFileCache#truncateFromStart/End).
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir,
topicPartition,
logDirFailureChannel,
config.recordVersion,
s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ")
s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ",
None,
scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
Expand Down Expand Up @@ -2103,7 +2109,8 @@ object UnifiedLog extends Logging {
}

/**
* If the recordVersion is >= RecordVersion.V2, then create and return a LeaderEpochFileCache.
* If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance.
* Loading the epoch entries from the backing checkpoint file or the provided currentCache if not empty.
* Otherwise, the message format is considered incompatible and the existing LeaderEpoch file
* is deleted.
*
Expand All @@ -2112,33 +2119,29 @@ object UnifiedLog extends Logging {
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param recordVersion The record version
* @param logPrefix The logging prefix
* @param currentCache The current LeaderEpochFileCache instance (if any)
* @param scheduler The scheduler for executing asynchronous tasks
* @return The new LeaderEpochFileCache instance (if created), none otherwise
*/
def maybeCreateLeaderEpochCache(dir: File,
topicPartition: TopicPartition,
logDirFailureChannel: LogDirFailureChannel,
recordVersion: RecordVersion,
logPrefix: String): Option[LeaderEpochFileCache] = {
logPrefix: String,
currentCache: Option[LeaderEpochFileCache],
scheduler: Scheduler): Option[LeaderEpochFileCache] = {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)

def newLeaderEpochFileCache(): LeaderEpochFileCache = {
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
new LeaderEpochFileCache(topicPartition, checkpointFile)
}

if (recordVersion.precedes(RecordVersion.V2)) {
val currentCache = if (leaderEpochFile.exists())
Some(newLeaderEpochFileCache())
else
None

if (currentCache.exists(_.nonEmpty))
if (leaderEpochFile.exists()) {
warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")

}
Files.deleteIfExists(leaderEpochFile.toPath)
None
} else {
Some(newLeaderEpochFileCache())
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
currentCache.map(_.withCheckpoint(checkpointFile))
.orElse(Some(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
def write(offsets: Map[TopicPartition, Long]): Unit = {
val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
offsets.foreach(x => list.add(x))
checkpoint.write(list, true)
checkpoint.write(list)
}

def read(): Map[TopicPartition, Long] = {
Expand Down
Loading

0 comments on commit 2bdd281

Please sign in to comment.