From 9e04790449bff6978640948027a63e6ae600f4e5 Mon Sep 17 00:00:00 2001
From: Okada Haruki
Date: Thu, 30 Nov 2023 02:43:44 +0900
Subject: [PATCH] KAFKA-15046: Get rid of unnecessary fsyncs inside
 UnifiedLog.lock to stabilize performance (#14242)

Any blocking operation performed while holding UnifiedLog.lock can lead to
serious performance (even availability) issues, yet several code paths
currently call fsync(2) inside the lock. While the lock is held, all
subsequent produces against the partition may block, which easily keeps
every request-handler thread busy whenever disk performance degrades. Even
worse, when a disk experiences a glitch lasting tens of seconds (not rare
with spinning drives), the broker can become unable to process any requests
while still being unfenced from the cluster (i.e. a "zombie"-like state).

This PR gets rid of 4 cases of essentially-unnecessary fsync(2) calls
performed under the lock:

(1) ProducerStateManager.takeSnapshot at UnifiedLog.roll
  - Moved the fsync(2) call to the scheduler thread, as part of the
    existing "flush-log" job (before incrementing the recovery point);
    a sketch of this pattern follows this list.
  - Since the snapshot is still guaranteed to be flushed before the
    recovery point is incremented, this change shouldn't cause any problem.

(2) ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log
    segment deletion
  - This method internally calls Utils.atomicMoveWithFallback with
    needFlushParentDir = true, which calls fsync.
  - Changed it to call Utils.atomicMoveWithFallback with
    needFlushParentDir = false, which is consistent with index file
    deletion (index file deletion doesn't flush the parent dir either).
  - This change shouldn't cause problems either.

(3) LeaderEpochFileCache.truncateFromStart when incrementing the
    log-start-offset
  - This path is called from deleteRecords on request-handler threads.
  - We don't actually need fsync(2) here either.
  - On unclean shutdown, a few stale leader epochs might remain in the
    file, but they will be handled by LogLoader on start-up, so this is
    not a problem.

(4) LeaderEpochFileCache.truncateFromEnd as part of log truncation
  - Likewise, we don't need fsync(2) here, since any epochs left
    untruncated on unclean shutdown will be handled by the log-loading
    procedure.
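As a minimal illustration of the pattern in (1) -- not the actual
UnifiedLog/ProducerStateManager code; all names below are hypothetical and
a single-threaded executor stands in for Kafka's scheduler -- the snapshot
is written without fsync while the lock is held, and the fsync is deferred
to a background thread that flushes the file before advancing the recovery
point:

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical sketch: "write under the lock, fsync off the lock".
    class DeferredFsyncSketch {
        private final Object lock = new Object();
        private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
        private volatile long recoveryPoint = 0L;

        void roll(Path snapshot, byte[] bytes, long newBaseOffset) throws IOException {
            synchronized (lock) {
                // Page-cache write only; no fsync while holding the lock, so
                // concurrent produces are not stalled behind disk latency.
                Files.write(snapshot, bytes, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            }
            scheduler.execute(() -> {
                // Flush the snapshot first, then advance the recovery point,
                // preserving "snapshot is durable before the recovery point moves".
                try (FileChannel ch = FileChannel.open(snapshot, StandardOpenOption.READ)) {
                    ch.force(true);
                    recoveryPoint = newBaseOffset;
                } catch (IOException e) {
                    // Leave the recovery point unchanged; recovery will then
                    // re-validate from the old point after a restart.
                }
            });
        }
    }

If the background flush fails, the recovery point stays put and log
recovery simply redoes the work -- the same invariant that the new test in
UnifiedLogTest below asserts.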
Reviewers: Luke Chen, Divij Vaidya, Justine Olshan, Jun Rao
---
 .../org/apache/kafka/common/utils/Utils.java       | 13 ++++++-
 .../src/main/scala/kafka/log/UnifiedLog.scala      | 18 +++++++--
 .../checkpoints/OffsetCheckpointFile.scala         |  2 +-
 .../log/remote/RemoteLogManagerTest.java           |  2 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala      |  2 +-
 .../scala/unit/kafka/log/UnifiedLogTest.scala      | 24 ++++++++++--
 ...CheckpointFileWithFailureHandlerTest.scala      |  2 +-
 .../epoch/LeaderEpochFileCacheTest.scala           |  2 +-
 .../kafka/server/common/CheckpointFile.java        |  8 ++--
 .../storage/CommittedOffsetsFile.java              |  4 +-
 .../CheckpointFileWithFailureHandler.java          |  4 +-
 .../InMemoryLeaderEpochCheckpoint.java             |  4 +-
 .../checkpoint/LeaderEpochCheckpoint.java          |  7 +++-
 .../checkpoint/LeaderEpochCheckpointFile.java      |  8 +++-
 .../internals/epoch/LeaderEpochFileCache.java      | 37 +++++++++++--------
 .../internals/log/ProducerStateManager.java        | 22 ++++++++---
 .../storage/internals/log/SnapshotFile.java        |  8 ++--
 17 files changed, 118 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 6a0913d3c2da1..6e1c3cefbb1d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1021,6 +1021,17 @@ public static void flushDirIfExists(Path path) throws IOException {
         }
     }
 
+    /**
+     * Flushes dirty file with swallowing {@link NoSuchFileException}
+     */
+    public static void flushFileIfExists(Path path) throws IOException {
+        try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
+            fileChannel.force(true);
+        } catch (NoSuchFileException e) {
+            log.warn("Failed to flush file {}", path, e);
+        }
+    }
+
     /**
      * Closes all the provided closeables.
      * @throws IOException if any of the close methods throws an IOException.
@@ -1543,7 +1554,7 @@ public static <S, T extends S> Iterator<S> covariantCast(Iterator<T> iterator) {
      * Checks if a string is null, empty or whitespace only.
      * @param str a string to be checked
      * @return true if the string is null, empty or whitespace only; otherwise, return false.
-     */
+     */
     public static boolean isBlank(String str) {
         return str == null || str.trim().isEmpty();
     }
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 0691cff51d0bc..8ca58ad20f0ce 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}
 
 import java.io.{File, IOException}
-import java.nio.file.Files
+import java.nio.file.{Files, Path}
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.stream.Collectors
@@ -1656,10 +1656,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
       // we manually override the state offset here prior to taking the snapshot.
       producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-      producerStateManager.takeSnapshot()
+      // We avoid potentially-costly fsync call, since we acquire UnifiedLog#lock here
+      // which could block subsequent produces in the meantime.
+      // flush is done in the scheduler thread along with segment flushing below
+      val maybeSnapshot = producerStateManager.takeSnapshot(false)
       updateHighWatermarkWithLogEndOffset()
       // Schedule an asynchronous flush of the old segment
-      scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
+      scheduler.scheduleOnce("flush-log", () => {
+        maybeSnapshot.ifPresent(f => flushProducerStateSnapshot(f.toPath))
+        flushUptoOffsetExclusive(newSegment.baseOffset)
+      })
 
       newSegment
     }
@@ -1742,6 +1748,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     producerStateManager.mapEndOffset
   }
 
+  private[log] def flushProducerStateSnapshot(snapshot: Path): Unit = {
+    maybeHandleIOException(s"Error while deleting producer state snapshot $snapshot for $topicPartition in dir ${dir.getParent}") {
+      Utils.flushFileIfExists(snapshot)
+    }
+  }
+
   /**
    * Truncate this log so that it ends with the greatest offset < targetOffset.
    *
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 084e46c5ef266..de3283d21fd42 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel) {
   def write(offsets: Map[TopicPartition, Long]): Unit = {
     val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
     offsets.foreach(x => list.add(x))
-    checkpoint.write(list)
+    checkpoint.write(list, true)
   }
 
   def read(): Map[TopicPartition, Long] = {
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 0e2ac1b8f06ee..de1245f55699d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -184,7 +184,7 @@ public class RemoteLogManagerTest {
         List<EpochEntry> epochs = Collections.emptyList();
 
         @Override
-        public void write(Collection<EpochEntry> epochs) {
+        public void write(Collection<EpochEntry> epochs, boolean ignored) {
             this.epochs = new ArrayList<>(epochs);
         }
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index aacab5b624ef5..11fff517b430d 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -404,7 +404,7 @@ class LogSegmentTest {
     val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
       private var epochs = Seq.empty[EpochEntry]
 
-      override def write(epochs: util.Collection[EpochEntry]): Unit = {
+      override def write(epochs: util.Collection[EpochEntry], ignored: Boolean): Unit = {
         this.epochs = epochs.asScala.toSeq
       }
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 9f9acaf33c70b..1a4ffaa57d05e 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -42,8 +42,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.anyLong
-import org.mockito.Mockito.{mock, when}
+import org.mockito.ArgumentMatchers.{any, anyLong}
+import org.mockito.Mockito.{doThrow, mock, spy, when}
 
 import java.io._
 import java.nio.ByteBuffer
@@ -3625,7 +3625,7 @@ class UnifiedLogTest {
       val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
       log.appendAsLeader(records, leaderEpoch = 0)
     }
-
+
     log.updateHighWatermark(90L)
     log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion)
     assertEquals(20, log.logStartOffset)
@@ -3911,6 +3911,24 @@ class UnifiedLogTest {
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
   }
 
+  @Test
+  def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    val log = spy(createLog(logDir, logConfig))
+
+    doThrow(new KafkaStorageException("Injected exception")).when(log).flushProducerStateSnapshot(any())
+
+    log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
+    try {
+      log.roll(Some(1L))
+    } catch {
+      case _: KafkaStorageException => // ignore
+    }
+
+    // check that the recovery point isn't incremented
+    assertEquals(0L, log.recoveryPoint)
+  }
+
   @Test
   def testDeletableSegmentsFilter(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
index ddbf58d884e30..a7e370d7f4091 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
@@ -97,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1,
       OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
-    checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L))
+    checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L), true)
     assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
   }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index a2de160a0ed52..a47c902024a9f 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -38,7 +38,7 @@ class LeaderEpochFileCacheTest {
   val tp = new TopicPartition("TestTopic", 5)
   private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
     private var epochs: Seq[EpochEntry] = Seq()
-    override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq
+    override def write(epochs: java.util.Collection[EpochEntry], ignored: Boolean): Unit = this.epochs = epochs.asScala.toSeq
     override def read(): java.util.List[EpochEntry] = this.epochs.asJava
   }
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
index 6efbaa136e0e9..9c115881328a0 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
@@ -72,7 +72,7 @@ public CheckpointFile(File file,
         tempPath = Paths.get(absolutePath + ".tmp");
     }
 
-    public void write(Collection<T> entries) throws IOException {
+    public void write(Collection<T> entries, boolean sync) throws IOException {
         synchronized (lock) {
             // write to temp file and then swap with the existing file
             try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
@@ -80,10 +80,12 @@ public void write(Collection<T> entries) throws IOException {
                 CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
                 checkpointWriteBuffer.write(entries);
                 writer.flush();
-                fileOutputStream.getFD().sync();
+                if (sync) {
+                    fileOutputStream.getFD().sync();
+                }
             }
 
-            Utils.atomicMoveWithFallback(tempPath, absolutePath);
+            Utils.atomicMoveWithFallback(tempPath, absolutePath, sync);
         }
     }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
index 1eddc0b788b16..a08e0f30507bf 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
@@ -68,7 +68,7 @@ public Optional<Map.Entry<Integer, Long>> fromString(String line) {
     }
 
     public synchronized void writeEntries(Map<Integer, Long> committedOffsets) throws IOException {
-        checkpointFile.write(committedOffsets.entrySet());
+        checkpointFile.write(committedOffsets.entrySet(), true);
     }
 
     public synchronized Map<Integer, Long> readEntries() throws IOException {
@@ -83,4 +83,4 @@ public synchronized Map<Integer, Long> readEntries() throws IOException {
 
         return partitionToOffsets;
     }
-}
\ No newline at end of file
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
index f780ced9b0477..35abfb5a984d2 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
@@ -41,9 +41,9 @@ public CheckpointFileWithFailureHandler(File file, int version, CheckpointFile.EntryFormatter<T> formatter,
         checkpointFile = new CheckpointFile<>(file, version, formatter);
     }
 
-    public void write(Collection<T> entries) {
+    public void write(Collection<T> entries, boolean sync) {
         try {
-            checkpointFile.write(entries);
+            checkpointFile.write(entries, sync);
         } catch (IOException e) {
             String msg = "Error while writing to checkpoint file " + file.getAbsolutePath();
             logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
index 3ef30b2502a8b..386712b330fbd 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
@@ -42,7 +42,7 @@ public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
 
     private List<EpochEntry> epochs = Collections.emptyList();
 
-    public void write(Collection<EpochEntry> epochs) {
+    public void write(Collection<EpochEntry> epochs, boolean ignored) {
         this.epochs = new ArrayList<>(epochs);
     }
 
@@ -60,4 +60,4 @@ public ByteBuffer readAsByteBuffer() throws IOException {
 
         return ByteBuffer.wrap(stream.toByteArray());
     }
-}
\ No newline at end of file
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java
index 8cf5519512622..28ffae03df0e1 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java
@@ -22,8 +22,13 @@ import java.util.List;
 
 public interface LeaderEpochCheckpoint {
+    // in file-backed checkpoint implementation, the content should be
+    // synced to the device if `sync` is true
+    void write(Collection<EpochEntry> epochs, boolean sync);
 
-    void write(Collection<EpochEntry> epochs);
+    default void write(Collection<EpochEntry> epochs) {
+        write(epochs, true);
+    }
 
     List<EpochEntry> read();
 }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
index 81527c6377a28..3472182aeea1e 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
@@ -53,7 +53,11 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureChannel) throws IOException {
     }
 
     public void write(Collection<EpochEntry> epochs) {
-        checkpoint.write(epochs);
+        write(epochs, true);
+    }
+
+    public void write(Collection<EpochEntry> epochs, boolean sync) {
+        checkpoint.write(epochs, sync);
     }
 
     public List<EpochEntry> read() {
@@ -75,4 +79,4 @@ public Optional<EpochEntry> fromString(String line) {
             return (strings.length == 2) ? Optional.of(new EpochEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1]))) : Optional.empty();
         }
     }
-}
\ No newline at end of file
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index a2c4ae7e0fcc5..03df6cc0dceb9 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -73,7 +73,7 @@ public void assign(int epoch, long startOffset) {
         EpochEntry entry = new EpochEntry(epoch, startOffset);
         if (assign(entry)) {
             log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
-            flush();
+            writeToFile(true);
         }
     }
 
@@ -83,7 +83,7 @@ public void assign(List<EpochEntry> entries) {
                 log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
             }
         });
-        if (!entries.isEmpty()) flush();
+        if (!entries.isEmpty()) writeToFile(true);
     }
 
     private boolean isUpdateNeeded(EpochEntry entry) {
@@ -152,11 +152,6 @@ private List<EpochEntry> removeWhileMatching(Iterator<Map.Entry<Integer, EpochEntry>> iterator,
         if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
             List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);
 
-            flush();
+            // We intentionally don't force flushing change to the device here because:
+            // - To avoid fsync latency
+            //   * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
+            //   * This method is called by ReplicaFetcher threads, which could block replica fetching
+            //     then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
+            // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by
+            //   another truncateFromEnd call on log loading procedure so it won't be a problem
+            writeToFile(false);
 
             log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
         }
@@ -345,7 +347,14 @@ public void truncateFromStart(long startOffset) {
                 EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
                 epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);
 
-                flush();
+                // We intentionally don't force flushing change to the device here because:
+                // - To avoid fsync latency
+                //   * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
+                //   * This method is called as part of deleteRecords with holding UnifiedLog#lock.
+                //     - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust
+                // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be recovered by
+                //   another truncateFromStart call on log loading procedure so it won't be a problem
+                writeToFile(false);
 
                 log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
             }
@@ -394,7 +403,7 @@ public void clearAndFlush() {
         lock.writeLock().lock();
         try {
             epochs.clear();
-            flush();
+            writeToFile(true);
         } finally {
             lock.writeLock().unlock();
         }
@@ -431,16 +440,12 @@ public NavigableMap<Integer, Long> epochWithOffsets() {
         }
     }
 
-    private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
+    private void writeToFile(boolean sync) {
         lock.readLock().lock();
         try {
-            leaderEpochCheckpoint.write(epochs.values());
+            checkpoint.write(epochs.values(), sync);
         } finally {
             lock.readLock().unlock();
         }
     }
-
-    private void flush() {
-        flushTo(this.checkpoint);
-    }
 }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index d3e48ef7057b1..6bcafd2d60763 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -462,14 +462,21 @@ public Optional<ProducerStateEntry> lastEntry(long producerId) {
     }
 
     /**
-     * Take a snapshot at the current end offset if one does not already exist.
+     * Take a snapshot at the current end offset if one does not already exist with syncing the change to the device
      */
     public void takeSnapshot() throws IOException {
+        takeSnapshot(true);
+    }
+
+    /**
+     * Take a snapshot at the current end offset if one does not already exist, then return the snapshot file if taken.
+     */
+    public Optional<File> takeSnapshot(boolean sync) throws IOException {
         // If not a new offset, then it is not worth taking another snapshot
         if (lastMapOffset > lastSnapOffset) {
             SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset));
             long start = time.hiResClockMs();
-            writeSnapshot(snapshotFile.file(), producers);
+            writeSnapshot(snapshotFile.file(), producers, sync);
             log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", lastMapOffset,
                     producers.size(), time.hiResClockMs() - start);
 
@@ -477,7 +484,10 @@ public void takeSnapshot() throws IOException {
 
             // Update the last snap offset according to the serialized map
             lastSnapOffset = lastMapOffset;
+
+            return Optional.of(snapshotFile.file());
         }
+        return Optional.empty();
     }
 
     /**
@@ -635,7 +645,7 @@ public Optional<SnapshotFile> removeAndMarkSnapshotForDeletion(long snapshotOffset) {
         // deletion, so ignoring the exception here just means that the intended operation was
         // already completed.
         try {
-            snapshotFile.renameTo(LogFileUtils.DELETED_FILE_SUFFIX);
+            snapshotFile.renameToDelete();
             return Optional.of(snapshotFile);
         } catch (NoSuchFileException ex) {
             log.info("Failed to rename producer state snapshot {} with deletion suffix because it was already deleted", snapshotFile.file().getAbsoluteFile());
@@ -684,7 +694,7 @@ public static List<ProducerStateEntry> readSnapshot(File file) throws IOException {
         }
     }
 
-    private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries) throws IOException {
+    private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries, boolean sync) throws IOException {
         Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA);
         struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION);
         struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries
@@ -716,7 +726,9 @@ private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries,
 
         try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
             fileChannel.write(buffer);
-            fileChannel.force(true);
+            if (sync) {
+                fileChannel.force(true);
+            }
         }
     }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java
index be496ab299878..61fdae79950db 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java
@@ -60,10 +60,10 @@ public File file() {
         return file;
     }
 
-    public void renameTo(String newSuffix) throws IOException {
-        File renamed = new File(Utils.replaceSuffix(file.getPath(), "", newSuffix));
+    public void renameToDelete() throws IOException {
+        File renamed = new File(Utils.replaceSuffix(file.getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX));
         try {
-            Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath());
+            Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath(), false);
         } finally {
             file = renamed;
         }
@@ -76,4 +76,4 @@ public String toString() {
                 ", file=" + file +
                 ')';
     }
-}
\ No newline at end of file
+}
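
Footnote: the sync-optional checkpoint write threaded through the diff above
follows the classic write-to-temp / optional-fsync / atomic-rename shape. A
condensed, hypothetical Java model of that shape (not Kafka's actual
CheckpointFile; the real code also falls back to a non-atomic move where
atomic moves are unsupported):

    import java.io.BufferedWriter;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.util.List;

    // Hypothetical model: temp file + optional fsync + atomic rename.
    class SyncOptionalCheckpoint {
        private final Path target;
        private final Path tmp;

        SyncOptionalCheckpoint(Path target) {
            this.target = target;
            this.tmp = target.resolveSibling(target.getFileName() + ".tmp");
        }

        synchronized void write(List<String> lines, boolean sync) throws IOException {
            try (FileOutputStream out = new FileOutputStream(tmp.toFile());
                 BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
                for (String line : lines) {
                    writer.write(line);
                    writer.newLine();
                }
                writer.flush();
                if (sync) {
                    // Durable path (e.g. leader-epoch assignment): pay the fsync cost.
                    out.getFD().sync();
                }
                // sync == false (e.g. truncateFromStart/End): skip the fsync. A
                // stale checkpoint after unclean shutdown is rebuilt at log load.
            }
            // Atomic rename publishes either the old or the new content, never a torn mix.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        }
    }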