Skip to content

Commit

Permalink
KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to …
Browse files Browse the repository at this point in the history
…stabilize performance (apache#14242)

While any blocking operation under holding the UnifiedLog.lock could lead to serious performance (even availability) issues, currently there are several paths that calls fsync(2) inside the lock
In the meantime the lock is held, all subsequent produces against the partition may block
This easily causes all request-handlers to be busy on bad disk performance
Even worse, when a disk experiences tens of seconds of glitch (it's not rare in spinning drives), it makes the broker to unable to process any requests with unfenced from the cluster (i.e. "zombie" like status)
This PR gets rid of 4 cases of essentially-unnecessary fsync(2) calls performed under the lock:
(1) ProducerStateManager.takeSnapshot at UnifiedLog.roll
I moved fsync(2) call to the scheduler thread as part of existing "flush-log" job (before incrementing recovery point)
Since it's still ensured that the snapshot is flushed before incrementing recovery point, this change shouldn't cause any problem
(2) ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log segment deletion
This method calls Utils.atomicMoveWithFallback with needFlushParentDir = true internally, which calls fsync.
I changed it to call Utils.atomicMoveWithFallback with needFlushParentDir = false (which is consistent behavior with index files deletion. index files deletion also doesn't flush parent dir)
This change shouldn't cause problems neither.
(3) LeaderEpochFileCache.truncateFromStart when incrementing log-start-offset
This path is called from deleteRecords on request-handler threads.
Here, we don't need fsync(2) either actually.
On unclean shutdown, few leader epochs might be remained in the file but it will be handled by LogLoader on start-up so not a problem
(4) LeaderEpochFileCache.truncateFromEnd as part of log truncation
Likewise, we don't need fsync(2) here, since any epochs which are untruncated on unclean shutdown will be handled on log loading procedure

Reviewers: Luke Chen <[email protected]>, Divij Vaidya <[email protected]>, Justine Olshan <[email protected]>, Jun Rao <[email protected]>
  • Loading branch information
ocadaruma authored and clolov committed Apr 5, 2024
1 parent 7d9bb1a commit 9e04790
Show file tree
Hide file tree
Showing 17 changed files with 118 additions and 49 deletions.
13 changes: 12 additions & 1 deletion clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,17 @@ public static void flushDirIfExists(Path path) throws IOException {
}
}

/**
* Flushes dirty file with swallowing {@link NoSuchFileException}
*/
public static void flushFileIfExists(Path path) throws IOException {
try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
fileChannel.force(true);
} catch (NoSuchFileException e) {
log.warn("Failed to flush file {}", path, e);
}
}

/**
* Closes all the provided closeables.
* @throws IOException if any of the close methods throws an IOException.
Expand Down Expand Up @@ -1543,7 +1554,7 @@ public static <S> Iterator<S> covariantCast(Iterator<? extends S> iterator) {
* Checks if a string is null, empty or whitespace only.
* @param str a string to be checked
* @return true if the string is null, empty or whitespace only; otherwise, return false.
*/
*/
public static boolean isBlank(String str) {
return str == null || str.trim().isEmpty();
}
Expand Down
18 changes: 15 additions & 3 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}

import java.io.{File, IOException}
import java.nio.file.Files
import java.nio.file.{Files, Path}
import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.stream.Collectors
Expand Down Expand Up @@ -1656,10 +1656,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
// we manually override the state offset here prior to taking the snapshot.
producerStateManager.updateMapEndOffset(newSegment.baseOffset)
producerStateManager.takeSnapshot()
// We avoid potentially-costly fsync call, since we acquire UnifiedLog#lock here
// which could block subsequent produces in the meantime.
// flush is done in the scheduler thread along with segment flushing below
val maybeSnapshot = producerStateManager.takeSnapshot(false)
updateHighWatermarkWithLogEndOffset()
// Schedule an asynchronous flush of the old segment
scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
scheduler.scheduleOnce("flush-log", () => {
maybeSnapshot.ifPresent(f => flushProducerStateSnapshot(f.toPath))
flushUptoOffsetExclusive(newSegment.baseOffset)
})
newSegment
}

Expand Down Expand Up @@ -1742,6 +1748,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
producerStateManager.mapEndOffset
}

private[log] def flushProducerStateSnapshot(snapshot: Path): Unit = {
maybeHandleIOException(s"Error while deleting producer state snapshot $snapshot for $topicPartition in dir ${dir.getParent}") {
Utils.flushFileIfExists(snapshot)
}
}

/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
def write(offsets: Map[TopicPartition, Long]): Unit = {
val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
offsets.foreach(x => list.add(x))
checkpoint.write(list)
checkpoint.write(list, true)
}

def read(): Map[TopicPartition, Long] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public class RemoteLogManagerTest {
List<EpochEntry> epochs = Collections.emptyList();

@Override
public void write(Collection<EpochEntry> epochs) {
public void write(Collection<EpochEntry> epochs, boolean ignored) {
this.epochs = new ArrayList<>(epochs);
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class LogSegmentTest {
val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs = Seq.empty[EpochEntry]

override def write(epochs: util.Collection[EpochEntry]): Unit = {
override def write(epochs: util.Collection[EpochEntry], ignored: Boolean): Unit = {
this.epochs = epochs.asScala.toSeq
}

Expand Down
24 changes: 21 additions & 3 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.anyLong
import org.mockito.Mockito.{mock, when}
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{doThrow, mock, spy, when}

import java.io._
import java.nio.ByteBuffer
Expand Down Expand Up @@ -3625,7 +3625,7 @@ class UnifiedLogTest {
val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
log.appendAsLeader(records, leaderEpoch = 0)
}

log.updateHighWatermark(90L)
log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion)
assertEquals(20, log.logStartOffset)
Expand Down Expand Up @@ -3911,6 +3911,24 @@ class UnifiedLogTest {
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
}

@Test
def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
val log = spy(createLog(logDir, logConfig))

doThrow(new KafkaStorageException("Injected exception")).when(log).flushProducerStateSnapshot(any())

log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
try {
log.roll(Some(1L))
} catch {
case _: KafkaStorageException => // ignore
}

// check that the recovery point isn't incremented
assertEquals(0L, log.recoveryPoint)
}

@Test
def testDeletableSegmentsFilter(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(10)
val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L))
checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L), true)
assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LeaderEpochFileCacheTest {
val tp = new TopicPartition("TestTopic", 5)
private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs: Seq[EpochEntry] = Seq()
override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq
override def write(epochs: java.util.Collection[EpochEntry], ignored: Boolean): Unit = this.epochs = epochs.asScala.toSeq
override def read(): java.util.List[EpochEntry] = this.epochs.asJava
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,20 @@ public CheckpointFile(File file,
tempPath = Paths.get(absolutePath + ".tmp");
}

public void write(Collection<T> entries) throws IOException {
public void write(Collection<T> entries, boolean sync) throws IOException {
synchronized (lock) {
// write to temp file and then swap with the existing file
try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
checkpointWriteBuffer.write(entries);
writer.flush();
fileOutputStream.getFD().sync();
if (sync) {
fileOutputStream.getFD().sync();
}
}

Utils.atomicMoveWithFallback(tempPath, absolutePath);
Utils.atomicMoveWithFallback(tempPath, absolutePath, sync);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Optional<Map.Entry<Integer, Long>> fromString(String line) {
}

public synchronized void writeEntries(Map<Integer, Long> committedOffsets) throws IOException {
checkpointFile.write(committedOffsets.entrySet());
checkpointFile.write(committedOffsets.entrySet(), true);
}

public synchronized Map<Integer, Long> readEntries() throws IOException {
Expand All @@ -83,4 +83,4 @@ public synchronized Map<Integer, Long> readEntries() throws IOException {

return partitionToOffsets;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public CheckpointFileWithFailureHandler(File file, int version, CheckpointFile.E
checkpointFile = new CheckpointFile<>(file, version, formatter);
}

public void write(Collection<T> entries) {
public void write(Collection<T> entries, boolean sync) {
try {
checkpointFile.write(entries);
checkpointFile.write(entries, sync);
} catch (IOException e) {
String msg = "Error while writing to checkpoint file " + file.getAbsolutePath();
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
private List<EpochEntry> epochs = Collections.emptyList();

public void write(Collection<EpochEntry> epochs) {
public void write(Collection<EpochEntry> epochs, boolean ignored) {
this.epochs = new ArrayList<>(epochs);
}

Expand All @@ -60,4 +60,4 @@ public ByteBuffer readAsByteBuffer() throws IOException {

return ByteBuffer.wrap(stream.toByteArray());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import java.util.List;

public interface LeaderEpochCheckpoint {
// in file-backed checkpoint implementation, the content should be
// synced to the device if `sync` is true
void write(Collection<EpochEntry> epochs, boolean sync);

void write(Collection<EpochEntry> epochs);
default void write(Collection<EpochEntry> epochs) {
write(epochs, true);
}

List<EpochEntry> read();
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh
}

public void write(Collection<EpochEntry> epochs) {
checkpoint.write(epochs);
write(epochs, true);
}

public void write(Collection<EpochEntry> epochs, boolean sync) {
checkpoint.write(epochs, sync);
}

public List<EpochEntry> read() {
Expand All @@ -75,4 +79,4 @@ public Optional<EpochEntry> fromString(String line) {
return (strings.length == 2) ? Optional.of(new EpochEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1]))) : Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void assign(int epoch, long startOffset) {
EpochEntry entry = new EpochEntry(epoch, startOffset);
if (assign(entry)) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
flush();
writeToFile(true);
}
}

Expand All @@ -83,7 +83,7 @@ public void assign(List<EpochEntry> entries) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
}
});
if (!entries.isEmpty()) flush();
if (!entries.isEmpty()) writeToFile(true);
}

private boolean isUpdateNeeded(EpochEntry entry) {
Expand Down Expand Up @@ -152,11 +152,6 @@ private List<EpochEntry> removeWhileMatching(Iterator<Map.Entry<Integer, EpochEn
return removedEpochs;
}

public LeaderEpochFileCache cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
flushTo(leaderEpochCheckpoint);
return new LeaderEpochFileCache(this.topicPartition, leaderEpochCheckpoint);
}

public boolean nonEmpty() {
lock.readLock().lock();
try {
Expand Down Expand Up @@ -318,7 +313,14 @@ public void truncateFromEnd(long endOffset) {
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);

flush();
// We intentionally don't force flushing change to the device here because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called by ReplicaFetcher threads, which could block replica fetching
// then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by
// another truncateFromEnd call on log loading procedure so it won't be a problem
writeToFile(false);

log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
Expand All @@ -345,7 +347,14 @@ public void truncateFromStart(long startOffset) {
EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);

flush();
// We intentionally don't force flushing change to the device here because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called as part of deleteRecords with holding UnifiedLog#lock.
// - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust
// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be recovered by
// another truncateFromStart call on log loading procedure so it won't be a problem
writeToFile(false);

log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
}
Expand Down Expand Up @@ -394,7 +403,7 @@ public void clearAndFlush() {
lock.writeLock().lock();
try {
epochs.clear();
flush();
writeToFile(true);
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -431,16 +440,12 @@ public NavigableMap<Integer, Long> epochWithOffsets() {
}
}

private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
private void writeToFile(boolean sync) {
lock.readLock().lock();
try {
leaderEpochCheckpoint.write(epochs.values());
checkpoint.write(epochs.values(), sync);
} finally {
lock.readLock().unlock();
}
}

private void flush() {
flushTo(this.checkpoint);
}
}
Loading

0 comments on commit 9e04790

Please sign in to comment.