From 22be0ba9a8cfc934291f75175d4f7717f661583e Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Mon, 3 Jun 2024 13:48:57 +0900 Subject: [PATCH] fix indirect truncation for RLM --- .../kafka/log/remote/RemoteLogManager.java | 46 ++++-- core/src/main/scala/kafka/log/LogLoader.scala | 4 +- .../src/main/scala/kafka/log/UnifiedLog.scala | 30 +--- .../log/remote/RemoteLogManagerTest.java | 61 ++++---- .../scala/unit/kafka/log/LogLoaderTest.scala | 2 +- .../scala/unit/kafka/log/LogSegmentTest.scala | 13 +- .../InMemoryLeaderEpochCheckpointTest.scala | 58 -------- .../epoch/LeaderEpochFileCacheTest.scala | 47 +++--- .../InMemoryLeaderEpochCheckpoint.java | 63 -------- .../checkpoint/LeaderEpochCheckpoint.java | 32 ----- .../checkpoint/LeaderEpochCheckpointFile.java | 5 +- .../internals/epoch/LeaderEpochFileCache.java | 134 +++++++++++------- 12 files changed, 181 insertions(+), 314 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala delete mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java delete mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 6e74f713c8a9a..7ba602f9c9b64 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager; import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; @@ -57,7 +58,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.server.metrics.KafkaMetricsGroup; -import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.AbortedTxn; import org.apache.kafka.storage.internals.log.EpochEntry; @@ -79,12 +80,16 @@ import scala.Option; import scala.collection.JavaConverters; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStreamWriter; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -561,25 +566,23 @@ public boolean isCancelled() { } /** - * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset + * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset. + *
<p>
+ * Visible for testing. * * @param log The actual log from where to take the leader-epoch checkpoint - * @param startOffset The start offset of the checkpoint file (exclusive in the truncation). + * @param startOffset The start offset of the epoch entries (exclusive). * If start offset is 6, then it will retain an entry at offset 6. - * @param endOffset The end offset of the checkpoint file (inclusive in the truncation) + * @param endOffset The end offset of the epoch entries (inclusive) * If end offset is 100, then it will remove the entries greater than or equal to 100. - * @return the truncated leader epoch checkpoint + * @return the leader epoch entries */ - InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long startOffset, long endOffset) { - InMemoryLeaderEpochCheckpoint checkpoint = new InMemoryLeaderEpochCheckpoint(); + List getLeaderEpochEntries(UnifiedLog log, long startOffset, long endOffset) { if (log.leaderEpochCache().isDefined()) { - LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint); - if (startOffset >= 0) { - cache.truncateFromStart(startOffset, true); - } - cache.truncateFromEnd(endOffset, true); + return log.leaderEpochCache().get().epochEntriesInRange(startOffset, endOffset); + } else { + return Collections.emptyList(); } - return checkpoint; } class RLMTask extends CancellableRunnable { @@ -736,7 +739,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment long endOffset = nextSegmentBaseOffset - 1; File producerStateSnapshotFile = log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null); - List epochEntries = getLeaderEpochCheckpoint(log, segment.baseOffset(), nextSegmentBaseOffset).read(); + List epochEntries = getLeaderEpochEntries(log, segment.baseOffset(), nextSegmentBaseOffset); Map segmentLeaderEpochs = new HashMap<>(epochEntries.size()); epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset)); @@ -746,7 +749,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get(); - ByteBuffer leaderEpochsIndex = getLeaderEpochCheckpoint(log, -1, nextSegmentBaseOffset).readAsByteBuffer(); + ByteBuffer leaderEpochsIndex = epochEntriesAsByteBuffer(getLeaderEpochEntries(log, -1, nextSegmentBaseOffset)); LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.offsetIndex().file()), toPathIfExists(segment.timeIndex().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())), producerStateSnapshotFile.toPath(), leaderEpochsIndex); @@ -1692,6 +1695,19 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo LOGGER.info("Shutting down of thread pool {} is completed", poolName); } + //Visible for testing + static ByteBuffer epochEntriesAsByteBuffer(List epochEntries) throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) { + CheckpointFile.CheckpointWriteBuffer writeBuffer = + new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER); + writeBuffer.write(epochEntries); + writer.flush(); + } + + return ByteBuffer.wrap(stream.toByteArray()); + } + private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) { String topic = topicIdPartition.topic(); if 
(!brokerTopicStats.isTopicStatsExisted(topicIdPartition.topic())) { diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala index 79cb958a0d788..b0f1fdd0e1ca2 100644 --- a/core/src/main/scala/kafka/log/LogLoader.scala +++ b/core/src/main/scala/kafka/log/LogLoader.scala @@ -173,14 +173,14 @@ class LogLoader( } } - leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset, false)) + leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset)) val newLogStartOffset = if (isRemoteLogEnabled) { logStartOffsetCheckpoint } else { math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset) } // The earliest leader epoch may not be flushed during a hard failure. Recover it here. - leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint, false)) + leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint)) // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here // from scratch. diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 60cc2eaba6d18..8302123fd1fb5 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1016,15 +1016,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, updatedLogStartOffset = true updateLogStartOffset(newLogStartOffset) info(s"Incremented log start offset to $newLogStartOffset due to $reason") - // We flush the change to the device in the background because: - // - To avoid fsync latency - // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives - // * This method is called as part of deleteRecords with holding UnifiedLog#lock. - // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust - // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. - // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by - // another truncateFromStart call on log loading procedure, so it won't be a problem - leaderEpochCache.foreach(_.truncateFromStart(logStartOffset, false)) + leaderEpochCache.foreach(_.truncateFromStart(logStartOffset)) producerStateManager.onLogStartOffsetIncremented(newLogStartOffset) maybeIncrementFirstUnstableOffset() } @@ -1815,15 +1807,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // and inserted the first start offset entry, but then failed to append any entries // before another leader was elected. lock synchronized { - // We flush the change to the device in the background because: - // - To avoid fsync latency - // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives - // * This method is called by ReplicaFetcher threads, which could block replica fetching - // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. - // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. 
- // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by - // another truncateFromEnd call on log loading procedure, so it won't be a problem - leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset, false)) + leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset)) } false @@ -1836,15 +1820,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else { val deletedSegments = localLog.truncateTo(targetOffset) deleteProducerSnapshots(deletedSegments, asyncDelete = true) - // We flush the change to the device in the background because: - // - To avoid fsync latency - // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives - // * This method is called by ReplicaFetcher threads, which could block replica fetching - // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. - // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. - // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by - // another truncateFromEnd call on log loading procedure, so it won't be a problem - leaderEpochCache.foreach(_.truncateFromEnd(targetOffset, false)) + leaderEpochCache.foreach(_.truncateFromEnd(targetOffset)) logStartOffset = math.min(targetOffset, logStartOffset) rebuildProducerState(targetOffset, producerStateManager) if (highWatermark >= localLog.logEndOffset) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index e7ef344a45341..7fda9c6e37cb0 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -58,14 +58,14 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.util.MockScheduler; -import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint; -import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.EpochEntry; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.FetchIsolation; import org.apache.kafka.storage.internals.log.LazyIndex; import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.internals.log.LogFileUtils; import org.apache.kafka.storage.internals.log.LogSegment; import org.apache.kafka.storage.internals.log.OffsetIndex; @@ -87,16 +87,19 @@ import scala.Option; import scala.collection.JavaConverters; +import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; import java.io.InputStream; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -188,19 +191,7 @@ public class 
RemoteLogManagerTest { private final EpochEntry epochEntry1 = new EpochEntry(1, 100); private final EpochEntry epochEntry2 = new EpochEntry(2, 200); private final List totalEpochEntries = Arrays.asList(epochEntry0, epochEntry1, epochEntry2); - private final LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint() { - List epochs = Collections.emptyList(); - - @Override - public void write(Collection epochs) { - this.epochs = new ArrayList<>(epochs); - } - - @Override - public List read() { - return epochs; - } - }; + private LeaderEpochCheckpointFile checkpoint; private final AtomicLong currentLogStartOffset = new AtomicLong(0L); private final UnifiedLog mockLog = mock(UnifiedLog.class); @@ -209,6 +200,7 @@ public List read() { @BeforeEach void setUp() throws Exception { + checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)); topicIds.put(leaderTopicIdPartition.topicPartition().topic(), leaderTopicIdPartition.topicId()); topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId()); Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); @@ -248,11 +240,9 @@ void testGetLeaderEpochCheckpoint() { checkpoint.write(totalEpochEntries); LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); - InMemoryLeaderEpochCheckpoint inMemoryCheckpoint = remoteLogManager.getLeaderEpochCheckpoint(mockLog, 0, 300); - assertEquals(totalEpochEntries, inMemoryCheckpoint.read()); + assertEquals(totalEpochEntries, remoteLogManager.getLeaderEpochEntries(mockLog, 0, 300)); - InMemoryLeaderEpochCheckpoint inMemoryCheckpoint2 = remoteLogManager.getLeaderEpochCheckpoint(mockLog, 100, 200); - List epochEntries = inMemoryCheckpoint2.read(); + List epochEntries = remoteLogManager.getLeaderEpochEntries(mockLog, 100, 200); assertEquals(1, epochEntries.size()); assertEquals(epochEntry1, epochEntries.get(0)); } @@ -1124,9 +1114,7 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData, assertEquals(tempFile.getAbsolutePath(), logSegmentData.logSegment().toAbsolutePath().toString()); assertEquals(mockProducerSnapshotIndex.getAbsolutePath(), logSegmentData.producerSnapshotIndex().toAbsolutePath().toString()); - InMemoryLeaderEpochCheckpoint inMemoryLeaderEpochCheckpoint = new InMemoryLeaderEpochCheckpoint(); - inMemoryLeaderEpochCheckpoint.write(expectedLeaderEpoch); - assertEquals(inMemoryLeaderEpochCheckpoint.readAsByteBuffer(), logSegmentData.leaderEpochIndex()); + assertEquals(RemoteLogManager.epochEntriesAsByteBuffer(expectedLeaderEpoch), logSegmentData.leaderEpochIndex()); } @Test @@ -2305,11 +2293,17 @@ private List listRemoteLogSegmentMetadataByTime(TopicI private Map truncateAndGetLeaderEpochs(List entries, Long startOffset, Long endOffset) { - InMemoryLeaderEpochCheckpoint myCheckpoint = new InMemoryLeaderEpochCheckpoint(); + LeaderEpochCheckpointFile myCheckpoint; + try { + myCheckpoint = new LeaderEpochCheckpointFile( + TestUtils.tempFile(), new LogDirFailureChannel(1)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } myCheckpoint.write(entries); LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint, scheduler); - cache.truncateFromStart(startOffset, true); - cache.truncateFromEnd(endOffset, true); + cache.truncateFromStart(startOffset); + cache.truncateFromEnd(endOffset); return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } @@ 
-2534,6 +2528,21 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l } } + @Test + public void testEpochEntriesAsByteBuffer() throws Exception { + int expectedEpoch = 0; + long expectedStartOffset = 1L; + int expectedVersion = 0; + List epochs = Arrays.asList(new EpochEntry(expectedEpoch, expectedStartOffset)); + ByteBuffer buffer = RemoteLogManager.epochEntriesAsByteBuffer(epochs); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array()), StandardCharsets.UTF_8)); + + assertEquals(String.valueOf(expectedVersion), bufferedReader.readLine()); + assertEquals(String.valueOf(epochs.size()), bufferedReader.readLine()); + assertEquals(expectedEpoch + " " + expectedStartOffset, bufferedReader.readLine()); + } + + private Partition mockPartition(TopicIdPartition topicIdPartition) { TopicPartition tp = topicIdPartition.topicPartition(); Partition partition = mock(Partition.class); diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 805fc671bf144..f8cc1f01efce9 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -1393,7 +1393,7 @@ class LogLoaderTest { assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries) // deliberately remove some of the epoch entries - leaderEpochCache.truncateFromEnd(2, false) + leaderEpochCache.truncateFromEnd(2) assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries) log.close() diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index bac6a0808bc06..decbc09382b1a 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{MockTime, Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.server.util.MockScheduler -import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log._ import org.junit.jupiter.api.Assertions._ @@ -34,7 +34,6 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource import java.io.{File, RandomAccessFile} -import java.util import java.util.{Optional, OptionalLong} import scala.collection._ import scala.jdk.CollectionConverters._ @@ -404,15 +403,7 @@ class LogSegmentTest { def testRecoveryRebuildsEpochCache(): Unit = { val seg = createSegment(0) - val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { - private var epochs = Seq.empty[EpochEntry] - - override def write(epochs: util.Collection[EpochEntry]): Unit = { - this.epochs = epochs.asScala.toSeq - } - - override def read(): java.util.List[EpochEntry] = this.epochs.asJava - } + val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)) val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime())) seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, 
MemoryRecords.withRecords(104L, Compression.NONE, 0, diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala deleted file mode 100644 index 3af126f5c5529..0000000000000 --- a/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.server.checkpoints - -import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint -import org.apache.kafka.storage.internals.log.EpochEntry -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test - -import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader} -import java.nio.charset.StandardCharsets - -class InMemoryLeaderEpochCheckpointTest { - - @Test - def shouldAppendNewEntry(): Unit = { - val checkpoint = new InMemoryLeaderEpochCheckpoint() - val epochs = java.util.Arrays.asList(new EpochEntry(0, 1L), new EpochEntry(1, 2L), new EpochEntry(2, 3L)) - checkpoint.write(epochs) - assertEquals(epochs, checkpoint.read()) - - val epochs2 = java.util.Arrays.asList(new EpochEntry(3, 4L), new EpochEntry(4, 5L)) - checkpoint.write(epochs2) - - assertEquals(epochs2, checkpoint.read()) - } - - @Test - def testReadAsByteBuffer(): Unit = { - val checkpoint = new InMemoryLeaderEpochCheckpoint() - val expectedEpoch = 0 - val expectedStartOffset = 1L - val expectedVersion = 0 - val epochs = java.util.Arrays.asList(new EpochEntry(expectedEpoch, expectedStartOffset)) - checkpoint.write(epochs) - assertEquals(epochs, checkpoint.read()) - val buffer = checkpoint.readAsByteBuffer() - - val bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array()), StandardCharsets.UTF_8)) - assertEquals(expectedVersion.toString, bufferedReader.readLine()) - assertEquals(epochs.size().toString, bufferedReader.readLine()) - assertEquals(s"$expectedEpoch $expectedStartOffset", bufferedReader.readLine()) - } -} diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index c6b5f0c7798a3..b9e7542e4cc5b 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -21,7 +21,7 @@ import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.server.util.MockTime -import 
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile} +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{EpochEntry, LogDirFailureChannel} import org.junit.jupiter.api.Assertions._ @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test import java.io.File import java.util.{Collections, Optional, OptionalInt} -import scala.collection.Seq import scala.jdk.CollectionConverters._ /** @@ -38,11 +37,7 @@ import scala.jdk.CollectionConverters._ class LeaderEpochFileCacheTest { val tp = new TopicPartition("TestTopic", 5) val mockTime = new MockTime() - private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { - private var epochs: Seq[EpochEntry] = Seq() - override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq - override def read(): java.util.List[EpochEntry] = this.epochs.asJava - } + private val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)) private val cache = new LeaderEpochFileCache(tp, checkpoint, mockTime.scheduler) @@ -59,7 +54,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(OptionalInt.of(4), cache.previousEpoch) - cache.truncateFromEnd(18, false) + cache.truncateFromEnd(18) assertEquals(OptionalInt.of(2), cache.previousEpoch) } @@ -389,7 +384,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When clear latest on epoch boundary - cache.truncateFromEnd(8, false) + cache.truncateFromEnd(8) //Then should remove two latest epochs (remove is inclusive) assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6)), cache.epochEntries) @@ -403,7 +398,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset ON epoch boundary - cache.truncateFromStart(8, false) + cache.truncateFromStart(8) //Then should preserve (3, 8) assertEquals(java.util.Arrays.asList(new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -417,7 +412,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset BETWEEN epoch boundaries - cache.truncateFromStart(9, false) + cache.truncateFromStart(9) //Then we should retain epoch 3, but update it's offset to 9 as 8 has been removed assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries) @@ -431,7 +426,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset before first epoch offset - cache.truncateFromStart(1, false) + cache.truncateFromStart(1) //Then nothing should change assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -445,7 +440,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on earliest epoch boundary - cache.truncateFromStart(6, false) + cache.truncateFromStart(6) //Then nothing should change assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -459,7 +454,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When - cache.truncateFromStart(11, false) + cache.truncateFromStart(11) //Then retain the last assertEquals(Collections.singletonList(new EpochEntry(4, 11)), cache.epochEntries) @@ -473,7 +468,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When we clear from a position between offset 8 & 
offset 11 - cache.truncateFromStart(9, false) + cache.truncateFromStart(9) //Then we should update the middle epoch entry's offset assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries) @@ -487,7 +482,7 @@ class LeaderEpochFileCacheTest { cache.assign(2, 10) //When we clear from a position between offset 0 & offset 7 - cache.truncateFromStart(5, false) + cache.truncateFromStart(5) //Then we should keep epoch 0 but update the offset appropriately assertEquals(java.util.Arrays.asList(new EpochEntry(0,5), new EpochEntry(1, 7), new EpochEntry(2, 10)), @@ -502,7 +497,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset beyond last epoch - cache.truncateFromStart(15, false) + cache.truncateFromStart(15) //Then update the last assertEquals(Collections.singletonList(new EpochEntry(4, 15)), cache.epochEntries) @@ -516,7 +511,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset BETWEEN epoch boundaries - cache.truncateFromEnd(9, false) + cache.truncateFromEnd( 9) //Then should keep the preceding epochs assertEquals(OptionalInt.of(3), cache.latestEpoch) @@ -545,7 +540,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on epoch boundary - cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET, false) + cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -559,7 +554,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on epoch boundary - cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET, false) + cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -580,13 +575,13 @@ class LeaderEpochFileCacheTest { @Test def shouldClearEarliestOnEmptyCache(): Unit = { //Then - cache.truncateFromStart(7, false) + cache.truncateFromStart(7) } @Test def shouldClearLatestOnEmptyCache(): Unit = { //Then - cache.truncateFromEnd(7, false) + cache.truncateFromEnd(7) } @Test @@ -602,7 +597,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(OptionalInt.of(4), cache.previousEpoch(10)) - cache.truncateFromEnd(18, false) + cache.truncateFromEnd(18) assertEquals(OptionalInt.of(2), cache.previousEpoch(cache.latestEpoch.getAsInt)) } @@ -619,7 +614,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(Optional.of(new EpochEntry(4, 15)), cache.previousEntry(10)) - cache.truncateFromEnd(18, false) + cache.truncateFromEnd(18) assertEquals(Optional.of(new EpochEntry(2, 10)), cache.previousEntry(cache.latestEpoch.getAsInt)) } @@ -666,8 +661,8 @@ class LeaderEpochFileCacheTest { cache.assign(3, 8) cache.assign(4, 11) - cache.truncateFromEnd(11, false) - cache.truncateFromStart(8, false) + cache.truncateFromEnd(11) + cache.truncateFromStart(8) assertEquals(List(new EpochEntry(3, 8)).asJava, checkpoint.read()) } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java deleted file mode 100644 index 472441be3872a..0000000000000 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.storage.internals.checkpoint; - -import org.apache.kafka.server.common.CheckpointFile; -import org.apache.kafka.storage.internals.log.EpochEntry; - -import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory - * - * The motivation for this class is to allow remote log manager to create the RemoteLogSegmentMetadata(RLSM) - * with the correct leader epoch info for a specific segment. To do that, we need to rely on the LeaderEpochCheckpointCache - * to truncate from start and end, to get the epoch info. However, we don't really want to truncate the epochs in cache - * (and write to checkpoint file in the end). So, we introduce this InMemoryLeaderEpochCheckpoint to feed into LeaderEpochCheckpointCache, - * and when we truncate the epoch for RLSM, we can do them in memory without affecting the checkpoint file, and without interacting with file system. - */ -public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint { - private List epochs = Collections.emptyList(); - - public void write(Collection epochs) { - this.epochs = new ArrayList<>(epochs); - } - - public List read() { - return Collections.unmodifiableList(epochs); - } - - public ByteBuffer readAsByteBuffer() throws IOException { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) { - CheckpointFile.CheckpointWriteBuffer writeBuffer = new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER); - writeBuffer.write(epochs); - writer.flush(); - } - - return ByteBuffer.wrap(stream.toByteArray()); - } -} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java deleted file mode 100644 index 29e5c75b97717..0000000000000 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.storage.internals.checkpoint; - -import org.apache.kafka.storage.internals.log.EpochEntry; - -import java.util.Collection; -import java.util.List; - -public interface LeaderEpochCheckpoint { - void write(Collection epochs); - - default void writeForTruncation(Collection epochs) { - write(epochs); - } - - List read(); -} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java index 6c6b2de2780a5..392a36533400f 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java @@ -38,7 +38,7 @@ * 1 2 * -----checkpoint file end---------- */ -public class LeaderEpochCheckpointFile implements LeaderEpochCheckpoint { +public class LeaderEpochCheckpointFile { public static final Formatter FORMATTER = new Formatter(); @@ -52,12 +52,10 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh checkpoint = new CheckpointFileWithFailureHandler<>(file, CURRENT_VERSION, FORMATTER, logDirFailureChannel, file.getParentFile().getParent()); } - @Override public void write(Collection epochs) { checkpoint.write(epochs); } - @Override public void writeForTruncation(Collection epochs) { // Writing epoch entries after truncation is done asynchronously for performance reasons. // This could cause NoSuchFileException when the directory is renamed concurrently for topic deletion, @@ -65,7 +63,6 @@ public void writeForTruncation(Collection epochs) { checkpoint.writeIfDirExists(epochs); } - @Override public List read() { return checkpoint.read(); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index d84f191db2723..d7d5fd61f2ec7 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -19,15 +19,17 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.server.util.Scheduler; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.log.EpochEntry; -import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; import org.slf4j.Logger; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Optional; import java.util.OptionalInt; @@ -44,13 +46,13 @@ * Leader Epoch = epoch assigned to each leader by the controller. * Offset = offset of the first message in each epoch. *
<p>
- * Note that {@link #truncateFromStart},{@link #truncateFromEnd} may flush the epoch-entry changes to checkpoint asynchronously. + * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously. * Hence, it is instantiater's responsibility to ensure restoring the cache to the correct state after instantiating * this class from checkpoint (which might contain stale epoch entries right after instantiation). */ public class LeaderEpochFileCache { private final TopicPartition topicPartition; - private final LeaderEpochCheckpoint checkpoint; + private final LeaderEpochCheckpointFile checkpoint; private final Scheduler scheduler; private final Logger log; @@ -64,7 +66,7 @@ public class LeaderEpochFileCache { * @param scheduler the scheduler to use for async I/O operations */ @SuppressWarnings("this-escape") - public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint, Scheduler scheduler) { + public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpointFile checkpoint, Scheduler scheduler) { this.checkpoint = checkpoint; this.topicPartition = topicPartition; this.scheduler = scheduler; @@ -83,7 +85,7 @@ public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint */ private LeaderEpochFileCache(List epochEntries, TopicPartition topicPartition, - LeaderEpochCheckpoint checkpoint, + LeaderEpochCheckpointFile checkpoint, Scheduler scheduler) { this.checkpoint = checkpoint; this.topicPartition = topicPartition; @@ -147,7 +149,9 @@ private boolean assign(EpochEntry entry) { * Remove any entries which violate monotonicity prior to appending a new entry */ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { - List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); + List removedEpochs = removeFromEnd( + epochs, + entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); if (removedEpochs.size() > 1 || (!removedEpochs.isEmpty() && removedEpochs.get(0).startOffset != newEntry.startOffset)) { @@ -158,15 +162,17 @@ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { } } - private List removeFromEnd(Predicate predicate) { + private static List removeFromEnd( + TreeMap epochs, Predicate predicate) { return removeWhileMatching(epochs.descendingMap().entrySet().iterator(), predicate); } - private List removeFromStart(Predicate predicate) { + private static List removeFromStart( + TreeMap epochs, Predicate predicate) { return removeWhileMatching(epochs.entrySet().iterator(), predicate); } - private List removeWhileMatching(Iterator> iterator, Predicate predicate) { + private static List removeWhileMatching(Iterator> iterator, Predicate predicate) { ArrayList removedEpochs = new ArrayList<>(); while (iterator.hasNext()) { @@ -335,22 +341,23 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. - * - * @param endOffset the offset to clear up to - * @param flushSync if true, the method will block until the changes are flushed to file + *
<p>
+ * Checkpoint-flushing is done asynchronously. */ - public void truncateFromEnd(long endOffset, boolean flushSync) { + public void truncateFromEnd(long endOffset) { lock.writeLock().lock(); try { - Optional epochEntry = latestEntry(); - if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { - List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - - if (flushSync) { - writeToFileForTruncation(); - } else { - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); - } + List removedEntries = truncateFromEnd(epochs, endOffset); + if (!removedEntries.isEmpty()) { + // We flush the change to the device in the background because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called by ReplicaFetcher threads, which could block replica fetching + // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. + // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. + // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by + // another truncateFromEnd call on log loading procedure, so it won't be a problem + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); } @@ -364,26 +371,27 @@ public void truncateFromEnd(long endOffset, boolean flushSync) { * be offset, then clears any previous epoch entries. *
<p>
* This method is exclusive: so truncateFromStart(6) will retain an entry at offset 6. + *
<p>
+ * Checkpoint-flushing is done asynchronously. * * @param startOffset the offset to clear up to - * @param flushSync if true, the method will block until the changes are flushed to file */ - public void truncateFromStart(long startOffset, boolean flushSync) { + public void truncateFromStart(long startOffset) { lock.writeLock().lock(); try { - List removedEntries = removeFromStart(entry -> entry.startOffset <= startOffset); - + List removedEntries = truncateFromStart(epochs, startOffset); if (!removedEntries.isEmpty()) { - EpochEntry firstBeforeStartOffset = removedEntries.get(removedEntries.size() - 1); - EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset); - epochs.put(updatedFirstEntry.epoch, updatedFirstEntry); - - if (flushSync) { - writeToFileForTruncation(); - } else { - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); - } - + // We flush the change to the device in the background because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called as part of deleteRecords with holding UnifiedLog#lock. + // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust + // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. + // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by + // another truncateFromStart call on log loading procedure, so it won't be a problem + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); + + EpochEntry updatedFirstEntry = removedEntries.get(removedEntries.size() - 1); log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); } } finally { @@ -391,6 +399,26 @@ public void truncateFromStart(long startOffset, boolean flushSync) { } } + private static List truncateFromStart(TreeMap epochs, long startOffset) { + List removedEntries = removeFromStart(epochs, entry -> entry.startOffset <= startOffset); + + if (!removedEntries.isEmpty()) { + EpochEntry firstBeforeStartOffset = removedEntries.get(removedEntries.size() - 1); + EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset); + epochs.put(updatedFirstEntry.epoch, updatedFirstEntry); + } + + return removedEntries; + } + + private static List truncateFromEnd(TreeMap epochs, long endOffset) { + Optional epochEntry = Optional.ofNullable(epochs.lastEntry()).map(Entry::getValue); + if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { + return removeFromEnd(epochs, x -> x.startOffset >= endOffset); + } + return Collections.emptyList(); + } + public OptionalInt epochForOffset(long offset) { lock.readLock().lock(); try { @@ -414,25 +442,13 @@ public OptionalInt epochForOffset(long offset) { } } - public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint leaderEpochCheckpoint) { - lock.readLock().lock(); - try { - leaderEpochCheckpoint.write(epochEntries()); - // We instantiate LeaderEpochFileCache after writing leaderEpochCheckpoint, - // hence it is guaranteed that the new cache is consistent with the latest epoch entries. 
-            return new LeaderEpochFileCache(topicPartition, leaderEpochCheckpoint, scheduler);
-        } finally {
-            lock.readLock().unlock();
-        }
-    }
-
     /**
      * Returns a new LeaderEpochFileCache which contains same
      * epoch entries with replacing backing checkpoint file.
      * @param leaderEpochCheckpoint the new checkpoint file
      * @return a new LeaderEpochFileCache instance
      */
-    public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
+    public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpointFile leaderEpochCheckpoint) {
         lock.readLock().lock();
         try {
             return new LeaderEpochFileCache(epochEntries(),
@@ -444,6 +460,26 @@ public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpoint leaderEpochChec
         }
     }
 
+    /**
+     * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset
+     * @param startOffset The start offset of the epoch entries (exclusive).
+     * @param endOffset The end offset of the epoch entries (inclusive)
+     * @return the leader epoch entries
+     */
+    public List<EpochEntry> epochEntriesInRange(long startOffset, long endOffset) {
+        lock.readLock().lock();
+        try {
+            TreeMap<Integer, EpochEntry> epochsCopy = new TreeMap<>(this.epochs);
+            if (startOffset >= 0) {
+                truncateFromStart(epochsCopy, startOffset);
+            }
+            truncateFromEnd(epochsCopy, endOffset);
+            return new ArrayList<>(epochsCopy.values());
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     /**
      * Delete all entries.
      */
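
For reviewers who want to see the new API in isolation, here is a minimal sketch of how the range query behaves. It is illustrative only: the class name, the temp-file setup and the literal values are ad hoc rather than part of this patch, but the constructors and the epochEntriesInRange call mirror the updated tests (testGetLeaderEpochCheckpoint above). The point of the change is that the range query is a pure in-memory read over a copy of the epoch map, so RemoteLogManager no longer needs the InMemoryLeaderEpochCheckpoint indirection or a truncating copy of the cache.

// Illustrative sketch, not part of the patch.
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.server.util.MockScheduler;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;

import java.io.File;
import java.util.List;

class EpochEntriesInRangeSketch {          // hypothetical class name, for illustration only
    static void sketch() throws Exception {
        // Same setup shape as the updated tests: a file-backed checkpoint and a mock scheduler.
        File checkpointFile = File.createTempFile("leader-epoch-checkpoint", null);
        LeaderEpochCheckpointFile checkpoint =
                new LeaderEpochCheckpointFile(checkpointFile, new LogDirFailureChannel(1));
        LeaderEpochFileCache cache = new LeaderEpochFileCache(
                new TopicPartition("topic", 0), checkpoint, new MockScheduler(new MockTime()));

        cache.assign(0, 0);     // epoch 0 starts at offset 0
        cache.assign(1, 100);   // epoch 1 starts at offset 100
        cache.assign(2, 200);   // epoch 2 starts at offset 200

        // Pure in-memory read: no truncation of the cache, no checkpoint I/O.
        List<EpochEntry> inRange = cache.epochEntriesInRange(100, 200);
        // inRange is [EpochEntry(epoch=1, startOffset=100)]:
        //  - the start offset (100) is exclusive in the sense that the entry starting at 100
        //    is retained, while the earlier entry (0, 0) is dropped;
        //  - the end offset (200) is inclusive, so the entry starting at 200 is removed.
    }
}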