diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 00a43673e5cd3..260cf71c780a2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -42,7 +42,7 @@ private[spark] class ExecutorRunner( val workerId: String, val host: String, val sparkHome: File, - val workDir: File, + val executorDir: File, val workerUrl: String, val conf: SparkConf, var state: ExecutorState.Value) @@ -130,12 +130,6 @@ private[spark] class ExecutorRunner( */ def fetchAndRunExecutor() { try { - // Create the executor's working directory - val executorDir = new File(workDir, appId + "/" + execId) - if (!executorDir.mkdirs()) { - throw new IOException("Failed to create directory " + executorDir) - } - // Launch the process val command = getCommandSeq logInfo("Launch command: " + command.mkString("\"", "\" \"", "\"")) @@ -174,7 +168,7 @@ private[spark] class ExecutorRunner( killProcess(None) } case e: Exception => { - logError("Error running executor", e) + logError(e.toString, e) state = ExecutorState.FAILED killProcess(Some(e.toString)) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 0c454e4138c96..57dc9b88dc756 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -18,15 +18,18 @@ package org.apache.spark.deploy.worker import java.io.File +import java.io.IOException import java.text.SimpleDateFormat import java.util.Date +import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import scala.concurrent.duration._ import scala.language.postfixOps import akka.actor._ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} +import org.apache.commons.io.FileUtils import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} @@ -202,9 +205,20 @@ private[spark] class Worker( // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor val cleanupFuture = concurrent.future { logInfo("Cleaning up oldest application directories in " + workDir + " ...") - Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS) - .foreach(Utils.deleteRecursively) + val appDirs = workDir.listFiles() + if (appDirs == null) { + throw new IOException("ERROR: Failed to list files in " + appDirs) + } + appDirs.filter { dir => { + // the directory is used by an application - check that the application is not running + // when cleaning up + val appIdFromDir = dir.getName + val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir) + dir.isDirectory && !isAppStillRunning && + !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS) + } }.foreach(Utils.deleteRecursively) } + cleanupFuture onFailure { case e: Throwable => logError("App dir cleanup failed: " + e.getMessage, e) @@ -233,8 +247,15 @@ private[spark] class Worker( } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + + // Create the executor's working directory + val executorDir = new File(workDir, appId + "/" + execId) + if (!executorDir.mkdirs()) { + throw new IOException("Failed to create directory " + executorDir) + } + val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING) + self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index dbe0cfa2b8ff9..5629e4e4b371b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.json4s._ import tachyon.client.{TachyonFile,TachyonFS} +import org.apache.commons.io.FileUtils import org.apache.spark._ import org.apache.spark.executor.ExecutorUncaughtExceptionHandler import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} @@ -703,17 +704,20 @@ private[spark] object Utils extends Logging { } /** - * Finds all the files in a directory whose last modified time is older than cutoff seconds. - * @param dir must be the path to a directory, or IllegalArgumentException is thrown - * @param cutoff measured in seconds. Files older than this are returned. + * Determines if a directory contains any files newer than cutoff seconds. + * + * @param dir must be the path to a directory, or IllegalArgumentException is thrown + * @param cutoff measured in seconds. Returns true if there are any files in dir newer than this. */ - def findOldFiles(dir: File, cutoff: Long): Seq[File] = { + def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long) : Boolean = { val currentTimeMillis = System.currentTimeMillis - if (dir.isDirectory) { - val files = listFilesSafely(dir) - files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) } + if (!dir.isDirectory) { + throw new IllegalArgumentException (dir + " is not a directory!") } else { - throw new IllegalArgumentException(dir + " is not a directory!") + val files = FileUtils.listFiles(dir, null, true) + val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000)) + val newFiles = files.filter { file => file.lastModified > cutoffTimeInMillis } + (dir.lastModified > cutoffTimeInMillis) || (!newFiles.isEmpty) } } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 70d423ba8a04d..e63d9d085e385 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite { assert(Utils.getIteratorSize(iterator) === 5L) } - test("findOldFiles") { + test("doesDirectoryContainFilesNewerThan") { // create some temporary directories and files val parent: File = Utils.createTempDir() val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories val child2: File = Utils.createTempDir(parent.getCanonicalPath) - // set the last modified time of child1 to 10 secs old - child1.setLastModified(System.currentTimeMillis() - (1000 * 10)) + val child3: File = Utils.createTempDir(child1.getCanonicalPath) + // set the last modified time of child1 to 30 secs old + child1.setLastModified(System.currentTimeMillis() - (1000 * 30)) - val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs - assert(result.size.equals(1)) - assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath)) + // although child1 is old, child2 is still new so return true + assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) + + child2.setLastModified(System.currentTimeMillis - (1000 * 30)) + assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) + + parent.setLastModified(System.currentTimeMillis - (1000 * 30)) + // although parent and its immediate children are new, child3 is still old + // we expect a full recursive search for new files. + assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) + + child3.setLastModified(System.currentTimeMillis - (1000 * 30)) + assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5)) } test("resolveURI") {