diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 5b0d91ff439f7..c524238f62341 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -46,6 +46,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
@@ -61,7 +62,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
-import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.EpochEntry;
@@ -83,12 +84,16 @@
import scala.Option;
import scala.collection.JavaConverters;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -612,25 +617,23 @@ public boolean isCancelled() {
}
/**
- * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
+ * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset.
+ *
+ * Visible for testing.
*
* @param log The actual log from where to take the leader-epoch checkpoint
- * @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
+ * @param startOffset The start offset of the epoch entries (inclusive).
* If start offset is 6, then it will retain an entry at offset 6.
- * @param endOffset The end offset of the checkpoint file (inclusive in the truncation)
+ * @param endOffset The end offset of the epoch entries (exclusive)
* If end offset is 100, then it will remove the entries greater than or equal to 100.
- * @return the truncated leader epoch checkpoint
+ * @return the leader epoch entries
*/
- InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long startOffset, long endOffset) {
- InMemoryLeaderEpochCheckpoint checkpoint = new InMemoryLeaderEpochCheckpoint();
+ List getLeaderEpochEntries(UnifiedLog log, long startOffset, long endOffset) {
if (log.leaderEpochCache().isDefined()) {
- LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint);
- if (startOffset >= 0) {
- cache.truncateFromStart(startOffset);
- }
- cache.truncateFromEnd(endOffset);
+ return log.leaderEpochCache().get().epochEntriesInRange(startOffset, endOffset);
+ } else {
+ return Collections.emptyList();
}
- return checkpoint;
}
class RLMTask extends CancellableRunnable {
@@ -788,7 +791,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
long endOffset = nextSegmentBaseOffset - 1;
File producerStateSnapshotFile = log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null);
- List epochEntries = getLeaderEpochCheckpoint(log, segment.baseOffset(), nextSegmentBaseOffset).read();
+ List epochEntries = getLeaderEpochEntries(log, segment.baseOffset(), nextSegmentBaseOffset);
Map segmentLeaderEpochs = new HashMap<>(epochEntries.size());
epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset));
@@ -798,7 +801,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();
- ByteBuffer leaderEpochsIndex = getLeaderEpochCheckpoint(log, -1, nextSegmentBaseOffset).readAsByteBuffer();
+ ByteBuffer leaderEpochsIndex = epochEntriesAsByteBuffer(getLeaderEpochEntries(log, -1, nextSegmentBaseOffset));
LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.offsetIndex().file()),
toPathIfExists(segment.timeIndex().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())),
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
@@ -1751,6 +1754,19 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo
LOGGER.info("Shutting down of thread pool {} is completed", poolName);
}
+ //Visible for testing
+ static ByteBuffer epochEntriesAsByteBuffer(List epochEntries) throws IOException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) {
+ CheckpointFile.CheckpointWriteBuffer writeBuffer =
+ new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER);
+ writeBuffer.write(epochEntries);
+ writer.flush();
+ }
+
+ return ByteBuffer.wrap(stream.toByteArray());
+ }
+
private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) {
String topic = topicIdPartition.topic();
if (!brokerTopicStats.isTopicStatsExisted(topicIdPartition.topic())) {
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index b0f1fdd0e1ca2..b3b0ec2c63362 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -173,14 +173,14 @@ class LogLoader(
}
}
- leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset))
+ leaderEpochCache.ifPresent(_.truncateFromEndAsyncFlush(nextOffset))
val newLogStartOffset = if (isRemoteLogEnabled) {
logStartOffsetCheckpoint
} else {
math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
}
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
- leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint))
+ leaderEpochCache.ifPresent(_.truncateFromStartAsyncFlush(logStartOffsetCheckpoint))
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index ba1c8656a8848..bef18806b0dc8 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -521,7 +521,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def initializeLeaderEpochCache(): Unit = lock synchronized {
- leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
+ leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ dir, topicPartition, logDirFailureChannel, recordVersion, logIdent, leaderEpochCache, scheduler)
}
private def updateHighWatermarkWithLogEndOffset(): Unit = {
@@ -1015,7 +1016,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
updatedLogStartOffset = true
updateLogStartOffset(newLogStartOffset)
info(s"Incremented log start offset to $newLogStartOffset due to $reason")
- leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
+ leaderEpochCache.foreach(_.truncateFromStartAsyncFlush(logStartOffset))
producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
@@ -1813,7 +1814,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// and inserted the first start offset entry, but then failed to append any entries
// before another leader was elected.
lock synchronized {
- leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
+ leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(logEndOffset))
}
false
@@ -1826,7 +1827,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
- leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
+ leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
if (highWatermark >= localLog.logEndOffset)
@@ -2011,12 +2012,17 @@ object UnifiedLog extends Logging {
Files.createDirectories(dir.toPath)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val segments = new LogSegments(topicPartition)
+ // The created leaderEpochCache will be truncated by LogLoader if necessary
+ // so it is guaranteed that the epoch entries will be correct even when on-disk
+ // checkpoint was stale (due to async nature of LeaderEpochFileCache#truncateFromStart/End).
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir,
topicPartition,
logDirFailureChannel,
config.recordVersion,
- s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ")
+ s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ",
+ None,
+ scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
@@ -2103,7 +2109,8 @@ object UnifiedLog extends Logging {
}
/**
- * If the recordVersion is >= RecordVersion.V2, then create and return a LeaderEpochFileCache.
+ * If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance.
+ * Loading the epoch entries from the backing checkpoint file or the provided currentCache if not empty.
* Otherwise, the message format is considered incompatible and the existing LeaderEpoch file
* is deleted.
*
@@ -2112,33 +2119,29 @@ object UnifiedLog extends Logging {
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param recordVersion The record version
* @param logPrefix The logging prefix
+ * @param currentCache The current LeaderEpochFileCache instance (if any)
+ * @param scheduler The scheduler for executing asynchronous tasks
* @return The new LeaderEpochFileCache instance (if created), none otherwise
*/
def maybeCreateLeaderEpochCache(dir: File,
topicPartition: TopicPartition,
logDirFailureChannel: LogDirFailureChannel,
recordVersion: RecordVersion,
- logPrefix: String): Option[LeaderEpochFileCache] = {
+ logPrefix: String,
+ currentCache: Option[LeaderEpochFileCache],
+ scheduler: Scheduler): Option[LeaderEpochFileCache] = {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
- def newLeaderEpochFileCache(): LeaderEpochFileCache = {
- val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
- new LeaderEpochFileCache(topicPartition, checkpointFile)
- }
-
if (recordVersion.precedes(RecordVersion.V2)) {
- val currentCache = if (leaderEpochFile.exists())
- Some(newLeaderEpochFileCache())
- else
- None
-
- if (currentCache.exists(_.nonEmpty))
+ if (leaderEpochFile.exists()) {
warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")
-
+ }
Files.deleteIfExists(leaderEpochFile.toPath)
None
} else {
- Some(newLeaderEpochFileCache())
+ val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
+ currentCache.map(_.withCheckpoint(checkpointFile))
+ .orElse(Some(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler)))
}
}
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index de3283d21fd42..084e46c5ef266 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
def write(offsets: Map[TopicPartition, Long]): Unit = {
val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
offsets.foreach(x => list.add(x))
- checkpoint.write(list, true)
+ checkpoint.write(list)
}
def read(): Map[TopicPartition, Long] = {
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 50b581fdf4ee5..19fa4a8a4431d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -60,14 +60,15 @@
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
-import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.apache.kafka.server.util.MockScheduler;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogFileUtils;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.OffsetIndex;
@@ -89,16 +90,19 @@
import scala.Option;
import scala.collection.JavaConverters;
+import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -197,25 +201,16 @@ public class RemoteLogManagerTest {
private final EpochEntry epochEntry1 = new EpochEntry(1, 100);
private final EpochEntry epochEntry2 = new EpochEntry(2, 200);
private final List totalEpochEntries = Arrays.asList(epochEntry0, epochEntry1, epochEntry2);
- private final LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint() {
- List epochs = Collections.emptyList();
-
- @Override
- public void write(Collection epochs, boolean ignored) {
- this.epochs = new ArrayList<>(epochs);
- }
-
- @Override
- public List read() {
- return epochs;
- }
- };
+ private LeaderEpochCheckpointFile checkpoint;
private final AtomicLong currentLogStartOffset = new AtomicLong(0L);
private UnifiedLog mockLog = mock(UnifiedLog.class);
+ private final MockScheduler scheduler = new MockScheduler(time);
+
@BeforeEach
void setUp() throws Exception {
+ checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
topicIds.put(leaderTopicIdPartition.topicPartition().topic(), leaderTopicIdPartition.topicId());
topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId());
Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
@@ -253,13 +248,11 @@ void tearDown() {
@Test
void testGetLeaderEpochCheckpoint() {
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
- InMemoryLeaderEpochCheckpoint inMemoryCheckpoint = remoteLogManager.getLeaderEpochCheckpoint(mockLog, 0, 300);
- assertEquals(totalEpochEntries, inMemoryCheckpoint.read());
+ assertEquals(totalEpochEntries, remoteLogManager.getLeaderEpochEntries(mockLog, 0, 300));
- InMemoryLeaderEpochCheckpoint inMemoryCheckpoint2 = remoteLogManager.getLeaderEpochCheckpoint(mockLog, 100, 200);
- List epochEntries = inMemoryCheckpoint2.read();
+ List epochEntries = remoteLogManager.getLeaderEpochEntries(mockLog, 100, 200);
assertEquals(1, epochEntries.size());
assertEquals(epochEntry1, epochEntries.get(0));
}
@@ -271,7 +264,7 @@ void testFindHighestRemoteOffsetOnEmptyRemoteStorage() throws RemoteStorageExce
new EpochEntry(1, 500)
);
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
OffsetAndEpoch offsetAndEpoch = remoteLogManager.findHighestRemoteOffset(tpId, mockLog);
@@ -285,7 +278,7 @@ void testFindHighestRemoteOffset() throws RemoteStorageException {
new EpochEntry(1, 500)
);
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
when(remoteLogMetadataManager.highestOffsetForEpoch(eq(tpId), anyInt())).thenAnswer(ans -> {
@@ -308,7 +301,7 @@ void testFindHighestRemoteOffsetWithUncleanLeaderElection() throws RemoteStorage
new EpochEntry(2, 300)
);
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
when(remoteLogMetadataManager.highestOffsetForEpoch(eq(tpId), anyInt())).thenAnswer(ans -> {
@@ -470,7 +463,7 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset,
// leader epoch preparation
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));
@@ -584,7 +577,7 @@ void testCustomMetadataSizeExceedsLimit() throws Exception {
// leader epoch preparation
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));
@@ -684,7 +677,7 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
// leader epoch preparation
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
@@ -803,7 +796,7 @@ void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws
// leader epoch preparation
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
.thenReturn(Optional.of(0L))
@@ -917,7 +910,7 @@ void testRemoteLogManagerRemoteMetrics() throws Exception {
// leader epoch preparation
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
@@ -1067,7 +1060,7 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
// leader epoch preparation
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
@@ -1140,7 +1133,7 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
// leader epoch preparation
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
@@ -1176,7 +1169,7 @@ void testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialize
// leader epoch preparation
checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
// Throw a retryable exception so indicate that the remote log metadata manager is not initialized yet
@@ -1258,9 +1251,7 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData,
assertEquals(tempFile.getAbsolutePath(), logSegmentData.logSegment().toAbsolutePath().toString());
assertEquals(mockProducerSnapshotIndex.getAbsolutePath(), logSegmentData.producerSnapshotIndex().toAbsolutePath().toString());
- InMemoryLeaderEpochCheckpoint inMemoryLeaderEpochCheckpoint = new InMemoryLeaderEpochCheckpoint();
- inMemoryLeaderEpochCheckpoint.write(expectedLeaderEpoch);
- assertEquals(inMemoryLeaderEpochCheckpoint.readAsByteBuffer(), logSegmentData.leaderEpochIndex());
+ assertEquals(RemoteLogManager.epochEntriesAsByteBuffer(expectedLeaderEpoch), logSegmentData.leaderEpochIndex());
}
@Test
@@ -1379,7 +1370,7 @@ void testFindOffsetByTimestamp() throws IOException, RemoteStorageException {
TreeMap validSegmentEpochs = new TreeMap<>();
validSegmentEpochs.put(targetLeaderEpoch, startOffset);
- LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
leaderEpochFileCache.assign(4, 99L);
leaderEpochFileCache.assign(5, 99L);
leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
@@ -1414,7 +1405,7 @@ void testFindOffsetByTimestampWithInvalidEpochSegments() throws IOException, Rem
validSegmentEpochs.put(targetLeaderEpoch - 1, startOffset - 1); // invalid epochs not aligning with leader epoch cache
validSegmentEpochs.put(targetLeaderEpoch, startOffset);
- LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
leaderEpochFileCache.assign(4, 99L);
leaderEpochFileCache.assign(5, 99L);
leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
@@ -1445,7 +1436,7 @@ void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, RemoteSt
TreeMap validSegmentEpochs = new TreeMap<>();
validSegmentEpochs.put(targetLeaderEpoch, startOffset);
- LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
leaderEpochFileCache.assign(4, 99L);
leaderEpochFileCache.assign(5, 99L);
leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
@@ -1902,7 +1893,7 @@ public void testFindLogStartOffset() throws RemoteStorageException, IOException
epochEntries.add(new EpochEntry(2, 550L));
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
long timestamp = time.milliseconds();
@@ -1940,7 +1931,7 @@ public void testFindLogStartOffsetFallbackToLocalLogStartOffsetWhenRemoteIsEmpty
epochEntries.add(new EpochEntry(2, 550L));
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(mockLog.localLogStartOffset()).thenReturn(250L);
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
@@ -1965,7 +1956,7 @@ public void testLogStartOffsetUpdatedOnStartup() throws RemoteStorageException,
epochEntries.add(new EpochEntry(2, 550L));
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
RemoteLogSegmentMetadata metadata = mock(RemoteLogSegmentMetadata.class);
@@ -2008,7 +1999,7 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
List epochEntries = Collections.singletonList(epochEntry0);
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
@@ -2061,7 +2052,7 @@ public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
List epochEntries = Collections.singletonList(epochEntry0);
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
@@ -2132,7 +2123,7 @@ public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws Remot
.thenAnswer(ans -> metadataList.iterator());
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
Map logProps = new HashMap<>();
@@ -2194,7 +2185,7 @@ public void testFailedDeleteExpiredSegments(long retentionSize,
List epochEntries = Collections.singletonList(epochEntry0);
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
@@ -2247,7 +2238,7 @@ public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount,
new EpochEntry(4, 100L)
);
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch;
long localLogSegmentsSize = 512L;
@@ -2285,7 +2276,7 @@ public void testDeleteLogSegmentDueToRetentionTimeBreach(int segmentCount,
new EpochEntry(4, 100L)
);
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch;
long localLogSegmentsSize = 512L;
@@ -2372,7 +2363,7 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
.thenReturn(remoteLogSegmentMetadatas.iterator());
checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
Map logProps = new HashMap<>();
@@ -2447,11 +2438,17 @@ private List listRemoteLogSegmentMetadataByTime(TopicI
private Map truncateAndGetLeaderEpochs(List entries,
Long startOffset,
Long endOffset) {
- InMemoryLeaderEpochCheckpoint myCheckpoint = new InMemoryLeaderEpochCheckpoint();
+ LeaderEpochCheckpointFile myCheckpoint;
+ try {
+ myCheckpoint = new LeaderEpochCheckpointFile(
+ TestUtils.tempFile(), new LogDirFailureChannel(1));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
myCheckpoint.write(entries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint);
- cache.truncateFromStart(startOffset);
- cache.truncateFromEnd(endOffset);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint, scheduler);
+ cache.truncateFromStartAsyncFlush(startOffset);
+ cache.truncateFromEndAsyncFlush(endOffset);
return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
}
@@ -2678,7 +2675,7 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
}
}
-
+
@Test
public void testCopyQuotaManagerConfig() {
Properties defaultProps = new Properties();
@@ -2698,7 +2695,7 @@ public void testCopyQuotaManagerConfig() {
assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
}
-
+
@Test
public void testFetchQuotaManagerConfig() {
Properties defaultProps = new Properties();
@@ -2719,6 +2716,21 @@ public void testFetchQuotaManagerConfig() {
assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
}
+ @Test
+ public void testEpochEntriesAsByteBuffer() throws Exception {
+ int expectedEpoch = 0;
+ long expectedStartOffset = 1L;
+ int expectedVersion = 0;
+ List epochs = Arrays.asList(new EpochEntry(expectedEpoch, expectedStartOffset));
+ ByteBuffer buffer = RemoteLogManager.epochEntriesAsByteBuffer(epochs);
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array()), StandardCharsets.UTF_8));
+
+ assertEquals(String.valueOf(expectedVersion), bufferedReader.readLine());
+ assertEquals(String.valueOf(epochs.size()), bufferedReader.readLine());
+ assertEquals(expectedEpoch + " " + expectedStartOffset, bufferedReader.readLine());
+ }
+
+
private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 32ddfc6418d4d..2e9bc068978bd 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -299,7 +299,8 @@ class PartitionLockTest extends Logging {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, mockTime.scheduler)
val maxTransactionTimeout = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
val producerStateManager = new ProducerStateManager(
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 2134dcfaaa0c0..6dc6cc2a3c1f7 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -434,7 +434,8 @@ class PartitionTest extends AbstractPartitionTest {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, time.scheduler)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true)
val producerStateManager = new ProducerStateManager(
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index f17c724066fe0..bdbcac462b8c2 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -108,7 +108,8 @@ class LogCleanerManagerTest extends Logging {
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val segments = new LogSegments(tp)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, producerStateManagerConfig, time)
val offsets = new LogLoader(
tpDir,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b61eb28530ca9..ee62b3f0f21a8 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -92,7 +92,7 @@ class LogCleanerTest extends Logging {
val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
val numMetricsRegistered = LogCleaner.MetricNames.size
verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
-
+
// verify that each metric in `LogCleaner` is removed
LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
@@ -188,7 +188,8 @@ class LogCleanerTest extends Logging {
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val logSegments = new LogSegments(topicPartition)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ dir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val offsets = new LogLoader(
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 1a781a93ea667..7838da5417364 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -154,7 +154,8 @@ class LogLoaderTest {
val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time)
val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time,
@@ -367,7 +368,8 @@ class LogLoaderTest {
super.add(wrapper)
}
}
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
maxTransactionTimeoutMs, producerStateManagerConfig, mockTime)
val logLoader = new LogLoader(
@@ -431,7 +433,8 @@ class LogLoaderTest {
val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
val config = new LogConfig(new Properties())
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
@@ -540,7 +543,8 @@ class LogLoaderTest {
val config = new LogConfig(logProps)
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
@@ -594,7 +598,8 @@ class LogLoaderTest {
val config = new LogConfig(logProps)
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
@@ -647,7 +652,8 @@ class LogLoaderTest {
val config = new LogConfig(logProps)
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
@@ -1387,7 +1393,7 @@ class LogLoaderTest {
assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
// deliberately remove some of the epoch entries
- leaderEpochCache.truncateFromEnd(2)
+ leaderEpochCache.truncateFromEndAsyncFlush(2)
assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
log.close()
@@ -1789,7 +1795,8 @@ class LogLoaderTest {
log.logSegments.forEach(segment => segments.add(segment))
assertEquals(5, segments.firstSegment.get.baseOffset)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index b559c192790c4..e2272941ab3d6 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -24,7 +24,8 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint
+import org.apache.kafka.server.util.MockScheduler
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log._
import org.junit.jupiter.api.Assertions._
@@ -33,7 +34,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import java.io.{File, RandomAccessFile}
-import java.util
import java.util.{Optional, OptionalLong}
import scala.collection._
import scala.jdk.CollectionConverters._
@@ -431,17 +431,9 @@ class LogSegmentTest {
def testRecoveryRebuildsEpochCache(): Unit = {
val seg = createSegment(0)
- val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
- private var epochs = Seq.empty[EpochEntry]
+ val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1))
- override def write(epochs: util.Collection[EpochEntry], ignored: Boolean): Unit = {
- this.epochs = epochs.asScala.toSeq
- }
-
- override def read(): java.util.List[EpochEntry] = this.epochs.asJava
- }
-
- val cache = new LeaderEpochFileCache(topicPartition, checkpoint)
+ val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime()))
seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 65cbcc7bd70db..bdb4e6bf7e07c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2911,7 +2911,8 @@ class ReplicaManagerTest {
val maxTransactionTimeoutMs = 30000
val maxProducerIdExpirationMs = 30000
val segments = new LogSegments(tp)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "", None, time.scheduler)
val producerStateManager = new ProducerStateManager(tp, logDir,
maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, true), time)
val offsets = new LogLoader(
@@ -6523,7 +6524,7 @@ class ReplicaManagerTest {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val leaderAndIsr = LeaderAndIsr(0, 1, List(0, 1), LeaderRecoveryState.RECOVERED, LeaderAndIsr.InitialPartitionEpoch)
- val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), leaderAndIsr)
+ val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), leaderAndIsr)
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
verifyRLMOnLeadershipChange(Collections.singleton(partition), Collections.emptySet())
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala
deleted file mode 100644
index 3af126f5c5529..0000000000000
--- a/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server.checkpoints
-
-import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint
-import org.apache.kafka.storage.internals.log.EpochEntry
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-
-import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
-import java.nio.charset.StandardCharsets
-
-class InMemoryLeaderEpochCheckpointTest {
-
- @Test
- def shouldAppendNewEntry(): Unit = {
- val checkpoint = new InMemoryLeaderEpochCheckpoint()
- val epochs = java.util.Arrays.asList(new EpochEntry(0, 1L), new EpochEntry(1, 2L), new EpochEntry(2, 3L))
- checkpoint.write(epochs)
- assertEquals(epochs, checkpoint.read())
-
- val epochs2 = java.util.Arrays.asList(new EpochEntry(3, 4L), new EpochEntry(4, 5L))
- checkpoint.write(epochs2)
-
- assertEquals(epochs2, checkpoint.read())
- }
-
- @Test
- def testReadAsByteBuffer(): Unit = {
- val checkpoint = new InMemoryLeaderEpochCheckpoint()
- val expectedEpoch = 0
- val expectedStartOffset = 1L
- val expectedVersion = 0
- val epochs = java.util.Arrays.asList(new EpochEntry(expectedEpoch, expectedStartOffset))
- checkpoint.write(epochs)
- assertEquals(epochs, checkpoint.read())
- val buffer = checkpoint.readAsByteBuffer()
-
- val bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array()), StandardCharsets.UTF_8))
- assertEquals(expectedVersion.toString, bufferedReader.readLine())
- assertEquals(epochs.size().toString, bufferedReader.readLine())
- assertEquals(s"$expectedEpoch $expectedStartOffset", bufferedReader.readLine())
- }
-}
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
index a7e370d7f4091..7808cedb075ee 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
@@ -19,12 +19,13 @@ package kafka.server.checkpoints
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.storage.internals.checkpoint.CheckpointFileWithFailureHandler
-import org.apache.kafka.storage.internals.log.LogDirFailureChannel
+import org.apache.kafka.storage.internals.checkpoint.{CheckpointFileWithFailureHandler, LeaderEpochCheckpointFile}
+import org.apache.kafka.storage.internals.log.{EpochEntry, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito
+import java.io.File
import java.util.Collections
import scala.collection.Map
@@ -97,7 +98,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(10)
val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
- checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L), true)
+ checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L))
assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
}
@@ -133,4 +134,15 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
assertThrows(classOf[IllegalArgumentException], () => lazyCheckpoints.fetch("/invalid/kafka-logs", new TopicPartition("foo", 0)))
}
+ @Test
+ def testWriteIfDirExistsShouldNotThrowWhenDirNotExists(): Unit = {
+ val dir = TestUtils.tempDir()
+ val file = dir.toPath.resolve("test-checkpoint").toFile
+ val logDirFailureChannel = new LogDirFailureChannel(10)
+ val checkpointFile = new CheckpointFileWithFailureHandler(file, 0,
+ LeaderEpochCheckpointFile.FORMATTER, logDirFailureChannel, file.getParent)
+
+ dir.renameTo(new File(dir.getAbsolutePath + "-renamed"))
+ checkpointFile.writeIfDirExists(Collections.singletonList(new EpochEntry(1, 42)))
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index 05041f39709f7..6f6d0bdbda58d 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -20,15 +20,15 @@ package kafka.server.epoch
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
+import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{EpochEntry, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import java.io.File
-import java.util.{Collections, OptionalInt, Optional}
-import scala.collection.Seq
+import java.util.{Collections, Optional, OptionalInt}
import scala.jdk.CollectionConverters._
/**
@@ -36,13 +36,10 @@ import scala.jdk.CollectionConverters._
*/
class LeaderEpochFileCacheTest {
val tp = new TopicPartition("TestTopic", 5)
- private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
- private var epochs: Seq[EpochEntry] = Seq()
- override def write(epochs: java.util.Collection[EpochEntry], ignored: Boolean): Unit = this.epochs = epochs.asScala.toSeq
- override def read(): java.util.List[EpochEntry] = this.epochs.asJava
- }
+ val mockTime = new MockTime()
+ private val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1))
- private val cache = new LeaderEpochFileCache(tp, checkpoint)
+ private val cache = new LeaderEpochFileCache(tp, checkpoint, mockTime.scheduler)
@Test
def testPreviousEpoch(): Unit = {
@@ -57,7 +54,7 @@ class LeaderEpochFileCacheTest {
cache.assign(10, 20)
assertEquals(OptionalInt.of(4), cache.previousEpoch)
- cache.truncateFromEnd(18)
+ cache.truncateFromEndAsyncFlush(18)
assertEquals(OptionalInt.of(2), cache.previousEpoch)
}
@@ -245,12 +242,12 @@ class LeaderEpochFileCacheTest {
val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath), new LogDirFailureChannel(1))
//Given
- val cache = new LeaderEpochFileCache(tp, checkpoint)
+ val cache = new LeaderEpochFileCache(tp, checkpoint, new MockTime().scheduler)
cache.assign(2, 6)
//When
val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath), new LogDirFailureChannel(1))
- val cache2 = new LeaderEpochFileCache(tp, checkpoint2)
+ val cache2 = new LeaderEpochFileCache(tp, checkpoint2, new MockTime().scheduler)
//Then
assertEquals(1, cache2.epochEntries.size)
@@ -387,7 +384,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When clear latest on epoch boundary
- cache.truncateFromEnd(8)
+ cache.truncateFromEndAsyncFlush(8)
//Then should remove two latest epochs (remove is inclusive)
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6)), cache.epochEntries)
@@ -401,7 +398,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When reset to offset ON epoch boundary
- cache.truncateFromStart(8)
+ cache.truncateFromStartAsyncFlush(8)
//Then should preserve (3, 8)
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
@@ -415,7 +412,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When reset to offset BETWEEN epoch boundaries
- cache.truncateFromStart(9)
+ cache.truncateFromStartAsyncFlush(9)
//Then we should retain epoch 3, but update it's offset to 9 as 8 has been removed
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries)
@@ -429,7 +426,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When reset to offset before first epoch offset
- cache.truncateFromStart(1)
+ cache.truncateFromStartAsyncFlush(1)
//Then nothing should change
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
@@ -443,7 +440,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When reset to offset on earliest epoch boundary
- cache.truncateFromStart(6)
+ cache.truncateFromStartAsyncFlush(6)
//Then nothing should change
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
@@ -457,7 +454,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When
- cache.truncateFromStart(11)
+ cache.truncateFromStartAsyncFlush(11)
//Then retain the last
assertEquals(Collections.singletonList(new EpochEntry(4, 11)), cache.epochEntries)
@@ -471,7 +468,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When we clear from a position between offset 8 & offset 11
- cache.truncateFromStart(9)
+ cache.truncateFromStartAsyncFlush(9)
//Then we should update the middle epoch entry's offset
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries)
@@ -485,7 +482,7 @@ class LeaderEpochFileCacheTest {
cache.assign(2, 10)
//When we clear from a position between offset 0 & offset 7
- cache.truncateFromStart(5)
+ cache.truncateFromStartAsyncFlush(5)
//Then we should keep epoch 0 but update the offset appropriately
assertEquals(java.util.Arrays.asList(new EpochEntry(0,5), new EpochEntry(1, 7), new EpochEntry(2, 10)),
@@ -500,7 +497,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When reset to offset beyond last epoch
- cache.truncateFromStart(15)
+ cache.truncateFromStartAsyncFlush(15)
//Then update the last
assertEquals(Collections.singletonList(new EpochEntry(4, 15)), cache.epochEntries)
@@ -514,7 +511,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When reset to offset BETWEEN epoch boundaries
- cache.truncateFromEnd( 9)
+ cache.truncateFromEndAsyncFlush( 9)
//Then should keep the preceding epochs
assertEquals(OptionalInt.of(3), cache.latestEpoch)
@@ -543,7 +540,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When reset to offset on epoch boundary
- cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET)
+ cache.truncateFromStartAsyncFlush(UNDEFINED_EPOCH_OFFSET)
//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@@ -557,7 +554,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)
//When reset to offset on epoch boundary
- cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET)
+ cache.truncateFromEndAsyncFlush(UNDEFINED_EPOCH_OFFSET)
//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@@ -578,13 +575,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldClearEarliestOnEmptyCache(): Unit = {
//Then
- cache.truncateFromStart(7)
+ cache.truncateFromStartAsyncFlush(7)
}
@Test
def shouldClearLatestOnEmptyCache(): Unit = {
//Then
- cache.truncateFromEnd(7)
+ cache.truncateFromEndAsyncFlush(7)
}
@Test
@@ -600,7 +597,7 @@ class LeaderEpochFileCacheTest {
cache.assign(10, 20)
assertEquals(OptionalInt.of(4), cache.previousEpoch(10))
- cache.truncateFromEnd(18)
+ cache.truncateFromEndAsyncFlush(18)
assertEquals(OptionalInt.of(2), cache.previousEpoch(cache.latestEpoch.getAsInt))
}
@@ -617,7 +614,7 @@ class LeaderEpochFileCacheTest {
cache.assign(10, 20)
assertEquals(Optional.of(new EpochEntry(4, 15)), cache.previousEntry(10))
- cache.truncateFromEnd(18)
+ cache.truncateFromEndAsyncFlush(18)
assertEquals(Optional.of(new EpochEntry(2, 10)), cache.previousEntry(cache.latestEpoch.getAsInt))
}
@@ -658,4 +655,15 @@ class LeaderEpochFileCacheTest {
assertEquals(OptionalInt.empty(), cache.epochForOffset(5))
}
+ @Test
+ def shouldWriteCheckpointOnTruncation(): Unit = {
+ cache.assign(2, 6)
+ cache.assign(3, 8)
+ cache.assign(4, 11)
+
+ cache.truncateFromEndAsyncFlush(11)
+ cache.truncateFromStartAsyncFlush(8)
+
+ assertEquals(List(new EpochEntry(3, 8)).asJava, checkpoint.read())
+ }
}
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index d25fdb1b4e9b1..6280318af5d60 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -139,7 +139,8 @@ class SchedulerTest {
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
+ val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
+ logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), mockTime)
val offsets = new LogLoader(
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
index 9c115881328a0..6efbaa136e0e9 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
@@ -72,7 +72,7 @@ public CheckpointFile(File file,
tempPath = Paths.get(absolutePath + ".tmp");
}
- public void write(Collection entries, boolean sync) throws IOException {
+ public void write(Collection entries) throws IOException {
synchronized (lock) {
// write to temp file and then swap with the existing file
try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
@@ -80,12 +80,10 @@ public void write(Collection entries, boolean sync) throws IOException {
CheckpointWriteBuffer checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
checkpointWriteBuffer.write(entries);
writer.flush();
- if (sync) {
- fileOutputStream.getFD().sync();
- }
+ fileOutputStream.getFD().sync();
}
- Utils.atomicMoveWithFallback(tempPath, absolutePath, sync);
+ Utils.atomicMoveWithFallback(tempPath, absolutePath);
}
}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
index 35abfb5a984d2..79963d79d2360 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
@@ -19,13 +19,19 @@
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
public class CheckpointFileWithFailureHandler {
+ private static final Logger log = LoggerFactory.getLogger(CheckpointFileWithFailureHandler.class);
+
public final File file;
private final LogDirFailureChannel logDirFailureChannel;
@@ -41,9 +47,21 @@ public CheckpointFileWithFailureHandler(File file, int version, CheckpointFile.E
checkpointFile = new CheckpointFile<>(file, version, formatter);
}
- public void write(Collection entries, boolean sync) {
+ public void write(Collection entries) {
+ try {
+ checkpointFile.write(entries);
+ } catch (IOException e) {
+ String msg = "Error while writing to checkpoint file " + file.getAbsolutePath();
+ logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
+ throw new KafkaStorageException(msg, e);
+ }
+ }
+
+ public void writeIfDirExists(Collection entries) {
try {
- checkpointFile.write(entries, sync);
+ checkpointFile.write(entries);
+ } catch (FileNotFoundException | NoSuchFileException e) {
+ log.warn("Failed to write to checkpoint file {}. This is ok if the topic/partition is being deleted", file.getAbsolutePath(), e);
} catch (IOException e) {
String msg = "Error while writing to checkpoint file " + file.getAbsolutePath();
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
deleted file mode 100644
index 499c19fb78b5b..0000000000000
--- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.storage.internals.checkpoint;
-
-import org.apache.kafka.server.common.CheckpointFile;
-import org.apache.kafka.storage.internals.log.EpochEntry;
-
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory
- *
- * The motivation for this class is to allow remote log manager to create the RemoteLogSegmentMetadata(RLSM)
- * with the correct leader epoch info for a specific segment. To do that, we need to rely on the LeaderEpochCheckpointCache
- * to truncate from start and end, to get the epoch info. However, we don't really want to truncate the epochs in cache
- * (and write to checkpoint file in the end). So, we introduce this InMemoryLeaderEpochCheckpoint to feed into LeaderEpochCheckpointCache,
- * and when we truncate the epoch for RLSM, we can do them in memory without affecting the checkpoint file, and without interacting with file system.
- */
-public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
- private List epochs = Collections.emptyList();
-
- public void write(Collection epochs, boolean ignored) {
- this.epochs = new ArrayList<>(epochs);
- }
-
- public List read() {
- return Collections.unmodifiableList(epochs);
- }
-
- public ByteBuffer readAsByteBuffer() throws IOException {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) {
- CheckpointFile.CheckpointWriteBuffer writeBuffer = new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER);
- writeBuffer.write(epochs);
- writer.flush();
- }
-
- return ByteBuffer.wrap(stream.toByteArray());
- }
-}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java
deleted file mode 100644
index 28ffae03df0e1..0000000000000
--- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.storage.internals.checkpoint;
-
-import org.apache.kafka.storage.internals.log.EpochEntry;
-
-import java.util.Collection;
-import java.util.List;
-
-public interface LeaderEpochCheckpoint {
- // in file-backed checkpoint implementation, the content should be
- // synced to the device if `sync` is true
- void write(Collection epochs, boolean sync);
-
- default void write(Collection epochs) {
- write(epochs, true);
- }
-
- List read();
-}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
index 3472182aeea1e..392a36533400f 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
@@ -38,7 +38,7 @@
* 1 2
* -----checkpoint file end----------
*/
-public class LeaderEpochCheckpointFile implements LeaderEpochCheckpoint {
+public class LeaderEpochCheckpointFile {
public static final Formatter FORMATTER = new Formatter();
@@ -53,11 +53,14 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh
}
public void write(Collection epochs) {
- write(epochs, true);
+ checkpoint.write(epochs);
}
- public void write(Collection epochs, boolean sync) {
- checkpoint.write(epochs, sync);
+ public void writeForTruncation(Collection epochs) {
+ // Writing epoch entries after truncation is done asynchronously for performance reasons.
+ // This could cause NoSuchFileException when the directory is renamed concurrently for topic deletion,
+ // so we use writeIfDirExists here.
+ checkpoint.writeIfDirExists(epochs);
}
public List read() {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index 03df6cc0dceb9..7b78a70993d93 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -18,15 +18,18 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.log.EpochEntry;
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.slf4j.Logger;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalInt;
@@ -42,10 +45,15 @@
*
* Leader Epoch = epoch assigned to each leader by the controller.
* Offset = offset of the first message in each epoch.
+ *
+ * Note that {@link #truncateFromStartAsyncFlush},{@link #truncateFromEndAsyncFlush} flush the epoch-entry changes to checkpoint asynchronously.
+ * Hence, it is instantiater's responsibility to ensure restoring the cache to the correct state after instantiating
+ * this class from checkpoint (which might contain stale epoch entries right after instantiation).
*/
public class LeaderEpochFileCache {
private final TopicPartition topicPartition;
- private final LeaderEpochCheckpoint checkpoint;
+ private final LeaderEpochCheckpointFile checkpoint;
+ private final Scheduler scheduler;
private final Logger log;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -55,16 +63,40 @@ public class LeaderEpochFileCache {
/**
* @param topicPartition the associated topic partition
* @param checkpoint the checkpoint file
+ * @param scheduler the scheduler to use for async I/O operations
*/
@SuppressWarnings("this-escape")
- public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) {
+ public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpointFile checkpoint, Scheduler scheduler) {
this.checkpoint = checkpoint;
this.topicPartition = topicPartition;
+ this.scheduler = scheduler;
LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] ");
log = logContext.logger(LeaderEpochFileCache.class);
checkpoint.read().forEach(this::assign);
}
+ /**
+ * Instantiate a new LeaderEpochFileCache with provided epoch entries instead of from the backing checkpoint file.
+ * The provided epoch entries are expected to be no less fresh than the checkpoint file.
+ * @param epochEntries the current epoch entries
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ * @param scheduler the scheduler to use for async I/O operations
+ */
+ private LeaderEpochFileCache(List epochEntries,
+ TopicPartition topicPartition,
+ LeaderEpochCheckpointFile checkpoint,
+ Scheduler scheduler) {
+ this.checkpoint = checkpoint;
+ this.topicPartition = topicPartition;
+ this.scheduler = scheduler;
+ LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] ");
+ log = logContext.logger(LeaderEpochFileCache.class);
+ for (EpochEntry entry : epochEntries) {
+ epochs.put(entry.epoch, entry);
+ }
+ }
+
/**
* Assigns the supplied Leader Epoch to the supplied Offset
* Once the epoch is assigned it cannot be reassigned
@@ -73,7 +105,7 @@ public void assign(int epoch, long startOffset) {
EpochEntry entry = new EpochEntry(epoch, startOffset);
if (assign(entry)) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
- writeToFile(true);
+ writeToFile();
}
}
@@ -83,7 +115,7 @@ public void assign(List entries) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
}
});
- if (!entries.isEmpty()) writeToFile(true);
+ if (!entries.isEmpty()) writeToFile();
}
private boolean isUpdateNeeded(EpochEntry entry) {
@@ -117,7 +149,9 @@ private boolean assign(EpochEntry entry) {
* Remove any entries which violate monotonicity prior to appending a new entry
*/
private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
- List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+ List removedEpochs = removeWhileMatching(
+ epochs.descendingMap().entrySet().iterator(),
+ entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset);
if (removedEpochs.size() > 1 || (!removedEpochs.isEmpty() && removedEpochs.get(0).startOffset != newEntry.startOffset)) {
@@ -128,15 +162,7 @@ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
}
}
- private List removeFromEnd(Predicate predicate) {
- return removeWhileMatching(epochs.descendingMap().entrySet().iterator(), predicate);
- }
-
- private List removeFromStart(Predicate predicate) {
- return removeWhileMatching(epochs.entrySet().iterator(), predicate);
- }
-
- private List removeWhileMatching(Iterator> iterator, Predicate predicate) {
+ private static List removeWhileMatching(Iterator> iterator, Predicate predicate) {
ArrayList removedEpochs = new ArrayList<>();
while (iterator.hasNext()) {
@@ -305,22 +331,23 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs
/**
* Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
+ *
+ * Checkpoint-flushing is done asynchronously.
*/
- public void truncateFromEnd(long endOffset) {
+ public void truncateFromEndAsyncFlush(long endOffset) {
lock.writeLock().lock();
try {
- Optional epochEntry = latestEntry();
- if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
- List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);
-
- // We intentionally don't force flushing change to the device here because:
+ List removedEntries = truncateFromEnd(epochs, endOffset);
+ if (!removedEntries.isEmpty()) {
+ // We flush the change to the device in the background because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called by ReplicaFetcher threads, which could block replica fetching
// then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
- // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by
- // another truncateFromEnd call on log loading procedure so it won't be a problem
- writeToFile(false);
+ // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
+ // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
+ // another truncateFromEnd call on log loading procedure, so it won't be a problem
+ scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
@@ -334,28 +361,27 @@ public void truncateFromEnd(long endOffset) {
* be offset, then clears any previous epoch entries.
*
* This method is exclusive: so truncateFromStart(6) will retain an entry at offset 6.
+ *
+ * Checkpoint-flushing is done asynchronously.
*
* @param startOffset the offset to clear up to
*/
- public void truncateFromStart(long startOffset) {
+ public void truncateFromStartAsyncFlush(long startOffset) {
lock.writeLock().lock();
try {
- List removedEntries = removeFromStart(entry -> entry.startOffset <= startOffset);
-
+ List removedEntries = truncateFromStart(epochs, startOffset);
if (!removedEntries.isEmpty()) {
- EpochEntry firstBeforeStartOffset = removedEntries.get(removedEntries.size() - 1);
- EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
- epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);
-
- // We intentionally don't force flushing change to the device here because:
+ // We flush the change to the device in the background because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called as part of deleteRecords with holding UnifiedLog#lock.
// - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust
- // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be recovered by
- // another truncateFromStart call on log loading procedure so it won't be a problem
- writeToFile(false);
+ // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
+ // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
+ // another truncateFromStart call on log loading procedure, so it won't be a problem
+ scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+ EpochEntry updatedFirstEntry = removedEntries.get(removedEntries.size() - 1);
log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
}
} finally {
@@ -363,6 +389,27 @@ public void truncateFromStart(long startOffset) {
}
}
+ private static List truncateFromStart(TreeMap epochs, long startOffset) {
+ List removedEntries = removeWhileMatching(
+ epochs.entrySet().iterator(), entry -> entry.startOffset <= startOffset);
+
+ if (!removedEntries.isEmpty()) {
+ EpochEntry firstBeforeStartOffset = removedEntries.get(removedEntries.size() - 1);
+ EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
+ epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);
+ }
+
+ return removedEntries;
+ }
+
+ private static List truncateFromEnd(TreeMap epochs, long endOffset) {
+ Optional epochEntry = Optional.ofNullable(epochs.lastEntry()).map(Entry::getValue);
+ if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
+ return removeWhileMatching(epochs.descendingMap().entrySet().iterator(), x -> x.startOffset >= endOffset);
+ }
+ return Collections.emptyList();
+ }
+
public OptionalInt epochForOffset(long offset) {
lock.readLock().lock();
try {
@@ -386,11 +433,39 @@ public OptionalInt epochForOffset(long offset) {
}
}
- public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
+ /**
+ * Returns a new LeaderEpochFileCache which contains same
+ * epoch entries with replacing backing checkpoint file.
+ * @param leaderEpochCheckpoint the new checkpoint file
+ * @return a new LeaderEpochFileCache instance
+ */
+ public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpointFile leaderEpochCheckpoint) {
+ lock.readLock().lock();
+ try {
+ return new LeaderEpochFileCache(epochEntries(),
+ topicPartition,
+ leaderEpochCheckpoint,
+ scheduler);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Returns the leader epoch entries within the range of the given start and end offset
+ * @param startOffset The start offset of the epoch entries (inclusive).
+ * @param endOffset The end offset of the epoch entries (exclusive)
+ * @return the leader epoch entries
+ */
+ public List epochEntriesInRange(long startOffset, long endOffset) {
lock.readLock().lock();
try {
- leaderEpochCheckpoint.write(epochEntries());
- return new LeaderEpochFileCache(topicPartition, leaderEpochCheckpoint);
+ TreeMap epochsCopy = new TreeMap<>(this.epochs);
+ if (startOffset >= 0) {
+ truncateFromStart(epochsCopy, startOffset);
+ }
+ truncateFromEnd(epochsCopy, endOffset);
+ return new ArrayList<>(epochsCopy.values());
} finally {
lock.readLock().unlock();
}
@@ -403,7 +478,7 @@ public void clearAndFlush() {
lock.writeLock().lock();
try {
epochs.clear();
- writeToFile(true);
+ writeToFile();
} finally {
lock.writeLock().unlock();
}
@@ -440,12 +515,23 @@ public NavigableMap epochWithOffsets() {
}
}
- private void writeToFile(boolean sync) {
+ private void writeToFile() {
+ lock.readLock().lock();
+ try {
+ checkpoint.write(epochs.values());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ private void writeToFileForTruncation() {
+ List entries;
lock.readLock().lock();
try {
- checkpoint.write(epochs.values(), sync);
+ entries = new ArrayList<>(epochs.values());
} finally {
lock.readLock().unlock();
}
+ checkpoint.writeForTruncation(entries);
}
}