Skip to content

Commit

Permalink
[SPARK-1860] Cleaning up the logs generated when cleaning directories.
Browse files Browse the repository at this point in the history
  • Loading branch information
mccheah committed Oct 2, 2014
1 parent e0a1f2e commit 802473e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ private[spark] class Worker(
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
if (CLEANUP_ENABLED) {
logInfo("Worker cleanup is enabled, so old application directories will be deleted"
+ " in: " + workDir)
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
}
Expand All @@ -204,7 +206,6 @@ private[spark] class Worker(
case WorkDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
val cleanupFuture = concurrent.future {
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
val appDirs = workDir.listFiles()
if (appDirs == null) {
throw new IOException("ERROR: Failed to list files in " + appDirs)
Expand All @@ -216,7 +217,10 @@ private[spark] class Worker(
val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
dir.isDirectory && !isAppStillRunning &&
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
}.foreach(Utils.deleteRecursively)
}.foreach { dir =>
logInfo("Removing directory: %s".format(dir.getPath))
Utils.deleteRecursively(dir)
}
}

cleanupFuture onFailure {
Expand Down Expand Up @@ -263,7 +267,8 @@ private[spark] class Worker(
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
} catch {
case e: Exception => {
logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
logError("Failed to launch executor %s/%d for %s. Caused by exception: %s"
.format(appId, execId, appDesc.name, e.toString))
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -710,15 +710,15 @@ private[spark] object Utils extends Logging {
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
*/
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long) : Boolean = {
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
val currentTimeMillis = System.currentTimeMillis
if (!dir.isDirectory) {
throw new IllegalArgumentException (dir + " is not a directory!")
} else {
val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000))
val newFiles = files.filter { file => file.lastModified > cutoffTimeInMillis }
(dir.lastModified > cutoffTimeInMillis) || (!newFiles.isEmpty)
val newFiles = files.filter { _.lastModified > cutoffTimeInMillis }
newFiles.nonEmpty
}
}

Expand Down

0 comments on commit 802473e

Please sign in to comment.