diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 83ce14a0a806a..a7368f9f3dfbe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -86,6 +86,10 @@ private[deploy] object DeployMessages { case class KillDriver(driverId: String) extends DeployMessage + // Worker internal + + case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders + // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8a71ddda4cb5e..bf5a8d09dd2df 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -64,6 +64,12 @@ private[spark] class Worker( val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 + val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true) + // How often worker will clean up old app folders + val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000 + // TTL for app folders/data; after TTL expires it will be cleaned up + val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600) + // Index into masterUrls that we're currently trying to register with. var masterIndex = 0 @@ -179,12 +185,28 @@ private[spark] class Worker( registered = true changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) + if (CLEANUP_ENABLED) { + context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, + CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup) + } case SendHeartbeat => masterLock.synchronized { if (connected) { master ! Heartbeat(workerId) } } + case WorkDirCleanup => + // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor + val cleanupFuture = concurrent.future { + logInfo("Cleaning up oldest application directories in " + workDir + " ...") + Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS) + .foreach(Utils.deleteRecursively) + } + cleanupFuture onFailure { + case e: Throwable => + logError("App dir cleanup failed: " + e.getMessage, e) + } + case MasterChanged(masterUrl, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterUrl) changeMaster(masterUrl, masterWebUiUrl) @@ -331,7 +353,6 @@ private[spark] class Worker( } private[spark] object Worker { - def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d3c39dee330b2..4435b21a7505e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -597,9 +597,24 @@ private[spark] object Utils extends Logging { } if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) { - return false; + return false } else { - return true; + return true + } + } + + /** + * Finds all the files in a directory whose last modified time is older than cutoff seconds. + * @param dir must be the path to a directory, or IllegalArgumentException is thrown + * @param cutoff measured in seconds. Files older than this are returned. + */ + def findOldFiles(dir: File, cutoff: Long): Seq[File] = { + val currentTimeMillis = System.currentTimeMillis + if (dir.isDirectory) { + val files = listFilesSafely(dir) + files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) } + } else { + throw new IllegalArgumentException(dir + " is not a directory!") } } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 616214fb5e3a6..eb7fb6318262b 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.util import scala.util.Random -import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream} +import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream} import java.nio.{ByteBuffer, ByteOrder} import com.google.common.base.Charsets @@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite { val iterator = Iterator.range(0, 5) assert(Utils.getIteratorSize(iterator) === 5L) } + + test("findOldFiles") { + // create some temporary directories and files + val parent: File = Utils.createTempDir() + val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories + val child2: File = Utils.createTempDir(parent.getCanonicalPath) + // set the last modified time of child1 to 10 secs old + child1.setLastModified(System.currentTimeMillis() - (1000 * 10)) + + val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs + assert(result.size.equals(1)) + assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath)) + } } diff --git a/docs/configuration.md b/docs/configuration.md index b6005acac8b93..57bda20edcdf1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -348,6 +348,32 @@ Apart from these, the following properties are also available, and may be useful receives no heartbeats. +