diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8a71ddda4cb5e..3314568dd512d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -64,6 +64,11 @@ private[spark] class Worker(
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
 
+  // How often the worker cleans up old app folders (configured in seconds, kept here in millis)
+  val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup_interval", 60 * 30) * 1000
+  // TTL for app folders/data; after the TTL expires the folder is cleaned up
+  val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.app_data_ttl", 15 * 24 * 3600)
+
   // Index into masterUrls that we're currently trying to register with.
   var masterIndex = 0
 
@@ -179,12 +184,26 @@ private[spark] class Worker(
       registered = true
       changeMaster(masterUrl, masterWebUiUrl)
       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+      context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
+        CLEANUP_INTERVAL_MILLIS millis, self, Worker.AppDirCleanup)
 
     case SendHeartbeat =>
       masterLock.synchronized {
         if (connected) { master ! Heartbeat(workerId) }
       }
 
+    case Worker.AppDirCleanup =>
+      // Run the dir cleanup in a future so we don't tie up the worker actor
+      val cleanupFuture = concurrent.future {
+        logInfo("Cleaning up old application directories in " + workDir + " ...")
+        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
+          .foreach(Utils.deleteRecursively)
+      }
+      cleanupFuture onFailure {
+        case e: Throwable =>
+          logError("App dir cleanup failed: " + e.getMessage, e)
+      }
+
     case MasterChanged(masterUrl, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + masterUrl)
       changeMaster(masterUrl, masterWebUiUrl)
@@ -331,6 +350,7 @@ private[spark] class Worker(
 }
 
 private[spark] object Worker {
+  case object AppDirCleanup // Sent to the Worker actor periodically to trigger app-folder cleanup
 
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 62ee704d580c2..3add43cf2cf68 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -536,6 +536,22 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Finds all the files in a directory that were last modified more than `cutoff` seconds ago.
+   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+   * @param cutoff age threshold in seconds; keeps files with lastModified < (currentTimeMillis - cutoff * 1000)
+   */
+  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+    if (dir.isDirectory) {
+      val files = listFilesSafely(dir)
+      files.filter { file =>
+        file.lastModified < (System.currentTimeMillis - cutoff * 1000)
+      }
+    } else {
+      throw new IllegalArgumentException(dir + " is not a directory!")
+    }
+  }
+
   /**
    * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
    */
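
Note: for readers who want to try the age-based filter in isolation, below is a minimal standalone Scala sketch of the logic this patch adds to Utils. It mirrors the patched findOldFiles, but stands in a plain Option(dir.listFiles) for Spark's listFilesSafely, and the main driver with its /tmp path and one-hour cutoff is purely illustrative:

    import java.io.File

    object FindOldFilesSketch {
      // Files last modified more than `cutoff` seconds ago. File.lastModified is
      // in milliseconds, so the cutoff is converted to millis before comparing.
      def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
        require(dir.isDirectory, dir + " is not a directory!")
        val threshold = System.currentTimeMillis - cutoff * 1000
        Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
          .filter(_.lastModified < threshold)
      }

      def main(args: Array[String]) {
        // List entries in /tmp older than one hour (illustrative values only)
        findOldFiles(new File("/tmp"), 3600).foreach(f => println(f.getPath))
      }
    }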
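
Note: both new values are read via conf.getLong, so they can be tuned like any other SparkConf entry on the worker. A sketch with hypothetical values (only the two keys come from the patch; the 10-minute and 7-day figures are illustrative):

    import org.apache.spark.SparkConf

    // Hypothetical tuning: clean every 10 minutes, retain app data for 7 days
    val conf = new SparkConf()
      .set("spark.worker.cleanup_interval", (10 * 60).toString)   // seconds
      .set("spark.worker.app_data_ttl", (7 * 24 * 3600).toString) // seconds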