fix indirect truncation for RLM
ocadaruma committed Jun 3, 2024
1 parent 4f6332d commit 22be0ba
Showing 12 changed files with 181 additions and 314 deletions.
46 changes: 31 additions & 15 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -43,6 +43,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
@@ -57,7 +58,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.EpochEntry;
@@ -79,12 +80,16 @@
import scala.Option;
import scala.collection.JavaConverters;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -561,25 +566,23 @@ public boolean isCancelled() {
}

/**
* Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
* Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset.
* <p>
* Visible for testing.
*
* @param log The actual log from where to take the leader-epoch checkpoint
* @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
* @param startOffset The start offset of the epoch entries (exclusive).
* If start offset is 6, then it will retain an entry at offset 6.
* @param endOffset The end offset of the checkpoint file (inclusive in the truncation)
* @param endOffset The end offset of the epoch entries (inclusive)
* If end offset is 100, then it will remove the entries greater than or equal to 100.
* @return the truncated leader epoch checkpoint
* @return the leader epoch entries
*/
InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long startOffset, long endOffset) {
InMemoryLeaderEpochCheckpoint checkpoint = new InMemoryLeaderEpochCheckpoint();
List<EpochEntry> getLeaderEpochEntries(UnifiedLog log, long startOffset, long endOffset) {
if (log.leaderEpochCache().isDefined()) {
LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint);
if (startOffset >= 0) {
cache.truncateFromStart(startOffset, true);
}
cache.truncateFromEnd(endOffset, true);
return log.leaderEpochCache().get().epochEntriesInRange(startOffset, endOffset);
} else {
return Collections.emptyList();
}
return checkpoint;
}
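
A rough usage sketch of the new accessor (illustrative only, not part of the diff). The names remoteLogManager and log stand in for an RLM instance and its UnifiedLog, and the cache is assumed to hold roughly the entries used by RemoteLogManagerTest below, i.e. (0, 0), (1, 100), (2, 200):

    // startOffset = 100 acts as a truncate-from-start boundary, so the entry beginning at 100 is retained;
    // endOffset = 200 drops entries whose start offset is >= 200.
    List<EpochEntry> entries = remoteLogManager.getLeaderEpochEntries(log, 100, 200);
    // entries == [EpochEntry(epoch = 1, startOffset = 100)]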

class RLMTask extends CancellableRunnable {
@@ -736,7 +739,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
long endOffset = nextSegmentBaseOffset - 1;
File producerStateSnapshotFile = log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null);

List<EpochEntry> epochEntries = getLeaderEpochCheckpoint(log, segment.baseOffset(), nextSegmentBaseOffset).read();
List<EpochEntry> epochEntries = getLeaderEpochEntries(log, segment.baseOffset(), nextSegmentBaseOffset);
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>(epochEntries.size());
epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset));

@@ -746,7 +749,7 @@

remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();

ByteBuffer leaderEpochsIndex = getLeaderEpochCheckpoint(log, -1, nextSegmentBaseOffset).readAsByteBuffer();
ByteBuffer leaderEpochsIndex = epochEntriesAsByteBuffer(getLeaderEpochEntries(log, -1, nextSegmentBaseOffset));
LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.offsetIndex().file()),
toPathIfExists(segment.timeIndex().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())),
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
@@ -1692,6 +1695,19 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo
LOGGER.info("Shutting down of thread pool {} is completed", poolName);
}

//Visible for testing
static ByteBuffer epochEntriesAsByteBuffer(List<EpochEntry> epochEntries) throws IOException {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) {
CheckpointFile.CheckpointWriteBuffer<EpochEntry> writeBuffer =
new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER);
writeBuffer.write(epochEntries);
writer.flush();
}

return ByteBuffer.wrap(stream.toByteArray());
}
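
A hedged sketch of what the buffer produced above is expected to contain — the leader epoch checkpoint text layout (format version, entry count, then one "<epoch> <startOffset>" pair per line), matching testEpochEntriesAsByteBuffer in RemoteLogManagerTest below:

    // Fragment only; assumes the surrounding imports and that the caller handles IOException.
    ByteBuffer buffer = RemoteLogManager.epochEntriesAsByteBuffer(
            Collections.singletonList(new EpochEntry(0, 1L)));
    String text = StandardCharsets.UTF_8.decode(buffer).toString();
    // Expected contents, line by line:
    // 0      <- checkpoint format version
    // 1      <- number of entries
    // 0 1    <- "<epoch> <startOffset>"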

private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) {
String topic = topicIdPartition.topic();
if (!brokerTopicStats.isTopicStatsExisted(topicIdPartition.topic())) {
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogLoader.scala
@@ -173,14 +173,14 @@ class LogLoader(
}
}

leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset, false))
leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset))
val newLogStartOffset = if (isRemoteLogEnabled) {
logStartOffsetCheckpoint
} else {
math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
}
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint, false))
leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint))

// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
30 changes: 3 additions & 27 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1016,15 +1016,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
updatedLogStartOffset = true
updateLogStartOffset(newLogStartOffset)
info(s"Incremented log start offset to $newLogStartOffset due to $reason")
// We flush the change to the device in the background because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called as part of deleteRecords with holding UnifiedLog#lock.
// - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromStart call on log loading procedure, so it won't be a problem
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset, false))
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
@@ -1815,15 +1807,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// and inserted the first start offset entry, but then failed to append any entries
// before another leader was elected.
lock synchronized {
// We flush the change to the device in the background because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called by ReplicaFetcher threads, which could block replica fetching
// then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromEnd call on log loading procedure, so it won't be a problem
leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset, false))
leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
}

false
@@ -1836,15 +1820,7 @@
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
// We flush the change to the device in the background because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called by ReplicaFetcher threads, which could block replica fetching
// then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromEnd call on log loading procedure, so it won't be a problem
leaderEpochCache.foreach(_.truncateFromEnd(targetOffset, false))
leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
if (highWatermark >= localLog.logEndOffset)
61 changes: 35 additions & 26 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -58,14 +58,14 @@
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.util.MockScheduler;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogFileUtils;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.OffsetIndex;
@@ -87,16 +87,19 @@
import scala.Option;
import scala.collection.JavaConverters;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -188,19 +191,7 @@ public class RemoteLogManagerTest {
private final EpochEntry epochEntry1 = new EpochEntry(1, 100);
private final EpochEntry epochEntry2 = new EpochEntry(2, 200);
private final List<EpochEntry> totalEpochEntries = Arrays.asList(epochEntry0, epochEntry1, epochEntry2);
private final LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint() {
List<EpochEntry> epochs = Collections.emptyList();

@Override
public void write(Collection<EpochEntry> epochs) {
this.epochs = new ArrayList<>(epochs);
}

@Override
public List<EpochEntry> read() {
return epochs;
}
};
private LeaderEpochCheckpointFile checkpoint;
private final AtomicLong currentLogStartOffset = new AtomicLong(0L);

private final UnifiedLog mockLog = mock(UnifiedLog.class);
@@ -209,6 +200,7 @@ public List<EpochEntry> read() {

@BeforeEach
void setUp() throws Exception {
checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
topicIds.put(leaderTopicIdPartition.topicPartition().topic(), leaderTopicIdPartition.topicId());
topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId());
Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
@@ -248,11 +240,9 @@ void testGetLeaderEpochCheckpoint() {
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
InMemoryLeaderEpochCheckpoint inMemoryCheckpoint = remoteLogManager.getLeaderEpochCheckpoint(mockLog, 0, 300);
assertEquals(totalEpochEntries, inMemoryCheckpoint.read());
assertEquals(totalEpochEntries, remoteLogManager.getLeaderEpochEntries(mockLog, 0, 300));

InMemoryLeaderEpochCheckpoint inMemoryCheckpoint2 = remoteLogManager.getLeaderEpochCheckpoint(mockLog, 100, 200);
List<EpochEntry> epochEntries = inMemoryCheckpoint2.read();
List<EpochEntry> epochEntries = remoteLogManager.getLeaderEpochEntries(mockLog, 100, 200);
assertEquals(1, epochEntries.size());
assertEquals(epochEntry1, epochEntries.get(0));
}
@@ -1124,9 +1114,7 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData,
assertEquals(tempFile.getAbsolutePath(), logSegmentData.logSegment().toAbsolutePath().toString());
assertEquals(mockProducerSnapshotIndex.getAbsolutePath(), logSegmentData.producerSnapshotIndex().toAbsolutePath().toString());

InMemoryLeaderEpochCheckpoint inMemoryLeaderEpochCheckpoint = new InMemoryLeaderEpochCheckpoint();
inMemoryLeaderEpochCheckpoint.write(expectedLeaderEpoch);
assertEquals(inMemoryLeaderEpochCheckpoint.readAsByteBuffer(), logSegmentData.leaderEpochIndex());
assertEquals(RemoteLogManager.epochEntriesAsByteBuffer(expectedLeaderEpoch), logSegmentData.leaderEpochIndex());
}

@Test
@@ -2305,11 +2293,17 @@ private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadataByTime(TopicI
private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
Long startOffset,
Long endOffset) {
InMemoryLeaderEpochCheckpoint myCheckpoint = new InMemoryLeaderEpochCheckpoint();
LeaderEpochCheckpointFile myCheckpoint;
try {
myCheckpoint = new LeaderEpochCheckpointFile(
TestUtils.tempFile(), new LogDirFailureChannel(1));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
myCheckpoint.write(entries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint, scheduler);
cache.truncateFromStart(startOffset, true);
cache.truncateFromEnd(endOffset, true);
cache.truncateFromStart(startOffset);
cache.truncateFromEnd(endOffset);
return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
}

@@ -2534,6 +2528,21 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
}
}

@Test
public void testEpochEntriesAsByteBuffer() throws Exception {
int expectedEpoch = 0;
long expectedStartOffset = 1L;
int expectedVersion = 0;
List<EpochEntry> epochs = Arrays.asList(new EpochEntry(expectedEpoch, expectedStartOffset));
ByteBuffer buffer = RemoteLogManager.epochEntriesAsByteBuffer(epochs);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array()), StandardCharsets.UTF_8));

assertEquals(String.valueOf(expectedVersion), bufferedReader.readLine());
assertEquals(String.valueOf(epochs.size()), bufferedReader.readLine());
assertEquals(expectedEpoch + " " + expectedStartOffset, bufferedReader.readLine());
}


private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -1393,7 +1393,7 @@ class LogLoaderTest {
assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)

// deliberately remove some of the epoch entries
leaderEpochCache.truncateFromEnd(2, false)
leaderEpochCache.truncateFromEnd(2)
assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
log.close()

13 changes: 2 additions & 11 deletions core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log._
import org.junit.jupiter.api.Assertions._
@@ -34,7 +34,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

import java.io.{File, RandomAccessFile}
import java.util
import java.util.{Optional, OptionalLong}
import scala.collection._
import scala.jdk.CollectionConverters._
@@ -404,15 +403,7 @@
def testRecoveryRebuildsEpochCache(): Unit = {
val seg = createSegment(0)

val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs = Seq.empty[EpochEntry]

override def write(epochs: util.Collection[EpochEntry]): Unit = {
this.epochs = epochs.asScala.toSeq
}

override def read(): java.util.List[EpochEntry] = this.epochs.asJava
}
val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1))

val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime()))
seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
(The remaining 6 changed files of the 12 in this commit are not shown here.)
