KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance #14242

Merged: 7 commits, Nov 29, 2023
Changes from 4 commits
13 changes: 12 additions & 1 deletion clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1020,6 +1020,17 @@ public static void flushDirIfExists(Path path) throws IOException {
}
}

/**
* Flushes a dirty file, swallowing {@link NoSuchFileException} if the file does not exist
*/
public static void flushFileIfExists(Path path) throws IOException {
try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
fileChannel.force(true);
} catch (NoSuchFileException e) {
log.warn("Failed to flush file {}", path, e);
}
}

/**
* Closes all the provided closeables.
* @throws IOException if any of the close methods throws an IOException.
@@ -1497,7 +1508,7 @@ public static <S> Iterator<S> covariantCast(Iterator<? extends S> iterator) {
* Checks if a string is null, empty or whitespace only.
* @param str a string to be checked
* @return true if the string is null, empty or whitespace only; otherwise, return false.
*/
*/
public static boolean isBlank(String str) {
return str == null || str.trim().isEmpty();
}
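For illustration, here is a minimal sketch (not part of the PR) of how this helper could be invoked from a background task. The executor and snapshot path below are hypothetical; the actual change uses Kafka's own scheduler.

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.common.utils.Utils;

public class DeferredSnapshotFlush {
    public static void main(String[] args) {
        // Hypothetical snapshot path; in the PR this is a producer state snapshot file.
        Path snapshot = Paths.get("/tmp/00000000000000000042.snapshot");
        ExecutorService background = Executors.newSingleThreadExecutor();

        // The file itself was already written on the hot path; only the fsync is deferred
        // so that it never runs while a log lock is held.
        background.execute(() -> {
            try {
                // Logs a warning and swallows NoSuchFileException if the file was deleted in the meantime.
                Utils.flushFileIfExists(snapshot);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        background.shutdown();
    }
}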
18 changes: 15 additions & 3 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams}

import java.io.{File, IOException}
import java.nio.file.Files
import java.nio.file.{Files, Path}
import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
@@ -1617,10 +1617,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
// we manually override the state offset here prior to taking the snapshot.
producerStateManager.updateMapEndOffset(newSegment.baseOffset)
producerStateManager.takeSnapshot()
// We avoid the potentially costly fsync call here, since we hold UnifiedLog#lock,
// which could block subsequent produces in the meantime.
// The flush is done in the scheduler thread along with the segment flushing below.
val maybeSnapshot = producerStateManager.takeSnapshot(false)
updateHighWatermarkWithLogEndOffset()
// Schedule an asynchronous flush of the old segment
scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
scheduler.scheduleOnce("flush-log", () => {
maybeSnapshot.ifPresent(f => flushProducerStateSnapshot(f.toPath))
flushUptoOffsetExclusive(newSegment.baseOffset)
Contributor:
Is it possible to add a test to verify that the recovery point is only advanced after the producer state has been flushed to disk?

})
newSegment
}
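The comments above capture the core idea of this change: take the producer state snapshot without fsync while holding the lock, and perform the fsync later in the scheduler thread. A rough sketch of that pattern follows; this is not Kafka's Scheduler or UnifiedLog API, and the class and method names are illustrative only.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Shape of the change: do the cheap, non-durable write under the lock, fsync in the background.
public class RollFlushPattern {
    private final Object lock = new Object();
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();

    public void roll(Path snapshotFile, Runnable writeSnapshotWithoutFsync) {
        synchronized (lock) {
            // Writing the snapshot is fast; the expensive part (fsync) is skipped here
            // so concurrent produces are not blocked behind disk latency.
            writeSnapshotWithoutFsync.run();
        }
        scheduler.execute(() -> {
            try (FileChannel channel = FileChannel.open(snapshotFile, StandardOpenOption.READ)) {
                channel.force(true); // fsync outside the lock
            } catch (IOException e) {
                e.printStackTrace(); // the real code routes this through log-dir failure handling
            }
        });
    }
}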

@@ -1703,6 +1709,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
producerStateManager.mapEndOffset
}

private[log] def flushProducerStateSnapshot(snapshot: Path): Unit = {
maybeHandleIOException(s"Error while deleting producer state snapshot $snapshot for $topicPartition in dir ${dir.getParent}") {
Utils.flushFileIfExists(snapshot)
}
}

/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
*
@@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
def write(offsets: Map[TopicPartition, Long]): Unit = {
val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
offsets.foreach(x => list.add(x))
checkpoint.write(list)
checkpoint.write(list, true)
}

def read(): Map[TopicPartition, Long] = {
@@ -162,7 +162,7 @@ public class RemoteLogManagerTest {
List<EpochEntry> epochs = Collections.emptyList();

@Override
public void write(Collection<EpochEntry> epochs) {
public void write(Collection<EpochEntry> epochs, boolean ignored) {
this.epochs = new ArrayList<>(epochs);
}

2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -407,7 +407,7 @@ class LogSegmentTest {
val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs = Seq.empty[EpochEntry]

override def write(epochs: util.Collection[EpochEntry]): Unit = {
override def write(epochs: util.Collection[EpochEntry], ignored: Boolean): Unit = {
this.epochs = epochs.asScala.toSeq
}

24 changes: 21 additions & 3 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -40,8 +40,8 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEn
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.anyLong
import org.mockito.Mockito.{mock, when}
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{doThrow, mock, spy, when}

import java.io._
import java.nio.ByteBuffer
@@ -3581,7 +3581,7 @@ class UnifiedLogTest {
val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
log.appendAsLeader(records, leaderEpoch = 0)
}

log.updateHighWatermark(90L)
log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion)
assertEquals(20, log.logStartOffset)
@@ -3796,6 +3796,24 @@
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
}

@Test
def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
val log = spy(createLog(logDir, logConfig))

doThrow(new KafkaStorageException("Injected exception")).when(log).flushProducerStateSnapshot(any())

log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
try {
log.roll(Some(1L))
} catch {
case _: KafkaStorageException => // ignore
}

// check that the recovery point isn't incremented
assertEquals(0L, log.recoveryPoint)
}

private def appendTransactionalToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,
@@ -97,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(10)
val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L))
checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L), true)
assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
}

@@ -38,7 +38,7 @@ class LeaderEpochFileCacheTest {
val tp = new TopicPartition("TestTopic", 5)
private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs: Seq[EpochEntry] = Seq()
override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq
override def write(epochs: java.util.Collection[EpochEntry], ignored: Boolean): Unit = this.epochs = epochs.asScala.toSeq
override def read(): java.util.List[EpochEntry] = this.epochs.asJava
}

@@ -72,18 +72,20 @@ public CheckpointFile(File file,
tempPath = Paths.get(absolutePath + ".tmp");
}

public void write(Collection<T> entries) throws IOException {
public void write(Collection<T> entries, boolean sync) throws IOException {
synchronized (lock) {
// write to temp file and then swap with the existing file
try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
checkpointWriteBuffer.write(entries);
writer.flush();
fileOutputStream.getFD().sync();
if (sync) {
fileOutputStream.getFD().sync();
Contributor:

@ocadaruma : I realized a potential issue with this change. The issue is that if sync is false, we don't force a flush to disk. However, the OS could flush partial content of the leader epoch file. If the broker has a hard failure, the leader epoch file could be corrupted. In the recovery path, since we always expect the leader epoch file to be well-formed, a corrupted leader epoch file will fail the recovery.

Contributor Author:

@junrao Hmm, that's true. Thanks for pointing that out.
I created a ticket for this and assigned it to myself: https://issues.apache.org/jira/browse/KAFKA-16541

Contributor:

Thanks, @ocadaruma !

}
}

Utils.atomicMoveWithFallback(tempPath, absolutePath);
Utils.atomicMoveWithFallback(tempPath, absolutePath, sync);
}
}

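For reference, a self-contained sketch of the checkpoint-write pattern used above: write to a temp file, optionally fsync, then atomically swap it into place. The class and method names below are illustrative; the real code delegates the final move to Utils.atomicMoveWithFallback, which also handles file systems without atomic-move support.

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collection;

public class AtomicCheckpointWrite {
    public static void write(Path target, Collection<String> lines, boolean sync) throws IOException {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(temp.toFile());
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
            for (String line : lines) {
                writer.write(line);
                writer.newLine();
            }
            writer.flush();         // push buffered data into the OS page cache
            if (sync) {
                out.getFD().sync(); // force the data onto the device only when requested
            }
        }
        // Readers see either the old file or the complete new file, never a partially written one.
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}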
@@ -68,7 +68,7 @@ public Optional<Map.Entry<Integer, Long>> fromString(String line) {
}

public synchronized void writeEntries(Map<Integer, Long> committedOffsets) throws IOException {
checkpointFile.write(committedOffsets.entrySet());
checkpointFile.write(committedOffsets.entrySet(), true);
}

public synchronized Map<Integer, Long> readEntries() throws IOException {
@@ -83,4 +83,4 @@ public synchronized Map<Integer, Long> readEntries() throws IOException {

return partitionToOffsets;
}
}
}
@@ -41,9 +41,9 @@ public CheckpointFileWithFailureHandler(File file, int version, CheckpointFile.E
checkpointFile = new CheckpointFile<>(file, version, formatter);
}

public void write(Collection<T> entries) {
public void write(Collection<T> entries, boolean sync) {
try {
checkpointFile.write(entries);
checkpointFile.write(entries, sync);
} catch (IOException e) {
String msg = "Error while writing to checkpoint file " + file.getAbsolutePath();
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
@@ -42,7 +42,7 @@
public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
private List<EpochEntry> epochs = Collections.emptyList();

public void write(Collection<EpochEntry> epochs) {
public void write(Collection<EpochEntry> epochs, boolean ignored) {
this.epochs = new ArrayList<>(epochs);
}

@@ -60,4 +60,4 @@ public ByteBuffer readAsByteBuffer() throws IOException {

return ByteBuffer.wrap(stream.toByteArray());
}
}
}
@@ -22,8 +22,13 @@
import java.util.List;

public interface LeaderEpochCheckpoint {
// In a file-backed checkpoint implementation, the content should be
// synced to the device if `sync` is true
void write(Collection<EpochEntry> epochs, boolean sync);

void write(Collection<EpochEntry> epochs);
default void write(Collection<EpochEntry> epochs) {
write(epochs, true);
}

List<EpochEntry> read();
}
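A minimal sketch of the new overload from a caller's perspective; it assumes the example lives in the same package as LeaderEpochCheckpoint, and the EpochEntry import path is taken from this diff. The in-memory implementation mirrors the test doubles elsewhere in the PR.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.storage.internals.log.EpochEntry;

public class CheckpointSyncChoice {
    public static void main(String[] args) {
        LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint() {
            private List<EpochEntry> epochs = Collections.emptyList();

            @Override
            public void write(Collection<EpochEntry> epochs, boolean sync) {
                // A file-backed implementation would fsync only when sync is true;
                // this in-memory version just keeps the latest entries.
                this.epochs = new ArrayList<>(epochs);
            }

            @Override
            public List<EpochEntry> read() {
                return epochs;
            }
        };

        List<EpochEntry> entries = Arrays.asList(new EpochEntry(0, 0L), new EpochEntry(1, 100L));
        checkpoint.write(entries);        // default overload: sync = true, durable before returning
        checkpoint.write(entries, false); // hot paths (e.g. truncation) skip the fsync
    }
}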
@@ -53,7 +53,11 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh
}

public void write(Collection<EpochEntry> epochs) {
checkpoint.write(epochs);
write(epochs, true);
}

public void write(Collection<EpochEntry> epochs, boolean sync) {
checkpoint.write(epochs, sync);
}

public List<EpochEntry> read() {
@@ -75,4 +79,4 @@ public Optional<EpochEntry> fromString(String line) {
return (strings.length == 2) ? Optional.of(new EpochEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1]))) : Optional.empty();
}
}
}
}
@@ -72,7 +72,7 @@ public void assign(int epoch, long startOffset) {
EpochEntry entry = new EpochEntry(epoch, startOffset);
if (assign(entry)) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
flush();
flush(true);
}
}

@@ -82,7 +82,7 @@ public void assign(List<EpochEntry> entries) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
}
});
if (!entries.isEmpty()) flush();
if (!entries.isEmpty()) flush(true);
}

private boolean isUpdateNeeded(EpochEntry entry) {
@@ -151,11 +151,6 @@ private List<EpochEntry> removeWhileMatching(Iterator<Map.Entry<Integer, EpochEn
return removedEpochs;
}

public LeaderEpochFileCache cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
flushTo(leaderEpochCheckpoint);
return new LeaderEpochFileCache(this.topicPartition, leaderEpochCheckpoint);
}

public boolean nonEmpty() {
lock.readLock().lock();
try {
@@ -308,7 +303,14 @@ public void truncateFromEnd(long endOffset) {
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);

flush();
// We intentionally don't force flushing this change to the device here, because:
// - We want to avoid fsync latency
//   * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
//   * This method is called by ReplicaFetcher threads, so a slow fsync could block replica fetching,
//     causing ISR shrinks or degraded (remote-time) produce response times when fsync latency is high
// - Even if stale epochs remain in the leader-epoch file after an unclean shutdown, they will be handled by
//   another truncateFromEnd call during the log loading procedure, so it won't be a problem
flush(false);
Contributor:
It's kind of weird to call flush with sync = false since the only thing that flush does is to sync. Could we just avoid calling flush?

Contributor Author:

flush(false) still writes the epoch entries to the file (just without fsync).

If we don't call flush here, some entries will remain in the file even after the log truncation.

I'm guessing it wouldn't realistically be a problem, at least in the current implementation (on log truncation on the follower, LeaderEpochFileCache#assign will be called on the next log append, which flushes the in-memory epoch entries to the file anyway), but we should still write to the file here IMO.

Contributor:

> If we don't call flush here, some entries will remain in the file even after the log truncation.

That's true. But if we write the new content to the file without flushing, it seems that those old entries could still exist in the file?

Contributor Author:

> those old entries could still exist in the file

Yeah, precisely: the content on the device (not the file) could still be old. As long as we read the file in the usual way (i.e. not through O_DIRECT), we can see the latest data.

Staleness on the device arises only when the server experiences a power failure before the OS flushes the page cache.
In that case, the content could indeed be rolled back to an old state.

But it won't be a problem, because the leader-epoch file will be truncated again to match the log file during the loading procedure anyway (this is the case mentioned in (3) in the PR description).
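A small self-contained illustration of that point (the file name and contents below are arbitrary): data written to a file is immediately visible to normal reads, whether or not fsync has been called; fsync only affects what survives a power failure.

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class PageCacheVisibility {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("leader-epoch-demo", ".checkpoint");
        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            out.write("0 0\n1 100\n".getBytes(StandardCharsets.UTF_8));
            // No out.getFD().sync() here: the bytes may not be on the device yet...
        }
        // ...but a normal read still observes the latest content via the OS page cache.
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }
}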

Contributor:

So it sounds like you agree that there is little value in calling flush without sync. Should we remove the call then?

Contributor Author:

I took another look at the code and found that flushing to the file (without fsync) is necessary.

The point here is whether there's any code path that reloads the leader-epoch cache from the file.
I found that it's possible, so not flushing could be a problem in the scenario below:

  • (1) AlterReplicaDir is initiated
  • (2) Truncation happens on the futureLog
    • LeaderEpochFileCache.truncateFromEnd is called, but the change isn't flushed to the file
  • (3) The future log catches up and renameDir is called
    • This reloads the leader-epoch cache from the file, which is stale
    • A wrong leader epoch may then be returned (e.g. for a list-offsets request)

So we should still write to the file even without fsync.

Contributor:
Thanks, @ocadaruma. Good point! So we could still write to the file without flushing. The name flush implies that it fsyncs to disk. How about renaming it to something like writeToFile?

Contributor Author:
That sounds good. I'll fix it like that.


log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
@@ -335,7 +337,14 @@ public void truncateFromStart(long startOffset) {
EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);

flush();
// We intentionally don't force flushing this change to the device here, because:
// - We want to avoid fsync latency
//   * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
//   * This method is called as part of deleteRecords while holding UnifiedLog#lock;
//     meanwhile, all produces against the partition would be blocked, which could exhaust the request handlers
// - Even if stale epochs remain in the leader-epoch file after an unclean shutdown, they will be recovered by
//   another truncateFromStart call during the log loading procedure, so it won't be a problem
flush(false);

log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
}
@@ -384,7 +393,7 @@ public void clearAndFlush() {
lock.writeLock().lock();
try {
epochs.clear();
flush();
flush(true);
} finally {
lock.writeLock().unlock();
}
@@ -421,16 +430,16 @@ public NavigableMap<Integer, Long> epochWithOffsets() {
}
}

private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, boolean sync) {
lock.readLock().lock();
try {
leaderEpochCheckpoint.write(epochs.values());
leaderEpochCheckpoint.write(epochs.values(), sync);
} finally {
lock.readLock().unlock();
}
}

private void flush() {
flushTo(this.checkpoint);
private void flush(boolean sync) {
flushTo(this.checkpoint, sync);
}
}