KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance #14242
Conversation
Could you take a look? @showuon
@ocadaruma, thanks for the improvement! Some high-level questions:
Thank you.
@showuon Yes. However, precisely, removing fsync on
Hey @ocadaruma, I would like to discuss the changes individually. Let's start with (1). The side effect of moving the producer snapshot flush to an async thread is that, while earlier it was guaranteed that the producer snapshot is present and consistent when the segment (and others) are flushed — if for some reason the producer fsync failed, we would not have scheduled a flush for the segment and friends — now, since we are flushing the snapshot async & quietly, it is possible that we have the segment and indexes on disk but no producer snapshot. This is OK for server restart, because on restart we will rebuild the snapshot by scanning the last few segments. To summarize, Kafka does not expect the producer snapshot on disk to be strongly consistent with the rest of the files such as the log segment and transaction index. @jolshan (as an expert on the txn index and producer snapshot), do you agree with this statement? If we agree on this, then (1) is a safe change IMO.
@divijvaidya Thank you for your review.
While reading your comment, I realized that "quietly" could be a problem, so we might need to change producer-state flushing to throw an IOException in case of failure ("async" still isn't a problem, though). If we ignore a producer-state-flush failure here, the recovery point might be incremented even with a stale on-disk producer state snapshot. So, in case of a restart after power failure, the broker might restore stale producer state without rebuilding it (since the recovery point was incremented), which could cause idempotency issues. I'll update the PR after Justine's comment.
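For context, the ordering constraint being discussed can be sketched as follows. This is only an illustration of the invariant (flush the producer snapshot before advancing the recovery point, and let IOException propagate rather than swallowing it), not the actual UnifiedLog code; the names flushUpTo and recoveryPoint are illustrative.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;

// Illustrative sketch of the ordering constraint, not the actual UnifiedLog code.
class FlushJobSketch {
    private volatile long recoveryPoint = 0L;

    // Runs on the scheduler thread as part of a hypothetical "flush-log" job.
    void flushUpTo(long offsetExclusive, Optional<Path> producerSnapshot) throws IOException {
        // 1. Flush the producer snapshot first; let any IOException propagate so the
        //    recovery point is never advanced past a stale on-disk snapshot.
        if (producerSnapshot.isPresent()) {
            try (FileChannel ch = FileChannel.open(producerSnapshot.get(), StandardOpenOption.READ)) {
                ch.force(true);
            }
        }
        // 2. Flush segments/indexes up to offsetExclusive (omitted here).
        // 3. Only after both flushes succeed, advance the recovery point.
        recoveryPoint = offsetExclusive;
    }
}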
Great point. May I suggest that we document the consistency expectations of the producer snapshot with the segment on disk. From what you mentioned, it sounds like "Kafka expects the producer snapshot to be strongly consistent with the segment data on disk before the recovery checkpoint, but doesn't expect that after the checkpoint. The inconsistency after the checkpoint is acceptable because...". We can verify those expectations with experts such as Justine and Jun. Based on that, we can make a decision on quietly vs. async etc. The documentation will also help future contributors reason about the code base. Initially, you can put the documentation in the description of this PR itself, and later we can find a home for it in the Kafka website docs. We need to do the same exercise for the other files that you are changing in this PR.
@ocadaruma : Thanks for the PR. Left a few comments.
@divijvaidya : Yes, your understanding on (1) is correct.
updateHighWatermarkWithLogEndOffset()
// Schedule an asynchronous flush of the old segment
scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
scheduler.scheduleOnce("flush-log", () => {
  maybeSnapshot.ifPresent(f => Utils.flushFileQuietly(f.toPath, "producer-snapshot"))
If we fail to flush the snapshot, it seems that we should propagate the IOException to logDirFailureChannel, like in flushUptoOffsetExclusive. Otherwise, we could skip recovering the producer state when we should.
Yeah, I also noticed that. I'll fix it.
// then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
// - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by
//   another truncateFromEnd call on log loading procedure so it won't be a problem
flush(false);
It's kind of weird to call flush with sync = false since the only thing that flush does is to sync. Could we just avoid calling flush?
flush(false) still writes the epoch entries to the file (but without fsync).
If we don't call flush here, some entries will remain in the file even after the log truncation.
I'm guessing it wouldn't be a problem realistically, at least in the current implementation (since, on log truncation on the follower, LeaderEpochFileCache#assign will be called on log append, which flushes in-memory epoch entries to the file anyway), but we should still write to the file here IMO.
If we don't call flush here, some entries will remain in the file even after the log truncation.
That's true. But if we write the new content to the file without flushing, it seems that those old entries could still exist in the file?
those old entries could still exist in the file
Yeah; precisely, the content on the device (not the file) could still be old. As long as we read the file in the usual way (i.e. not through O_DIRECT), we can see the latest data.
The staleness on the device arises only when the server experiences a power failure before the OS flushes the page cache.
In that case, the content could indeed be rolled back to the old state.
But it won't be a problem, because the leader-epoch file will be truncated again to match the log file during the loading procedure anyway (this is case (3) in the PR description).
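The page-cache vs. device distinction described above can be demonstrated with a small standalone example; force(true) is the Java-level counterpart of fsync(2). This is just an illustration, not Kafka code.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class WriteVsFsync {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("leader-epoch-sketch", ".checkpoint");
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap("0 42\n".getBytes(StandardCharsets.UTF_8)));
            // Any reader going through the filesystem (page cache) already sees the new
            // content, even though it may not yet be on the physical device.
            System.out.println(Files.readString(file));
            // Only force(true) (fsync) guarantees the bytes survive a power failure.
            ch.force(true);
        }
    }
}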
So it sounds like you agree that there is little value in calling flush without sync. Should we remove the call then?
I took another look at the code and found that flushing to the file (without fsync) is necessary.
The point here is whether there's any code path that reloads the leader-epoch cache from the file.
I found it's possible, so not flushing could be a problem in the scenario below:
- (1) AlterReplicaDir is initiated
- (2) Truncation happens on the futureLog
  - LeaderEpochFileCache.truncateFromEnd is called, but it isn't flushed to the file
- (3) The future log catches up and renameDir is called
  - This reloads the leader-epoch cache from the file, which is stale
  - Then a wrong leader epoch may be returned (e.g. for a list-offsets request)
So we should still flush to the file even without fsync.
Thanks, @ocadaruma. Good point! So we could still write to the file without flushing. The name flush implies that it fsyncs to disk. How about renaming it to something like writeToFile?
That sounds good. I'll fix it like that.
@@ -152,7 +152,7 @@ private List<EpochEntry> removeWhileMatching(Iterator<Map.Entry<Integer, EpochEntry>>
}

public LeaderEpochFileCache cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
    flushTo(leaderEpochCheckpoint);
    flushTo(leaderEpochCheckpoint, true);
cloneWithLeaderEpochCheckpoint seems no longer used. Could we just remove it?
/**
 * Take a snapshot at the current end offset if one does not already exist, then return the snapshot file if taken.
 */
public Optional<File> takeSnapshot(boolean sync) throws IOException {
ProducerStateManager.truncateFullyAndReloadSnapshots removes all snapshot files and then calls loadSnapshots(), which should return empty. I am wondering what happens if we have a pending async snapshot flush and the flush is called after the underlying file is deleted because of ProducerStateManager.truncateFullyAndReloadSnapshots. Will that cause the file to be recreated, or will it get an IOException? The former would be bad since the content won't be correct. For the latter, it would be useful to distinguish it from a real disk IO error to avoid unnecessarily crashing the broker.
Thanks, that's a good point. I overlooked that snapshot files would be cleaned up upon receiving OffsetMovedToRemoteStorage.
In this case, if the async flush is performed against a non-existent file, it would throw an IOException, so we should catch it and ignore it if it's a NoSuchFileException.
(Since file creation is still done in the original thread, it shouldn't conflict with truncateFullyAndReloadSnapshots. Only the fsync is moved to the async thread.)
I'll fix that.
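A minimal sketch of the "flush only if the file still exists" behavior described here, assuming the snapshot may have been deleted by a concurrent cleanup; this is a simplified stand-in, not the exact Utils implementation.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class FsyncHelpers {
    // Fsync the file at `path`, ignoring the case where it was already deleted
    // (e.g. by a concurrent snapshot cleanup). Other IOExceptions still propagate,
    // so real disk errors are not masked.
    static void flushFileIfExists(Path path) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            channel.force(true);
        } catch (NoSuchFileException e) {
            // The file was deleted before the async flush ran; nothing to do.
        }
    }
}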
Thanks -- just catching up with the discussion. Just to clarify, when we say:
In the restart case, we may take a slight performance hit on startup since we may have to scan more segments.
@divijvaidya Hi, thanks for your suggestion. I updated the PR description to include the consistency expectations and an analysis of the validity of the changes.
@ocadaruma : Thanks for the updated PR. A few more comments.
updateHighWatermarkWithLogEndOffset()
// Schedule an asynchronous flush of the old segment
scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
scheduler.scheduleOnce("flush-log", () => {
  maybeSnapshot.ifPresent(f => {
Could we get rid of {?
@@ -63,7 +63,7 @@ public File file() {
public void renameTo(String newSuffix) throws IOException {
    File renamed = new File(Utils.replaceSuffix(file.getPath(), "", newSuffix));
    try {
        Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath());
        Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath(), false);
This works since it's OK to lose a file that is about to be deleted. Perhaps it's better to rename the method to something like renameToDelete so that it's clear that this is not a generic method for arbitrary renaming.
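For reference, the difference between flushing and not flushing the parent directory after an atomic rename can be sketched as below. This is a simplified stand-in for Utils.atomicMoveWithFallback, and opening a directory for fsync this way is Linux-oriented (it does not work on Windows).

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class AtomicMoveSketch {
    // Atomically rename `source` to `target`; optionally fsync the parent directory so the
    // rename itself survives a power failure. Skipping the parent flush is acceptable when
    // losing the rename is harmless, e.g. a file that is about to be deleted anyway.
    static void atomicMove(Path source, Path target, boolean flushParentDir) throws IOException {
        Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        if (flushParentDir) {
            try (FileChannel dir = FileChannel.open(target.getParent(), StandardOpenOption.READ)) {
                dir.force(true); // fsync(2) on the directory so the rename is durable
            }
        }
    }
}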
    Utils.flushFileIfExists(f.toPath)
  }
})
flushUptoOffsetExclusive(newSegment.baseOffset)
Is it possible to add a test to verify that the recovery point is only advanced after the producer state has been flushed to disk?
@ocadaruma : Thanks for the updated PR. Just a couple of minor comments.
@@ -60,10 +60,10 @@ public File file() {
    return file;
}

public void renameTo(String newSuffix) throws IOException {
public void renameToDelete(String newSuffix) throws IOException {
Could we remove newSuffix since it's always DELETED_FILE_SUFFIX?
@ocadaruma : Thanks for the updated PR. Just a minor comment.
lock.readLock().lock();
try {
    leaderEpochCheckpoint.write(epochs.values());
    this.checkpoint.write(epochs.values(), sync);
Do we need this?
@ocadaruma : Thanks for the updated PR. The code LGTM. One of the builds failed. You could trigger a rebuild by closing the PR, waiting for 30 secs and reopening it.
closing once to rebuild
@ocadaruma : Thanks for rerunning the tests. The latest run still has 21 test failures. Are they related to the PR?
@junrao Oh, I misinterpreted the result as all green because I only checked the pipeline view, but I should have checked the tests view. I checked: none of the failures seem related to this change, and they are due to flakiness, since every failed test still succeeded on at least one JDK build.
@ocadaruma : Thanks for looking into the failed tests. If those are unrelated to this PR, it would be useful to file JIRAs for any flaky tests not already tracked. Also, could you resolve the conflict?
@junrao Thank you for the suggestion. I also created tickets for the flaky tests that don't already have a corresponding JIRA ticket.
@ocadaruma : Thanks for triaging the failed tests. LGTM
…stabilize performance (apache#14242)

While any blocking operation under the UnifiedLog.lock could lead to serious performance (even availability) issues, currently there are several paths that call fsync(2) inside the lock. While the lock is held, all subsequent produces against the partition may block. This easily causes all request handlers to become busy on bad disk performance. Even worse, when a disk experiences tens of seconds of glitch (not rare with spinning drives), the broker becomes unable to process any requests while remaining unfenced from the cluster (i.e. a "zombie"-like status).

This PR gets rid of 4 cases of essentially-unnecessary fsync(2) calls performed under the lock:

(1) ProducerStateManager.takeSnapshot at UnifiedLog.roll. The fsync(2) call is moved to the scheduler thread as part of the existing "flush-log" job (before incrementing the recovery point). Since it's still ensured that the snapshot is flushed before incrementing the recovery point, this change shouldn't cause any problem.

(2) ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log segment deletion. This method calls Utils.atomicMoveWithFallback with needFlushParentDir = true internally, which calls fsync. It now calls Utils.atomicMoveWithFallback with needFlushParentDir = false (consistent with index file deletion, which also doesn't flush the parent dir). This change shouldn't cause problems either.

(3) LeaderEpochFileCache.truncateFromStart when incrementing log-start-offset. This path is called from deleteRecords on request-handler threads. We don't actually need fsync(2) here either. On unclean shutdown, a few leader epochs might remain in the file, but that will be handled by LogLoader on start-up, so it's not a problem.

(4) LeaderEpochFileCache.truncateFromEnd as part of log truncation. Likewise, we don't need fsync(2) here, since any epochs that are untruncated on unclean shutdown will be handled by the log loading procedure.

Reviewers: Luke Chen <[email protected]>, Divij Vaidya <[email protected]>, Justine Olshan <[email protected]>, Jun Rao <[email protected]>
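As a rough illustration of the pattern the PR applies (not the actual UnifiedLog code): perform the in-memory update and the file write while holding the lock, and move the fsync(2) to a background scheduler so request handlers never block on disk latency. The class and method names below are made up for the sketch.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative pattern only: write under the lock, fsync on a scheduler thread.
class AsyncFsyncPattern {
    private final ReentrantLock lock = new ReentrantLock();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void rollSegment(Path snapshotFile) {
        lock.lock();
        try {
            // ... write the snapshot / checkpoint content here (cheap, page-cache only) ...
        } finally {
            lock.unlock(); // release before any blocking disk I/O
        }
        // fsync happens outside the lock, so a slow disk cannot stall request handlers.
        scheduler.execute(() -> {
            try (FileChannel ch = FileChannel.open(snapshotFile, StandardOpenOption.READ)) {
                ch.force(true);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}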
@ocadaruma : Added another comment related to this PR.
synchronized (lock) {
    // write to temp file and then swap with the existing file
    try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
         BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
        CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
        checkpointWriteBuffer.write(entries);
        writer.flush();
        fileOutputStream.getFD().sync();
        if (sync) {
            fileOutputStream.getFD().sync();
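Reduced to a standalone sketch, the checkpoint write path shown in the diff above does the following: write to a temp file, flush the buffered writer, fsync only when sync is true, then atomically swap the temp file in. This is a simplification, not the actual CheckpointFile class.

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

final class CheckpointWriteSketch {
    // Write entries to a temp file and atomically swap it in; fsync only when `sync` is true.
    static void write(Path target, List<String> lines, boolean sync) throws IOException {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(temp.toFile());
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
            for (String line : lines) {
                writer.write(line);
                writer.newLine();
            }
            writer.flush();         // push data from the buffer to the OS page cache
            if (sync) {
                out.getFD().sync(); // fsync(2): make the temp file durable before the swap
            }
        }
        // Atomic rename: readers see either the old or the new checkpoint, never a partial one.
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}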
@ocadaruma : I realized a potential issue with this change. The issue is that if sync is false, we don't force a flush to disk. However, the OS could flush partial content of the leader epoch file. If the broker has a hard failure, the leader epoch file could be corrupted. In the recovery path, since we always expect the leader epoch file to be well-formed, a corrupted leader epoch file will fail the recovery.
@junrao Hmm, that's true. Thanks for pointing it out.
I created a ticket for this and assigned it to myself: https://issues.apache.org/jira/browse/KAFKA-16541
Thanks, @ocadaruma !
JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-15046
For segment deletion, the PR changes Utils.atomicMoveWithFallback from needFlushParentDir = true (which calls fsync internally) to needFlushParentDir = false; this is consistent with index file deletion, which also doesn't flush the parent dir. To check that these changes don't cause a problem, the following consistency expectations are helpful:

- Producer snapshot: if the snapshot content before the recovery point is not consistent with the log, it will cause a problem such as an idempotency violation due to the missing producer state.
- Deleted files: even when the broker crashes by power failure before the files are deleted from the actual disk, they should eventually be deleted from the disk.
- Leader epoch cache: a stale cache will return a wrong entry when reading the leader epoch cache (e.g. in list-offsets request handling).

We can confirm the changes are valid based on the above expectations:

- If a snapshot file is renamed to -deleted for segment deletion by log retention, but the broker crashes by power failure before the rename is persisted to the disk, the file may reappear with the -deleted suffix stripped; ProducerStateManager would then load these snapshot files unnecessarily.
- If a snapshot file is renamed to -deleted for topic deletion, but the broker crashes by power failure before the rename is persisted to the disk, the file may likewise reappear with the -deleted suffix stripped. Since the parent log dir is renamed to -delete and that rename is fsynced anyway, the revert of the snapshot file wouldn't be a problem; the parent log dirs will be deleted after the topic deletion procedure resumes.