diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 7c888a07263a8..e0128e35b761a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -1053,17 +1053,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf) val currentTime = clock.getTimeMillis() val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 - val logFiles = driverLogFs.listLocatedStatus(new Path(driverLogDir)) - while (logFiles.hasNext()) { - val f = logFiles.next() + val logFiles = driverLogFs.listStatus(new Path(driverLogDir)) + logFiles.foreach { f => // Do not rely on 'modtime' as it is not updated for all filesystems when files are written to + val logFileStr = f.getPath.toString val deleteFile = try { - val info = listing.read(classOf[LogInfo], f.getPath().toString()) + val info = listing.read(classOf[LogInfo], logFileStr) // Update the lastprocessedtime of file if it's length or modification time has changed if (info.fileSize < f.getLen() || info.lastProcessed < f.getModificationTime()) { - listing.write( - info.copy(lastProcessed = currentTime, fileSize = f.getLen())) + listing.write(info.copy(lastProcessed = currentTime, fileSize = f.getLen)) false } else if (info.lastProcessed > maxTime) { false @@ -1073,13 +1072,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } catch { case e: NoSuchElementException => // For every new driver log file discovered, create a new entry in listing - listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DriverLogs, None, + listing.write(LogInfo(logFileStr, currentTime, LogType.DriverLogs, None, None, f.getLen(), None, None, false)) - false + false } if (deleteFile) { - logInfo(s"Deleting expired driver log for: ${f.getPath().getName()}") - listing.delete(classOf[LogInfo], f.getPath().toString()) + logInfo(s"Deleting expired driver log for: $logFileStr") + listing.delete(classOf[LogInfo], logFileStr) deleteLog(driverLogFs, f.getPath()) } }