-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-3968: fsync the parent directory of a segment file when the file is created #10405
Changes from 23 commits
459facb
b4c8284
b260169
ba086e9
2a19e0e
40a1abe
1ac80b6
09cac0b
5be95aa
c9448c8
daeb698
0d4800b
95a6c3f
8c859f3
fdc1faa
6795ec9
55ae3bc
e653af4
85861ee
1578678
fffc353
f66c545
56be9d8
1ecf94b
080a79a
61eee4a
7543938
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ import kafka.server.metadata.ConfigRepository | |
import kafka.server._ | ||
import kafka.utils._ | ||
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} | ||
import org.apache.kafka.common.utils.Time | ||
import org.apache.kafka.common.utils.{Time, Utils} | ||
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException} | ||
|
||
import scala.jdk.CollectionConverters._ | ||
|
@@ -150,6 +150,7 @@ class LogManager(logDirs: Seq[File], | |
val created = dir.mkdirs() | ||
if (!created) | ||
throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}") | ||
Utils.flushParentDir(dir.toPath) | ||
} | ||
if (!dir.isDirectory || !dir.canRead) | ||
throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.") | ||
|
@@ -866,6 +867,7 @@ class LogManager(logDirs: Seq[File], | |
val dir = new File(logDirPath, logDirName) | ||
try { | ||
Files.createDirectories(dir.toPath) | ||
Utils.flushParentDir(dir.toPath) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am wondering if we should flush the parent dir when we delete a log too. This is not strictly required for every delete. So one option is to flush every parent dir when closing the LogManager. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Per our offline discussion, we decided not to flush at deletion. Deletions are async and can be retried after rebooting. |
||
Success(dir) | ||
} catch { | ||
case e: IOException => | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import java.io.{File, IOException} | |
import java.nio.file.{Files, NoSuchFileException} | ||
import java.nio.file.attribute.FileTime | ||
import java.util.concurrent.TimeUnit | ||
import java.util.concurrent.atomic.AtomicBoolean | ||
import kafka.common.LogSegmentOffsetOverflowException | ||
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} | ||
import kafka.server.epoch.LeaderEpochFileCache | ||
|
@@ -59,7 +60,8 @@ class LogSegment private[log] (val log: FileRecords, | |
val baseOffset: Long, | ||
val indexIntervalBytes: Int, | ||
val rollJitterMs: Long, | ||
val time: Time) extends Logging { | ||
val time: Time, | ||
val needsFlushParentDir: Boolean = false) extends Logging { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we add the new param to the javadoc? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
def offsetIndex: OffsetIndex = lazyOffsetIndex.get | ||
|
||
|
@@ -95,6 +97,9 @@ class LogSegment private[log] (val log: FileRecords, | |
/* the number of bytes since we last added an entry in the offset index */ | ||
private var bytesSinceLastIndexEntry = 0 | ||
|
||
/* whether or not we need to flush the parent dir during flush */ | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. during flush => during the first flush? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not really. We changed the value of [inline code missing from page extraction]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Changed it to [inline code missing from page extraction]. |
||
private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir) | ||
|
||
// The timestamp we used for time based log rolling and for ensuring max compaction delay | ||
// volatile for LogCleaner to see the update | ||
@volatile private var rollingBasedTimestamp: Option[Long] = None | ||
|
@@ -472,6 +477,9 @@ class LogSegment private[log] (val log: FileRecords, | |
offsetIndex.flush() | ||
timeIndex.flush() | ||
txnIndex.flush() | ||
// We only need to flush the parent of the log file because all other files share the same parent | ||
if (atomicNeedsFlushParentDir.getAndSet(false)) | ||
log.flushParentDir() | ||
} | ||
} | ||
|
||
|
@@ -490,11 +498,14 @@ class LogSegment private[log] (val log: FileRecords, | |
* Change the suffix for the index and log files for this log segment | ||
* IOException from this method should be handled by the caller | ||
*/ | ||
def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = { | ||
def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = { | ||
log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) | ||
lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix))) | ||
lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix))) | ||
txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix))) | ||
// We only need to flush the parent of the log file because all other files share the same parent | ||
if (needsFlushParentDir) | ||
log.flushParentDir() | ||
} | ||
|
||
/** | ||
|
@@ -657,17 +668,19 @@ class LogSegment private[log] (val log: FileRecords, | |
object LogSegment { | ||
|
||
def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false, | ||
initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = { | ||
initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "", | ||
needsRecovery: Boolean = true): LogSegment = { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems that needsRecovery should default to false? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I changed this but didn't push. Thanks |
||
val maxIndexSize = config.maxIndexSize | ||
new LogSegment( | ||
FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate), | ||
FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate, needsRecovery), | ||
LazyIndex.forOffset(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), | ||
LazyIndex.forTime(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), | ||
new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)), | ||
baseOffset, | ||
indexIntervalBytes = config.indexInterval, | ||
rollJitterMs = config.randomSegmentJitter, | ||
time) | ||
time, | ||
needsRecovery || !fileAlreadyExists) | ||
} | ||
|
||
def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change seems unneeded?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True.