Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance #14242

Merged
merged 7 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,17 @@ public static void flushDirIfExists(Path path) throws IOException {
}
}

/**
* Flushes dirty file with swallowing {@link NoSuchFileException}
*/
public static void flushFileIfExists(Path path) throws IOException {
try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
fileChannel.force(true);
} catch (NoSuchFileException e) {
log.warn("Failed to flush file {}", path, e);
}
}

/**
* Closes all the provided closeables.
* @throws IOException if any of the close methods throws an IOException.
Expand Down Expand Up @@ -1497,7 +1508,7 @@ public static <S> Iterator<S> covariantCast(Iterator<? extends S> iterator) {
* Checks if a string is null, empty or whitespace only.
* @param str a string to be checked
* @return true if the string is null, empty or whitespace only; otherwise, return false.
*/
*/
public static boolean isBlank(String str) {
return str == null || str.trim().isEmpty();
}
Expand Down
18 changes: 15 additions & 3 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams}

import java.io.{File, IOException}
import java.nio.file.Files
import java.nio.file.{Files, Path}
import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
Expand Down Expand Up @@ -1617,10 +1617,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
// we manually override the state offset here prior to taking the snapshot.
producerStateManager.updateMapEndOffset(newSegment.baseOffset)
producerStateManager.takeSnapshot()
// We avoid potentially-costly fsync call, since we acquire UnifiedLog#lock here
// which could block subsequent produces in the meantime.
// flush is done in the scheduler thread along with segment flushing below
val maybeSnapshot = producerStateManager.takeSnapshot(false)
updateHighWatermarkWithLogEndOffset()
// Schedule an asynchronous flush of the old segment
scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
scheduler.scheduleOnce("flush-log", () => {
maybeSnapshot.ifPresent(f => flushProducerStateSnapshot(f.toPath))
flushUptoOffsetExclusive(newSegment.baseOffset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add a test to verify that the recovery point is only advanced after the producer state has been flushed to disk?

})
newSegment
}

Expand Down Expand Up @@ -1703,6 +1709,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
producerStateManager.mapEndOffset
}

private[log] def flushProducerStateSnapshot(snapshot: Path): Unit = {
maybeHandleIOException(s"Error while deleting producer state snapshot $snapshot for $topicPartition in dir ${dir.getParent}") {
Utils.flushFileIfExists(snapshot)
}
}

/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
def write(offsets: Map[TopicPartition, Long]): Unit = {
val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
offsets.foreach(x => list.add(x))
checkpoint.write(list)
checkpoint.write(list, true)
}

def read(): Map[TopicPartition, Long] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public class RemoteLogManagerTest {
List<EpochEntry> epochs = Collections.emptyList();

@Override
public void write(Collection<EpochEntry> epochs) {
public void write(Collection<EpochEntry> epochs, boolean ignored) {
this.epochs = new ArrayList<>(epochs);
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ class LogSegmentTest {
val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs = Seq.empty[EpochEntry]

override def write(epochs: util.Collection[EpochEntry]): Unit = {
override def write(epochs: util.Collection[EpochEntry], ignored: Boolean): Unit = {
this.epochs = epochs.asScala.toSeq
}

Expand Down
24 changes: 21 additions & 3 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEn
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.anyLong
import org.mockito.Mockito.{mock, when}
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{doThrow, mock, spy, when}

import java.io._
import java.nio.ByteBuffer
Expand Down Expand Up @@ -3581,7 +3581,7 @@ class UnifiedLogTest {
val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
log.appendAsLeader(records, leaderEpoch = 0)
}

log.updateHighWatermark(90L)
log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion)
assertEquals(20, log.logStartOffset)
Expand Down Expand Up @@ -3796,6 +3796,24 @@ class UnifiedLogTest {
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
}

@Test
def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
val log = spy(createLog(logDir, logConfig))

doThrow(new KafkaStorageException("Injected exception")).when(log).flushProducerStateSnapshot(any())

log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
try {
log.roll(Some(1L))
} catch {
case _: KafkaStorageException => // ignore
}

// check that the recovery point isn't incremented
assertEquals(0L, log.recoveryPoint)
}

private def appendTransactionalToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(10)
val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L))
checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L), true)
assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LeaderEpochFileCacheTest {
val tp = new TopicPartition("TestTopic", 5)
private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs: Seq[EpochEntry] = Seq()
override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq
override def write(epochs: java.util.Collection[EpochEntry], ignored: Boolean): Unit = this.epochs = epochs.asScala.toSeq
override def read(): java.util.List[EpochEntry] = this.epochs.asJava
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,20 @@ public CheckpointFile(File file,
tempPath = Paths.get(absolutePath + ".tmp");
}

public void write(Collection<T> entries) throws IOException {
public void write(Collection<T> entries, boolean sync) throws IOException {
synchronized (lock) {
// write to temp file and then swap with the existing file
try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
checkpointWriteBuffer.write(entries);
writer.flush();
fileOutputStream.getFD().sync();
if (sync) {
fileOutputStream.getFD().sync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma : I realized a potential issue with this change. The issue is that if sync is false, we don't force a flush to disk. However, the OS could flush partial content of the leader epoch file. If the broker has a hard failure, the leader epoch file could be corrupted. In the recovery path, since we always expect the leader epoch file to be well-formed, a corrupted leader epoch file will fail the recovery.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao Hmm, that's true. Thanks for pointing out.
Created a ticket for this and assigned me. https://issues.apache.org/jira/browse/KAFKA-16541

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @ocadaruma !

}
}

Utils.atomicMoveWithFallback(tempPath, absolutePath);
Utils.atomicMoveWithFallback(tempPath, absolutePath, sync);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Optional<Map.Entry<Integer, Long>> fromString(String line) {
}

public synchronized void writeEntries(Map<Integer, Long> committedOffsets) throws IOException {
checkpointFile.write(committedOffsets.entrySet());
checkpointFile.write(committedOffsets.entrySet(), true);
}

public synchronized Map<Integer, Long> readEntries() throws IOException {
Expand All @@ -83,4 +83,4 @@ public synchronized Map<Integer, Long> readEntries() throws IOException {

return partitionToOffsets;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public CheckpointFileWithFailureHandler(File file, int version, CheckpointFile.E
checkpointFile = new CheckpointFile<>(file, version, formatter);
}

public void write(Collection<T> entries) {
public void write(Collection<T> entries, boolean sync) {
try {
checkpointFile.write(entries);
checkpointFile.write(entries, sync);
} catch (IOException e) {
String msg = "Error while writing to checkpoint file " + file.getAbsolutePath();
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
private List<EpochEntry> epochs = Collections.emptyList();

public void write(Collection<EpochEntry> epochs) {
public void write(Collection<EpochEntry> epochs, boolean ignored) {
this.epochs = new ArrayList<>(epochs);
}

Expand All @@ -60,4 +60,4 @@ public ByteBuffer readAsByteBuffer() throws IOException {

return ByteBuffer.wrap(stream.toByteArray());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import java.util.List;

public interface LeaderEpochCheckpoint {
// in file-backed checkpoint implementation, the content should be
// synced to the device if `sync` is true
void write(Collection<EpochEntry> epochs, boolean sync);

void write(Collection<EpochEntry> epochs);
default void write(Collection<EpochEntry> epochs) {
write(epochs, true);
}

List<EpochEntry> read();
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh
}

public void write(Collection<EpochEntry> epochs) {
checkpoint.write(epochs);
write(epochs, true);
}

public void write(Collection<EpochEntry> epochs, boolean sync) {
checkpoint.write(epochs, sync);
}

public List<EpochEntry> read() {
Expand All @@ -75,4 +79,4 @@ public Optional<EpochEntry> fromString(String line) {
return (strings.length == 2) ? Optional.of(new EpochEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1]))) : Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void assign(int epoch, long startOffset) {
EpochEntry entry = new EpochEntry(epoch, startOffset);
if (assign(entry)) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
flush();
writeToFile(true);
}
}

Expand All @@ -82,7 +82,7 @@ public void assign(List<EpochEntry> entries) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
}
});
if (!entries.isEmpty()) flush();
if (!entries.isEmpty()) writeToFile(true);
}

private boolean isUpdateNeeded(EpochEntry entry) {
Expand Down Expand Up @@ -151,11 +151,6 @@ private List<EpochEntry> removeWhileMatching(Iterator<Map.Entry<Integer, EpochEn
return removedEpochs;
}

public LeaderEpochFileCache cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
flushTo(leaderEpochCheckpoint);
return new LeaderEpochFileCache(this.topicPartition, leaderEpochCheckpoint);
}

public boolean nonEmpty() {
lock.readLock().lock();
try {
Expand Down Expand Up @@ -308,7 +303,14 @@ public void truncateFromEnd(long endOffset) {
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);

flush();
// We intentionally don't force flushing change to the device here because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called by ReplicaFetcher threads, which could block replica fetching
// then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by
// another truncateFromEnd call on log loading procedure so it won't be a problem
writeToFile(false);

log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
Expand All @@ -335,7 +337,14 @@ public void truncateFromStart(long startOffset) {
EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);

flush();
// We intentionally don't force flushing change to the device here because:
// - To avoid fsync latency
// * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
// * This method is called as part of deleteRecords with holding UnifiedLog#lock.
// - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust
// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be recovered by
// another truncateFromStart call on log loading procedure so it won't be a problem
writeToFile(false);

log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
}
Expand Down Expand Up @@ -384,7 +393,7 @@ public void clearAndFlush() {
lock.writeLock().lock();
try {
epochs.clear();
flush();
writeToFile(true);
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -421,16 +430,12 @@ public NavigableMap<Integer, Long> epochWithOffsets() {
}
}

private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
private void writeToFile(boolean sync) {
lock.readLock().lock();
try {
leaderEpochCheckpoint.write(epochs.values());
this.checkpoint.write(epochs.values(), sync);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this ?

} finally {
lock.readLock().unlock();
}
}

private void flush() {
flushTo(this.checkpoint);
}
}
Loading