Skip to content

Commit

Permalink
SPARK-1154: Add a periodic task to clean up app directories
Browse files Browse the repository at this point in the history
This adds two config params:
  spark.worker.cleanup_interval
  spark.worker.app_data_ttl
  • Loading branch information
Evan Chan committed Apr 3, 2014
1 parent 47ebea5 commit b92752b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ private[spark] class Worker(
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3

// How often (milliseconds) the worker cleans up old application directories.
// Configured via "spark.worker.cleanup_interval" in seconds; default 30 minutes.
val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup_interval", 60 * 30) * 1000
// TTL (seconds) for app folders/data; after TTL expires it will be cleaned up.
// Configured via "spark.worker.app_data_ttl"; default 15 days.
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.app_data_ttl", 15 * 24 * 3600)

// Index into masterUrls that we're currently trying to register with.
var masterIndex = 0

Expand Down Expand Up @@ -179,12 +184,26 @@ private[spark] class Worker(
registered = true
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
CLEANUP_INTERVAL_MILLIS millis, self, Worker.AppDirCleanup)

case SendHeartbeat =>
masterLock.synchronized {
if (connected) { master ! Heartbeat(workerId) }
}

// Periodic self-message scheduled every CLEANUP_INTERVAL_MILLIS: purge app
// directories under workDir whose files are older than APP_DATA_RETENTION_SECS.
case Worker.AppDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
// NOTE(review): the future closes over workDir; presumably workDir is immutable
// after worker startup — confirm, since the future runs off the actor thread.
val cleanupFuture = concurrent.future {
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
Utils.findOldestFiles(workDir, APP_DATA_RETENTION_SECS)
.foreach(Utils.deleteRecursively(_))
}
// Log (but do not rethrow) any failure from the background cleanup.
// NOTE(review): matching Throwable also traps fatal errors (e.g. OutOfMemoryError);
// consider scala.util.control.NonFatal here.
cleanupFuture onFailure {
case e: Throwable =>
logError("App dir cleanup failed: " + e.getMessage, e)
}

case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
changeMaster(masterUrl, masterWebUiUrl)
Expand Down Expand Up @@ -331,6 +350,7 @@ private[spark] class Worker(
}

private[spark] object Worker {
case object AppDirCleanup // Sent to Worker actor periodically for cleaning up app folders

def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,22 @@ private[spark] object Utils extends Logging {
}
}

/**
 * Finds all the files directly under a directory whose last-modified time is more than
 * `cutoff` seconds in the past. Does not recurse into subdirectories.
 * @param dir must be the path to a directory, or IllegalArgumentException is thrown
 * @param cutoff age threshold in seconds: files with
 *               lastModified < (currentTimeMillis - cutoff * 1000) are returned
 * @throws IllegalArgumentException if `dir` is not a directory
 */
def findOldestFiles(dir: File, cutoff: Long): Seq[File] = {
  if (dir.isDirectory) {
    // File.lastModified is milliseconds since the epoch, so convert the cutoff
    // (seconds) to milliseconds before comparing. The previous code compared
    // milliseconds against a value in seconds, so the filter never matched and
    // no files were ever selected for cleanup.
    val threshold = System.currentTimeMillis - cutoff * 1000
    val files = listFilesSafely(dir)
    files.filter { file =>
      file.lastModified < threshold
    }
  } else {
    throw new IllegalArgumentException(dir + " is not a directory!")
  }
}

/**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
*/
Expand Down

0 comments on commit b92752b

Please sign in to comment.