Skip to content

Commit

Permalink
[SPARK-1860] More conservative app directory cleanup.
Browse files Browse the repository at this point in the history
Before, the app-* directory was cleaned up whenever its timestamp was
older than a given time. However, the timestamp on a directory may be
older than the timestamps of the files the directory contains. This
change only cleans up app-* directories if all of the directory's
contents are old.
  • Loading branch information
mccheah committed Oct 2, 2014
1 parent c5414b6 commit 77a9de0
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val sparkHome: File,
val workDir: File,
val executorDir: File,
val workerUrl: String,
val conf: SparkConf,
var state: ExecutorState.Value)
Expand Down Expand Up @@ -130,12 +130,6 @@ private[spark] class ExecutorRunner(
*/
def fetchAndRunExecutor() {
try {
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}

// Launch the process
val command = getCommandSeq
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
Expand Down Expand Up @@ -174,7 +168,7 @@ private[spark] class ExecutorRunner(
killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
logError(e.toString, e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
Expand Down
27 changes: 24 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package org.apache.spark.deploy.worker

import java.io.File
import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.commons.io.FileUtils

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
Expand Down Expand Up @@ -202,9 +205,20 @@ private[spark] class Worker(
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
val cleanupFuture = concurrent.future {
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
.foreach(Utils.deleteRecursively)
val appDirs = workDir.listFiles()
if (appDirs == null) {
throw new IOException("ERROR: Failed to list files in " + appDirs)
}
appDirs.filter { dir => {
// the directory is used by an application - check that the application is not running
// when cleaning up
val appIdFromDir = dir.getName
val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
dir.isDirectory && !isAppStillRunning &&
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
} }.foreach(Utils.deleteRecursively)
}

cleanupFuture onFailure {
case e: Throwable =>
logError("App dir cleanup failed: " + e.getMessage, e)
Expand Down Expand Up @@ -233,8 +247,15 @@ private[spark] class Worker(
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}

val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
Expand Down
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}

import org.apache.commons.io.FileUtils
import org.apache.spark._
import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
Expand Down Expand Up @@ -703,17 +704,20 @@ private[spark] object Utils extends Logging {
}

/**
* Finds all the files in a directory whose last modified time is older than cutoff seconds.
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Files older than this are returned.
* Determines if a directory contains any files newer than cutoff seconds.
*
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
*/
def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long) : Boolean = {
val currentTimeMillis = System.currentTimeMillis
if (dir.isDirectory) {
val files = listFilesSafely(dir)
files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
if (!dir.isDirectory) {
throw new IllegalArgumentException (dir + " is not a directory!")
} else {
throw new IllegalArgumentException(dir + " is not a directory!")
val files = FileUtils.listFiles(dir, null, true)
val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000))
val newFiles = files.filter { file => file.lastModified > cutoffTimeInMillis }
(dir.lastModified > cutoffTimeInMillis) || (!newFiles.isEmpty)
}
}

Expand Down
23 changes: 17 additions & 6 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite {
assert(Utils.getIteratorSize(iterator) === 5L)
}

test("findOldFiles") {
test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
val child2: File = Utils.createTempDir(parent.getCanonicalPath)
// set the last modified time of child1 to 10 secs old
child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
val child3: File = Utils.createTempDir(child1.getCanonicalPath)
// set the last modified time of child1 to 30 secs old
child1.setLastModified(System.currentTimeMillis() - (1000 * 30))

val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
assert(result.size.equals(1))
assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
// although child1 is old, child2 is still new so return true
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

child2.setLastModified(System.currentTimeMillis - (1000 * 30))
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

parent.setLastModified(System.currentTimeMillis - (1000 * 30))
// although parent and its immediate children are new, child3 is still old
// we expect a full recursive search for new files.
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

child3.setLastModified(System.currentTimeMillis - (1000 * 30))
assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
}

test("resolveURI") {
Expand Down

0 comments on commit 77a9de0

Please sign in to comment.