RLM needs synchronous flush
ocadaruma committed May 25, 2024
1 parent 482c80c commit 02365a1
Showing 6 changed files with 69 additions and 53 deletions.
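In short, truncateFromStart and truncateFromEnd now take a flushSync flag: the RemoteLogManager call sites pass true so the checkpoint write completes before the call returns, while the hot-path call sites in LogLoader and UnifiedLog pass false and keep the background flush. Below is a minimal, self-contained sketch of that dispatch, not the actual class; FlushSyncSketch, removeEntriesFrom, and writeCheckpoint are invented names standing in for the real logic shown in the LeaderEpochFileCache hunk further down.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the LeaderEpochFileCache change: truncation always
// mutates the in-memory state under the write lock, and flushSync only decides
// whether the checkpoint write happens before returning or on a background thread.
public class FlushSyncSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void truncateFromEnd(long endOffset, boolean flushSync) {
        lock.writeLock().lock();
        try {
            removeEntriesFrom(endOffset);                  // drop entries with startOffset >= endOffset
            if (flushSync) {
                writeCheckpoint();                         // RemoteLogManager path: caller reads the checkpoint right away
            } else {
                scheduler.execute(this::writeCheckpoint);  // hot paths: defer the flush off the caller's thread
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    private void removeEntriesFrom(long endOffset) { /* omitted: mutate the in-memory epoch map */ }

    private void writeCheckpoint() { /* omitted: rewrite and fsync the checkpoint */ }
}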
core/src/main/java/kafka/log/remote/RemoteLogManager.java (2 additions, 2 deletions)
@@ -575,9 +575,9 @@ InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long star
if (log.leaderEpochCache().isDefined()) {
LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint);
if (startOffset >= 0) {
cache.truncateFromStart(startOffset);
cache.truncateFromStart(startOffset, true);
}
cache.truncateFromEnd(endOffset);
cache.truncateFromEnd(endOffset, true);
}
return checkpoint;
}
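The true flag above matters because the in-memory checkpoint built by getLeaderEpochCheckpoint is consumed as soon as the method returns. The fragment below is a hedged illustration of that read-after-truncate pattern, reusing only types and calls visible elsewhere in this diff; entries, startOffset, endOffset, and scheduler are assumed to be in scope, as in the test helper further down.

// Fragment, not production code: why RemoteLogManager passes flushSync = true.
InMemoryLeaderEpochCheckpoint checkpoint = new InMemoryLeaderEpochCheckpoint();
checkpoint.write(entries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(null, checkpoint, scheduler);

cache.truncateFromStart(startOffset, true);     // checkpoint is rewritten before this returns
cache.truncateFromEnd(endOffset, true);
List<EpochEntry> truncated = checkpoint.read(); // observes the truncated entries

// With flushSync = false the flush is only scheduled, so read() here could still
// return the pre-truncation entries until the background task has run.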
core/src/main/scala/kafka/log/LogLoader.scala (2 additions, 2 deletions)
@@ -173,14 +173,14 @@ class LogLoader(
}
}

leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset))
leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset, false))
val newLogStartOffset = if (isRemoteLogEnabled) {
logStartOffsetCheckpoint
} else {
math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
}
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint))
leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint, false))

// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
core/src/main/scala/kafka/log/UnifiedLog.scala (27 additions, 3 deletions)
@@ -1016,7 +1016,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
updatedLogStartOffset = true
updateLogStartOffset(newLogStartOffset)
info(s"Incremented log start offset to $newLogStartOffset due to $reason")
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
// We flush the change to the device in the background because:
// - To avoid fsync latency
//   * fsync latency can be huge when a disk glitches, which is not rare on spinning drives
//   * This method is called as part of deleteRecords while holding UnifiedLog#lock,
//     so all produce requests against the partition are blocked meanwhile, which can exhaust the request handlers
// - We still flush the change synchronously in #assign, so the checkpoint file is guaranteed to never miss an entry
//   * Even if stale epochs are restored from the checkpoint file after an unclean shutdown, they are cleaned up by
//     another truncateFromStart call during log loading, so this is not a problem
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset, false))
producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
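To make the latency concern in the comment above concrete: if the checkpoint fsync ran while UnifiedLog#lock was held, every produce request for the partition would queue behind it for the duration of the disk stall. The toy program below illustrates only that blocking effect and is not Kafka code; SlowFlushDemo, deleteRecordsWithSyncFlush, and appendRecord are invented names, and Thread.sleep stands in for a stalled fsync.

import java.util.concurrent.locks.ReentrantLock;

// Toy illustration: a slow flush inside the log lock stalls the produce path.
public class SlowFlushDemo {
    private static final ReentrantLock logLock = new ReentrantLock();

    static void deleteRecordsWithSyncFlush() {
        logLock.lock();
        try {
            Thread.sleep(2_000);          // stands in for an fsync stalled by a disk glitch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            logLock.unlock();
        }
    }

    static void appendRecord() {
        logLock.lock();                   // a produce request: waits for the full stall
        try {
            // append to the active segment (no-op here)
        } finally {
            logLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread deleter = new Thread(SlowFlushDemo::deleteRecordsWithSyncFlush);
        Thread producer = new Thread(SlowFlushDemo::appendRecord);
        long start = System.nanoTime();
        deleter.start();
        Thread.sleep(100);                // let the deleter grab the lock first
        producer.start();
        producer.join();
        deleter.join();
        System.out.printf("produce path finished after ~%d ms instead of immediately%n",
                (System.nanoTime() - start) / 1_000_000);
    }
}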
@@ -1807,7 +1815,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// and inserted the first start offset entry, but then failed to append any entries
// before another leader was elected.
lock synchronized {
leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
// We flush the change to the device in the background because:
// - To avoid fsync latency
//   * fsync latency can be huge when a disk glitches, which is not rare on spinning drives
//   * This method is called by ReplicaFetcher threads, so a slow fsync would block replica fetching,
//     causing ISR shrinkage or degraded produce response times (remote scope)
// - We still flush the change synchronously in #assign, so the checkpoint file is guaranteed to never miss an entry
//   * Even if stale epochs are restored from the checkpoint file after an unclean shutdown, they are cleaned up by
//     another truncateFromEnd call during log loading, so this is not a problem
leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset, false))
}

false
@@ -1820,7 +1836,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
// We flush the change to the device in the background because:
// - To avoid fsync latency
//   * fsync latency can be huge when a disk glitches, which is not rare on spinning drives
//   * This method is called by ReplicaFetcher threads, so a slow fsync would block replica fetching,
//     causing ISR shrinkage or degraded produce response times (remote scope)
// - We still flush the change synchronously in #assign, so the checkpoint file is guaranteed to never miss an entry
//   * Even if stale epochs are restored from the checkpoint file after an unclean shutdown, they are cleaned up by
//     another truncateFromEnd call during log loading, so this is not a problem
leaderEpochCache.foreach(_.truncateFromEnd(targetOffset, false))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
if (highWatermark >= localLog.logEndOffset)
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java (2 additions, 2 deletions)
@@ -2308,8 +2308,8 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
InMemoryLeaderEpochCheckpoint myCheckpoint = new InMemoryLeaderEpochCheckpoint();
myCheckpoint.write(entries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint, scheduler);
cache.truncateFromStart(startOffset);
cache.truncateFromEnd(endOffset);
cache.truncateFromStart(startOffset, true);
cache.truncateFromEnd(endOffset, true);
return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
}

@@ -59,7 +59,7 @@ class LeaderEpochFileCacheTest {
cache.assign(10, 20)
assertEquals(OptionalInt.of(4), cache.previousEpoch)

cache.truncateFromEnd(18)
cache.truncateFromEnd(18, false)
assertEquals(OptionalInt.of(2), cache.previousEpoch)
}

@@ -389,7 +389,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When clear latest on epoch boundary
cache.truncateFromEnd(8)
cache.truncateFromEnd(8, false)

//Then should remove two latest epochs (remove is inclusive)
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6)), cache.epochEntries)
@@ -403,7 +403,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset ON epoch boundary
cache.truncateFromStart(8)
cache.truncateFromStart(8, false)

//Then should preserve (3, 8)
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
@@ -417,7 +417,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset BETWEEN epoch boundaries
cache.truncateFromStart(9)
cache.truncateFromStart(9, false)

//Then we should retain epoch 3, but update its offset to 9 as 8 has been removed
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries)
@@ -431,7 +431,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset before first epoch offset
cache.truncateFromStart(1)
cache.truncateFromStart(1, false)

//Then nothing should change
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
@@ -445,7 +445,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset on earliest epoch boundary
cache.truncateFromStart(6)
cache.truncateFromStart(6, false)

//Then nothing should change
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
@@ -459,7 +459,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When
cache.truncateFromStart(11)
cache.truncateFromStart(11, false)

//Then retain the last
assertEquals(Collections.singletonList(new EpochEntry(4, 11)), cache.epochEntries)
@@ -473,7 +473,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When we clear from a position between offset 8 & offset 11
cache.truncateFromStart(9)
cache.truncateFromStart(9, false)

//Then we should update the middle epoch entry's offset
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries)
@@ -487,7 +487,7 @@ class LeaderEpochFileCacheTest {
cache.assign(2, 10)

//When we clear from a position between offset 0 & offset 7
cache.truncateFromStart(5)
cache.truncateFromStart(5, false)

//Then we should keep epoch 0 but update the offset appropriately
assertEquals(java.util.Arrays.asList(new EpochEntry(0,5), new EpochEntry(1, 7), new EpochEntry(2, 10)),
@@ -502,7 +502,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset beyond last epoch
cache.truncateFromStart(15)
cache.truncateFromStart(15, false)

//Then update the last
assertEquals(Collections.singletonList(new EpochEntry(4, 15)), cache.epochEntries)
@@ -516,7 +516,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset BETWEEN epoch boundaries
cache.truncateFromEnd( 9)
cache.truncateFromEnd(9, false)

//Then should keep the preceding epochs
assertEquals(OptionalInt.of(3), cache.latestEpoch)
@@ -545,7 +545,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset on epoch boundary
cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET)
cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET, false)

//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@@ -559,7 +559,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset on epoch boundary
cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET)
cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET, false)

//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@@ -580,13 +580,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldClearEarliestOnEmptyCache(): Unit = {
//Then
cache.truncateFromStart(7)
cache.truncateFromStart(7, false)
}

@Test
def shouldClearLatestOnEmptyCache(): Unit = {
//Then
cache.truncateFromEnd(7)
cache.truncateFromEnd(7, false)
}

@Test
@@ -602,7 +602,7 @@ class LeaderEpochFileCacheTest {
cache.assign(10, 20)
assertEquals(OptionalInt.of(4), cache.previousEpoch(10))

cache.truncateFromEnd(18)
cache.truncateFromEnd(18, false)
assertEquals(OptionalInt.of(2), cache.previousEpoch(cache.latestEpoch.getAsInt))
}

@@ -619,7 +619,7 @@ class LeaderEpochFileCacheTest {
cache.assign(10, 20)
assertEquals(Optional.of(new EpochEntry(4, 15)), cache.previousEntry(10))

cache.truncateFromEnd(18)
cache.truncateFromEnd(18, false)
assertEquals(Optional.of(new EpochEntry(2, 10)), cache.previousEntry(cache.latestEpoch.getAsInt))
}

@@ -666,8 +666,8 @@ class LeaderEpochFileCacheTest {
cache.assign(3, 8)
cache.assign(4, 11)

cache.truncateFromEnd(11)
cache.truncateFromStart(8)
cache.truncateFromEnd(11, false)
cache.truncateFromStart(8, false)

assertEquals(List(new EpochEntry(3, 8)).asJava, checkpoint.read())
}
@@ -44,7 +44,7 @@
* Leader Epoch = epoch assigned to each leader by the controller.
* Offset = offset of the first message in each epoch.
* <p>
* Note that {@link #truncateFromStart},{@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously.
* Note that {@link #truncateFromStart},{@link #truncateFromEnd} may flush the epoch-entry changes to checkpoint asynchronously.
* Hence, it is the instantiator's responsibility to restore the cache to the correct state after instantiating
* this class from a checkpoint (which might contain stale epoch entries right after instantiation).
*/
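As a concrete example of that responsibility, the log-loading path earlier in this diff re-truncates the cache right after it is rebuilt from the checkpoint file. The fragment below is a rough Java equivalent of what LogLoader.scala does above; logEndOffset and logStartOffset are assumed to come from the recovered log, and the constructor arguments mirror the usage in RemoteLogManagerTest elsewhere in this diff.

// Illustrative fragment: restoring the cache to a correct state after loading.
LeaderEpochFileCache cache = new LeaderEpochFileCache(topicPartition, checkpoint, scheduler);

// The checkpoint file may contain stale entries if a background flush was lost
// in an unclean shutdown, so trim both ends against the recovered log.
// flushSync = false is acceptable here: if this flush is lost too, the same
// repair simply runs again on the next startup.
cache.truncateFromEnd(logEndOffset, false);
cache.truncateFromStart(logStartOffset, false);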
@@ -335,25 +335,22 @@ public Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch, long logEndOffs

/**
* Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
* <p>
* Checkpoint-flushing is done asynchronously.
*
* @param endOffset the offset to clear up to
* @param flushSync if true, the method blocks until the changes are flushed to the file
*/
public void truncateFromEnd(long endOffset) {
public void truncateFromEnd(long endOffset, boolean flushSync) {
lock.writeLock().lock();
try {
Optional<EpochEntry> epochEntry = latestEntry();
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);

// We flush the change to the device in the background because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called by ReplicaFetcher threads, which could block replica fetching
// then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromEnd call on log loading procedure, so it won't be a problem
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
if (flushSync) {
writeToFileForTruncation();
} else {
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
}

log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
@@ -367,12 +364,11 @@ public void truncateFromEnd(long endOffset) {
* be offset, then clears any previous epoch entries.
* <p>
* This method is exclusive: so truncateFromStart(6) will retain an entry at offset 6.
* <p>
* Checkpoint-flushing is done asynchronously.
*
* @param startOffset the offset to clear up to
* @param flushSync if true, the method blocks until the changes are flushed to the file
*/
public void truncateFromStart(long startOffset) {
public void truncateFromStart(long startOffset, boolean flushSync) {
lock.writeLock().lock();
try {
List<EpochEntry> removedEntries = removeFromStart(entry -> entry.startOffset <= startOffset);
@@ -382,15 +378,11 @@ public void truncateFromStart(long startOffset) {
EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);

// We flush the change to the device in the background because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called as part of deleteRecords with holding UnifiedLog#lock.
// - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromStart call on log loading procedure, so it won't be a problem
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
if (flushSync) {
writeToFileForTruncation();
} else {
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
}

log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
}
