KAFKA-3968: fsync the parent directory of a segment file when the file is created #10405
Thanks for the PR. Did we check the performance impact of this change?
@ccding : Thanks for the PR. A couple of comments below.
@@ -427,7 +445,7 @@ public static FileRecords open(File file,
 boolean preallocate) throws IOException {
 FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
 int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-return new FileRecords(file, channel, 0, end, false);
+return new FileRecords(file, channel, 0, end, false, mutable && !fileAlreadyExists);
The condition mutable && !fileAlreadyExists doesn't seem complete. When a broker is restarted, all existing log segments are opened with mutable and fileAlreadyExists. However, segments beyond the recovery point may not have been flushed before. When they are flushed, we also need to flush the parent directory.
Fixed by passing in the hadCleanShutdown flag.
@@ -249,6 +266,7 @@ public void renameTo(File f) throws IOException {
 } finally {
 this.file = f;
 }
+needFlushParentDir.set(true);
Hmm, this seems problematic. For example, when we do log cleaning, the steps are (1) write cleaned data to a new segment with .clean suffix; (2) flush the new segment; (3) rename the .clean file to .swap; (4) rename .swap to .log. There is no additional flush called after renaming. So, this flag won't trigger the flushing of the parent directory.
One way is to add a method that explicitly forces the flushing of the parent directory after renaming and add the call after step 4.
It also seems that we need logic to flush the parent directory of the topic-partition directory. This is needed when a topic partition is added to or deleted from a broker, or when a partition is moved across disks in JBOD. The latter has the following steps: (1) copy the log segments in directory topic-partition on one disk to directory topic-partition-future on another disk; (2) once the copying is done, rename topic-partition-future to topic-partition. After step (2), it seems that we need to flush the parent directory on both the old and the new disk.
Fixed both.
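For illustration, the rename-then-flush idea from the comment above could look roughly like the sketch below. The class name, the `swapIn` method and the `flushDir` helper are hypothetical and are not taken from this PR. The sketch assumes a POSIX-like platform where a directory can be opened for reading and fsynced through a `FileChannel`.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class RenameThenFlushDir {

    // Hypothetical helper: fsync a directory so that entries created or renamed
    // in it survive a crash. Assumes the platform allows opening a directory
    // for reading (true on Linux; not expected to work on Windows).
    static void flushDir(Path dir) throws IOException {
        try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
            ch.force(true);
        }
    }

    // Steps (3) and (4) of the cleaning sequence (.clean -> .swap -> final name),
    // followed by one explicit directory flush, because no later flush() call
    // on the segment would trigger it.
    static Path swapIn(Path cleanFile) throws IOException {
        String name = cleanFile.getFileName().toString();
        if (!name.endsWith(".clean")) {
            throw new IllegalArgumentException("expected a .clean file: " + name);
        }
        Path dir = cleanFile.toAbsolutePath().getParent();
        String base = name.substring(0, name.length() - ".clean".length());
        Path swap = dir.resolve(base + ".swap");
        Path log = dir.resolve(base);
        Files.move(cleanFile, swap, StandardCopyOption.ATOMIC_MOVE);
        Files.move(swap, log, StandardCopyOption.ATOMIC_MOVE);
        flushDir(dir);
        return log;
    }
}
```

The same helper would apply to the JBOD move, called once for the parent directory on each disk after the rename.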
@ccding : Thanks for the updated PR. A few more comments.
@@ -195,6 +199,17 @@ public int append(MemoryRecords records) throws IOException {
 */
 public void flush() throws IOException {
 channel.force(true);
+if (needFlushParentDir.getAndSet(false)) {
Ideally, we want to flush the parent dir first before setting needFlush to false.
Per our offline discussion, we left it unchanged. If the flush throws an IOException, the partition goes offline and has no further chance to call flush again.
+ * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
+ */
+public void flushParentDir() throws IOException {
+needFlushParentDir.set(false);
Ideally, we want to flush the parent dir first before setting needFlush to false.
Same as above. Setting the flag first prevents other threads from calling flush concurrently.
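As a rough illustration of the ordering discussed here, the sketch below clears the flag with `getAndSet(false)` before doing the directory flush, so only one caller performs it. The class is hypothetical, and a counter stands in for the real directory fsync. The trade-off is the one raised in the review: if the flush itself failed, the flag would already be cleared.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class FirstFlushOnly {
    // True while the parent directory still has to be flushed.
    private final AtomicBoolean needFlushParentDir = new AtomicBoolean(true);

    // Stand-in for the real directory fsync: counts how often it would run.
    final AtomicInteger dirFlushes = new AtomicInteger();

    void flush() {
        // A real implementation would call channel.force(true) here first.
        // getAndSet(false) returns the previous value and clears the flag in
        // one atomic step, so at most one caller sees true.
        if (needFlushParentDir.getAndSet(false)) {
            dirFlushes.incrementAndGet();
        }
    }

    // Called after an operation such as a rename that changes the directory entry.
    void markDirty() {
        needFlushParentDir.set(true);
    }
}
```

Repeated calls to `flush()` trigger the directory flush once, and again only after `markDirty()`.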
@@ -427,7 +442,7 @@ public static FileRecords open(File file,
 boolean preallocate) throws IOException {
 FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
 int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-return new FileRecords(file, channel, 0, end, false);
+return new FileRecords(file, channel, 0, end, false, mutable);
This is ok but not the most accurate. We only need to set the flush flag to true if it's mutable and log recovery is needed.
Fixed by passing in hadCleanShutdown.
@@ -490,11 +490,14 @@ class LogSegment private[log] (val log: FileRecords,
 * Change the suffix for the index and log files for this log segment
 * IOException from this method should be handled by the caller
 */
-def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
+def changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = true): Unit = {
Hmm, we need to pass needFlushParentDir to each of log.renameTo and index.renameTo to disable flushing, right?
set the flag to false for rename.
@@ -848,6 +849,7 @@ class LogManager(logDirs: Seq[File],
 val dir = new File(logDirPath, logDirName)
 try {
 Files.createDirectories(dir.toPath)
+Utils.flushParentDir(dir.toPath)
I am wondering if we should flush the parent dir when we delete a log too. This is not strictly required for every delete. So one option is to flush every parent dir when closing the LogManager.
Per our offline discussion, we decided not to flush at deletion. Deletions are async and can be retried after rebooting.
@ccding : Thanks for the updated PR. Just a couple of minor comments.
Also, as Ismael mentioned, it would be useful to run some perf tests (e.g. ProducerPerformance) to see if there is any noticeable performance degradation with the PR.
@@ -320,7 +320,7 @@ class Log(@volatile private var _dir: File,
 initializeLeaderEpochCache()
 initializePartitionMetadata()

-val nextOffset = loadSegments()
+val nextOffset = loadSegments(hadCleanShutdown)
There is no need to pass hadCleanShutdown in since it's already accessible from loadSegments().
fixed.
 }

 public static FileRecords open(File file,
 boolean fileAlreadyExists,
 int initFileSize,
-boolean preallocate) throws IOException {
-return open(file, true, fileAlreadyExists, initFileSize, preallocate);
+boolean preallocate, boolean hadCleanShutdown) throws IOException {
It's probably more intuitive to change hadCleanShutdown to needsRecovery and pass in the negation of the flag. Then, the default value of false makes more sense.
Also, could we put the change in a separate line to match the existing format?
Fixed.
@ccding : Thanks for the updated PR. A few more minor comments below.
@@ -433,7 +440,8 @@ public static FileRecords open(File file,
 public static FileRecords open(File file,
 boolean fileAlreadyExists,
 int initFileSize,
-boolean preallocate) throws IOException {
+boolean preallocate,
+boolean needsRecovery) throws IOException {
This change seems unneeded?
True.
@@ -59,7 +60,8 @@ class LogSegment private[log] (val log: FileRecords,
 val baseOffset: Long,
 val indexIntervalBytes: Int,
 val rollJitterMs: Long,
-val time: Time) extends Logging {
+val time: Time,
+val needsFlushParentDir: Boolean = false) extends Logging {
Could we add the new param to the javadoc?
Done.
@@ -95,6 +97,9 @@ class LogSegment private[log] (val log: FileRecords,
 /* the number of bytes since we last added an entry in the offset index */
 private var bytesSinceLastIndexEntry = 0

+/* whether or not we need to flush the parent dir during flush */
during flush => during the first flush ?
Not really. We change the value of atomicNeedsFlushParentDir after the first flush. The value of needsFlushParentDir passed to the constructor applies to the first flush. Do you have any suggestions on how to word the comments?
Changed it to "during the next flush".
@@ -657,17 +668,19 @@ class LogSegment private[log] (val log: FileRecords,
 object LogSegment {

 def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
-initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
+initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
+needsRecovery: Boolean = true): LogSegment = {
It seems that needsRecovery should default to false?
Yeah, I changed this but didn't push. Thanks
Fixed comments from @junrao. Also addressed the two problems we discussed offline:
Please take a look, @junrao.
Ran The result before applying this PR:
The result after applying this PR:
The performance decreases a little, but the difference is not significant.
@ccding : Thanks for the updated PR and the performance results. The changes look good to me.
Are the jenkins test failures related to this PR?
For the performance results, what's the log segment size you used? It would be useful to try a smaller segment size (e.g. 10MB) to see the impact of this PR.
@junrao Thanks for the review. The previous test was run with default settings in The result before applying this PR:
The result after applying this PR:
We can clearly see some differences. Is this a concern?
Failed tests are unrelated (connect and streams) and passed on my local run.
@ccding : Thanks for the experimental results. It seems there is a 5-10% throughput drop with the new PR for 10MB segments. This may not be a big concern since it's an uncommon setting. It's interesting that the absolute throughput dropped significantly with 10MB segments compared with 1GB segments. Could you redo the tests for 100MB segments? Thanks.
With 100MB segment size: Before this PR:
After this PR:
I think it is the cost of the extra flush. We have one extra flush per segment, which is 1 extra flush per 10,000 records for 10MB segments and 1KB records. With 1GB segments, there is 1 extra flush per 1,000,000 records: 1/100 of the amortized extra cost.
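The arithmetic behind this estimate can be checked with a small sketch. The class and method names are hypothetical, and it assumes decimal units (1KB = 1,000 bytes, 1MB = 1,000,000 bytes) to match the round numbers quoted in the comment.

```java
final class ExtraFlushCost {

    // Number of records that fit in one segment, i.e. how many appends share
    // the single extra directory flush that each new segment incurs.
    static long recordsPerSegment(long segmentBytes, long recordBytes) {
        return segmentBytes / recordBytes;
    }

    public static void main(String[] args) {
        long recordBytes = 1_000L;                                   // 1KB records
        long small = recordsPerSegment(10_000_000L, recordBytes);    // 10MB segments
        long large = recordsPerSegment(1_000_000_000L, recordBytes); // 1GB segments
        System.out.println("records per 10MB segment: " + small);
        System.out.println("records per 1GB segment: " + large);
        // The extra flush is amortized over this many times more records.
        System.out.println("amortization ratio: " + (large / small));
    }
}
```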
@ccding : Thanks for the last experiment. It seems the performance impact there is minimal. So, the PR LGTM.
Would the perf impact of this change be more significant with a larger number of partitions?
…en the file is created (#10680) (reverted #10405).
#10405 has several issues, for example:
It fails to create a topic with 9000 partitions.
It flushes in several unnecessary places.
If multiple segments of the same partition are flushed at roughly the same time, we may end up doing multiple unnecessary flushes: the logic of handling the flush in LogSegments.scala is weird.
Kafka does not call fsync() on the directory when a new log segment is created and flushed to disk.
The problem is that the following sequence of calls doesn't guarantee file durability:
fd = open("log", O_RDWR | O_CREAT); // suppose open creates "log"
write(fd);
fsync(fd);
If the system crashes after fsync() but before the parent directory has been flushed to disk, the log file can disappear.
This PR is to flush the directory when flush() is called for the first time.
A performance test shows that this PR has a minimal performance impact on Kafka clusters.
Reviewers: Jun Rao
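A minimal Java sketch of the durable-create sequence described above might look like this. The class and method names are hypothetical and are not the code from either PR. It assumes a POSIX-like platform where a directory can be opened for reading and fsynced through a `FileChannel`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class DurableCreate {

    // Hypothetical helper: fsync a directory so a newly created entry in it is
    // persisted. Assumes directories can be opened for reading (true on Linux).
    static void flushDir(Path dir) throws IOException {
        try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
            ch.force(true);
        }
    }

    // Create a new file, write the data, fsync the file, then fsync its parent
    // directory. Without the last step the file's contents may be on disk
    // while its directory entry is not.
    static void createDurably(Path file, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // counterpart of fsync(fd)
        }
        flushDir(file.toAbsolutePath().getParent());
    }
}
```

Doing the directory flush only once per new file, rather than on every flush, is what keeps the added cost small for large segments.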