RLM needs synchronous flush
ocadaruma committed May 25, 2024
1 parent 482c80c commit 4f6332d
Showing 7 changed files with 70 additions and 54 deletions.
core/src/main/java/kafka/log/remote/RemoteLogManager.java (4 changes: 2 additions & 2 deletions)
@@ -575,9 +575,9 @@ InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long star
if (log.leaderEpochCache().isDefined()) {
LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint);
if (startOffset >= 0) {
- cache.truncateFromStart(startOffset);
+ cache.truncateFromStart(startOffset, true);
}
- cache.truncateFromEnd(endOffset);
+ cache.truncateFromEnd(endOffset, true);
}
return checkpoint;
}
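The `true` arguments above are the heart of this commit: RemoteLogManager writes the epoch entries to an in-memory checkpoint and reads them back immediately, so the truncation must be flushed before the read. A minimal sketch of that read-back pattern, reusing the calls from `truncateAndGetLeaderEpochs` in RemoteLogManagerTest below (`entries`, `scheduler`, `startOffset`, and `endOffset` are assumed to be in scope):

```java
// Sketch only, not committed code: with an async flush, read() below could
// still observe pre-truncation entries; flushSync = true closes that race.
InMemoryLeaderEpochCheckpoint checkpoint = new InMemoryLeaderEpochCheckpoint();
checkpoint.write(entries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(null, checkpoint, scheduler);
cache.truncateFromStart(startOffset, true);      // flushed before returning
cache.truncateFromEnd(endOffset, true);
List<EpochEntry> remaining = checkpoint.read();  // safe: no pending background flush
```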
core/src/main/scala/kafka/log/LogLoader.scala (4 changes: 2 additions & 2 deletions)
@@ -173,14 +173,14 @@ class LogLoader(
}
}

- leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset))
+ leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset, false))
val newLogStartOffset = if (isRemoteLogEnabled) {
logStartOffsetCheckpoint
} else {
math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
}
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
- leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint))
+ leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint, false))

// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
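This is the other half of the bargain struck by the asynchronous flush: entries whose removal was never persisted simply reappear at startup and are truncated again, because a stale checkpoint can only contain extra entries, never missing ones. A sketch of that boot-time repair, written in Java for consistency with the other examples here (the actual calls are the Scala ones in the hunk above; `leaderEpochCache`, `nextOffset`, and `logStartOffsetCheckpoint` are assumed in scope):

```java
// Boot-time repair (sketch): re-running both truncations against whatever the
// checkpoint restored makes the cache correct again, even after a hard failure.
leaderEpochCache.ifPresent(cache -> cache.truncateFromEnd(nextOffset, false));
leaderEpochCache.ifPresent(cache -> cache.truncateFromStart(logStartOffsetCheckpoint, false));
```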
core/src/main/scala/kafka/log/UnifiedLog.scala (30 changes: 27 additions & 3 deletions)
@@ -1016,7 +1016,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
updatedLogStartOffset = true
updateLogStartOffset(newLogStartOffset)
info(s"Incremented log start offset to $newLogStartOffset due to $reason")
- leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
+ // We flush the change to the device in the background because:
+ // - To avoid fsync latency
+ //   * fsync latency could be huge on a disk glitch, which is not rare on spinning drives
+ //   * This method is called as part of deleteRecords while holding UnifiedLog#lock;
+ //     meanwhile, all produce requests against the partition would be blocked, which could exhaust the request handlers
+ // - We still flush the change in #assign synchronously; hence the checkpoint file is guaranteed to never miss any entries
+ //   * Even if stale epochs are restored from the checkpoint file after an unclean shutdown, they will be handled by
+ //     another truncateFromStart call in the log loading procedure, so this is not a problem
+ leaderEpochCache.foreach(_.truncateFromStart(logStartOffset, false))
producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
@@ -1807,7 +1815,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// and inserted the first start offset entry, but then failed to append any entries
// before another leader was elected.
lock synchronized {
- leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
+ // We flush the change to the device in the background because:
+ // - To avoid fsync latency
+ //   * fsync latency could be huge on a disk glitch, which is not rare on spinning drives
+ //   * This method is called by ReplicaFetcher threads; a slow fsync could block replica fetching,
+ //     causing ISR shrinks or degraded produce response time in the remote scope
+ // - We still flush the change in #assign synchronously; hence the checkpoint file is guaranteed to never miss any entries
+ //   * Even if stale epochs are restored from the checkpoint file after an unclean shutdown, they will be handled by
+ //     another truncateFromEnd call in the log loading procedure, so this is not a problem
+ leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset, false))
}

false
@@ -1820,7 +1836,15 @@
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
- leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
+ // We flush the change to the device in the background because:
+ // - To avoid fsync latency
+ //   * fsync latency could be huge on a disk glitch, which is not rare on spinning drives
+ //   * This method is called by ReplicaFetcher threads; a slow fsync could block replica fetching,
+ //     causing ISR shrinks or degraded produce response time in the remote scope
+ // - We still flush the change in #assign synchronously; hence the checkpoint file is guaranteed to never miss any entries
+ //   * Even if stale epochs are restored from the checkpoint file after an unclean shutdown, they will be handled by
+ //     another truncateFromEnd call in the log loading procedure, so this is not a problem
+ leaderEpochCache.foreach(_.truncateFromEnd(targetOffset, false))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
if (highWatermark >= localLog.logEndOffset)
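The three comment blocks above all rely on the same contract: the in-memory cache is always updated immediately under the lock, and only the checkpoint-file write may be deferred. A self-contained toy model of that contract (illustrative names only, not Kafka code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of the flushSync contract: in-memory state changes immediately,
// so readers of the cache never see stale data; only the checkpoint write is
// optionally deferred, trading durability of the *file* for fsync latency.
class FlushSyncModel {
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
    private volatile String checkpointFile = "";
    private String inMemoryState = "";

    synchronized void truncate(String newState, boolean flushSync) {
        inMemoryState = newState;            // immediate, under the lock
        if (flushSync) {
            flush();                         // caller reads the checkpoint right after
        } else {
            scheduler.execute(this::flush);  // hot path: skip the fsync wait
        }
    }

    private synchronized void flush() { checkpointFile = inMemoryState; }
}
```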
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java (4 changes: 2 additions & 2 deletions)
@@ -2308,8 +2308,8 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
InMemoryLeaderEpochCheckpoint myCheckpoint = new InMemoryLeaderEpochCheckpoint();
myCheckpoint.write(entries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint, scheduler);
- cache.truncateFromStart(startOffset);
- cache.truncateFromEnd(endOffset);
+ cache.truncateFromStart(startOffset, true);
+ cache.truncateFromEnd(endOffset, true);
return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
}

core/src/test/scala/unit/kafka/log/LogLoaderTest.scala (2 changes: 1 addition & 1 deletion)
@@ -1393,7 +1393,7 @@ class LogLoaderTest {
assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)

// deliberately remove some of the epoch entries
- leaderEpochCache.truncateFromEnd(2)
+ leaderEpochCache.truncateFromEnd(2, false)
assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
log.close()

core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -59,7 +59,7 @@ class LeaderEpochFileCacheTest {
cache.assign(10, 20)
assertEquals(OptionalInt.of(4), cache.previousEpoch)

- cache.truncateFromEnd(18)
+ cache.truncateFromEnd(18, false)
assertEquals(OptionalInt.of(2), cache.previousEpoch)
}

@@ -389,7 +389,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When clear latest on epoch boundary
- cache.truncateFromEnd(8)
+ cache.truncateFromEnd(8, false)

//Then should remove two latest epochs (remove is inclusive)
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6)), cache.epochEntries)
@@ -403,7 +403,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset ON epoch boundary
- cache.truncateFromStart(8)
+ cache.truncateFromStart(8, false)

//Then should preserve (3, 8)
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
@@ -417,7 +417,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset BETWEEN epoch boundaries
- cache.truncateFromStart(9)
+ cache.truncateFromStart(9, false)

//Then we should retain epoch 3, but update its offset to 9 as 8 has been removed
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries)
@@ -431,7 +431,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset before first epoch offset
- cache.truncateFromStart(1)
+ cache.truncateFromStart(1, false)

//Then nothing should change
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6), new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
@@ -445,7 +445,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset on earliest epoch boundary
- cache.truncateFromStart(6)
+ cache.truncateFromStart(6, false)

//Then nothing should change
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6), new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
@@ -459,7 +459,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When
- cache.truncateFromStart(11)
+ cache.truncateFromStart(11, false)

//Then retain the last
assertEquals(Collections.singletonList(new EpochEntry(4, 11)), cache.epochEntries)
@@ -473,7 +473,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When we clear from a position between offset 8 & offset 11
- cache.truncateFromStart(9)
+ cache.truncateFromStart(9, false)

//Then we should update the middle epoch entry's offset
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries)
@@ -487,7 +487,7 @@ class LeaderEpochFileCacheTest {
cache.assign(2, 10)

//When we clear from a position between offset 0 & offset 7
- cache.truncateFromStart(5)
+ cache.truncateFromStart(5, false)

//Then we should keep epoch 0 but update the offset appropriately
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)),
@@ -502,7 +502,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset beyond last epoch
- cache.truncateFromStart(15)
+ cache.truncateFromStart(15, false)

//Then update the last
assertEquals(Collections.singletonList(new EpochEntry(4, 15)), cache.epochEntries)
@@ -516,7 +516,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset BETWEEN epoch boundaries
- cache.truncateFromEnd( 9)
+ cache.truncateFromEnd(9, false)

//Then should keep the preceding epochs
assertEquals(OptionalInt.of(3), cache.latestEpoch)
@@ -545,7 +545,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset on epoch boundary
- cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET)
+ cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET, false)

//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@@ -559,7 +559,7 @@ class LeaderEpochFileCacheTest {
cache.assign(4, 11)

//When reset to offset on epoch boundary
- cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET)
+ cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET, false)

//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@@ -580,13 +580,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldClearEarliestOnEmptyCache(): Unit = {
//Then
- cache.truncateFromStart(7)
+ cache.truncateFromStart(7, false)
}

@Test
def shouldClearLatestOnEmptyCache(): Unit = {
//Then
- cache.truncateFromEnd(7)
+ cache.truncateFromEnd(7, false)
}

@Test
@@ -602,7 +602,7 @@
cache.assign(10, 20)
assertEquals(OptionalInt.of(4), cache.previousEpoch(10))

- cache.truncateFromEnd(18)
+ cache.truncateFromEnd(18, false)
assertEquals(OptionalInt.of(2), cache.previousEpoch(cache.latestEpoch.getAsInt))
}

@@ -619,7 +619,7 @@
cache.assign(10, 20)
assertEquals(Optional.of(new EpochEntry(4, 15)), cache.previousEntry(10))

- cache.truncateFromEnd(18)
+ cache.truncateFromEnd(18, false)
assertEquals(Optional.of(new EpochEntry(2, 10)), cache.previousEntry(cache.latestEpoch.getAsInt))
}

@@ -666,8 +666,8 @@ class LeaderEpochFileCacheTest {
cache.assign(3, 8)
cache.assign(4, 11)

- cache.truncateFromEnd(11)
- cache.truncateFromStart(8)
+ cache.truncateFromEnd(11, false)
+ cache.truncateFromStart(8, false)

assertEquals(List(new EpochEntry(3, 8)).asJava, checkpoint.read())
}
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -44,7 +44,7 @@
* Leader Epoch = epoch assigned to each leader by the controller.
* Offset = offset of the first message in each epoch.
* <p>
- * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously.
+ * Note that {@link #truncateFromStart}, {@link #truncateFromEnd} may flush the epoch-entry changes to checkpoint asynchronously.
 * Hence, it is the instantiator's responsibility to restore the cache to the correct state after instantiating
 * this class from the checkpoint (which might contain stale epoch entries right after instantiation).
*/
@@ -335,25 +335,22 @@ public Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch, long logEndOffs

/**
* Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
- * <p>
- * Checkpoint-flushing is done asynchronously.
*
* @param endOffset the offset to clear up to
+ * @param flushSync if true, the method will block until the changes are flushed to file
*/
- public void truncateFromEnd(long endOffset) {
+ public void truncateFromEnd(long endOffset, boolean flushSync) {
lock.writeLock().lock();
try {
Optional<EpochEntry> epochEntry = latestEntry();
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);

- // We flush the change to the device in the background because:
- // - To avoid fsync latency
- //   * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
- //   * This method is called by ReplicaFetcher threads, which could block replica fetching
- //     then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
- // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
- //   * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
- //     another truncateFromEnd call on log loading procedure, so it won't be a problem
- scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+ if (flushSync) {
+     writeToFileForTruncation();
+ } else {
+     scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+ }

log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
@@ -367,12 +364,11 @@ public void truncateFromEnd(long endOffset) {
* be offset, then clears any previous epoch entries.
* <p>
* This method is exclusive: so truncateFromStart(6) will retain an entry at offset 6.
- * <p>
- * Checkpoint-flushing is done asynchronously.
*
* @param startOffset the offset to clear up to
+ * @param flushSync if true, the method will block until the changes are flushed to file
*/
- public void truncateFromStart(long startOffset) {
+ public void truncateFromStart(long startOffset, boolean flushSync) {
lock.writeLock().lock();
try {
List<EpochEntry> removedEntries = removeFromStart(entry -> entry.startOffset <= startOffset);
Expand All @@ -382,15 +378,11 @@ public void truncateFromStart(long startOffset) {
EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);

- // We flush the change to the device in the background because:
- // - To avoid fsync latency
- //   * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
- //   * This method is called as part of deleteRecords with holding UnifiedLog#lock.
- //     - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust
- // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
- //   * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
- //     another truncateFromStart call on log loading procedure, so it won't be a problem
- scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+ if (flushSync) {
+     writeToFileForTruncation();
+ } else {
+     scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+ }

log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
}
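In summary, the new `flushSync` flag splits the call sites in this diff into two groups; a hedged recap in code form (calls copied from the hunks above, with `cache` and the offsets assumed to be in scope):

```java
// Hot paths (ReplicaFetcher truncation, deleteRecords, log loading) defer the
// flush; a checkpoint that is stale after a crash is re-truncated on log load.
cache.truncateFromEnd(logEndOffset, false);
// RemoteLogManager reads the checkpoint back immediately after truncating,
// so it must flush synchronously.
cache.truncateFromStart(startOffset, true);
```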
