Skip to content

Commit

Permalink
[SPARK-47130][CORE] Use listStatus to bypass block location info when…
Browse files Browse the repository at this point in the history
… cleaning driver logs

### What changes were proposed in this pull request?

Use `listStatus` instead of `listLocatedStatus` to bypass block location info when cleaning driver logs.

### Why are the changes needed?

Reduce the load on the file system

### Does this PR introduce _any_ user-facing change?

no
### How was this patch tested?

existing tests
### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#45215 from yaooqinn/SPARK-47130.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
  • Loading branch information
yaooqinn committed Feb 23, 2024
1 parent 6ae0abb commit b90514c
Showing 1 changed file with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1053,17 +1053,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf)
val currentTime = clock.getTimeMillis()
val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
val logFiles = driverLogFs.listLocatedStatus(new Path(driverLogDir))
while (logFiles.hasNext()) {
val f = logFiles.next()
val logFiles = driverLogFs.listStatus(new Path(driverLogDir))
logFiles.foreach { f =>
// Do not rely on 'modtime' as it is not updated for all filesystems when files are written to
val logFileStr = f.getPath.toString
val deleteFile =
try {
val info = listing.read(classOf[LogInfo], f.getPath().toString())
val info = listing.read(classOf[LogInfo], logFileStr)
// Update the lastprocessedtime of file if it's length or modification time has changed
if (info.fileSize < f.getLen() || info.lastProcessed < f.getModificationTime()) {
listing.write(
info.copy(lastProcessed = currentTime, fileSize = f.getLen()))
listing.write(info.copy(lastProcessed = currentTime, fileSize = f.getLen))
false
} else if (info.lastProcessed > maxTime) {
false
Expand All @@ -1073,13 +1072,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} catch {
case e: NoSuchElementException =>
// For every new driver log file discovered, create a new entry in listing
listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DriverLogs, None,
listing.write(LogInfo(logFileStr, currentTime, LogType.DriverLogs, None,
None, f.getLen(), None, None, false))
false
false
}
if (deleteFile) {
logInfo(s"Deleting expired driver log for: ${f.getPath().getName()}")
listing.delete(classOf[LogInfo], f.getPath().toString())
logInfo(s"Deleting expired driver log for: $logFileStr")
listing.delete(classOf[LogInfo], logFileStr)
deleteLog(driverLogFs, f.getPath())
}
}
Expand Down

0 comments on commit b90514c

Please sign in to comment.