[SPARK-1860] More conservative app directory cleanup. #2609

Status: Closed. Wants to merge 4 commits.
Changes from 1 commit.
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -42,7 +42,7 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val sparkHome: File,
-    val workDir: File,
+    val executorDir: File,
val workerUrl: String,
val conf: SparkConf,
var state: ExecutorState.Value)
@@ -130,12 +130,6 @@ private[spark] class ExecutorRunner(
*/
def fetchAndRunExecutor() {
try {
-      // Create the executor's working directory
-      val executorDir = new File(workDir, appId + "/" + execId)
-      if (!executorDir.mkdirs()) {
-        throw new IOException("Failed to create directory " + executorDir)
-      }
-
// Launch the process
val command = getCommandSeq
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
@@ -174,7 +168,7 @@ private[spark] class ExecutorRunner(
killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
logError(e.toString, e)
Contributor: I think this is fine as it was -- I was referring to this line:
https://github.com/apache/spark/pull/2609/files#diff-916ca56b663f178f302c265b7ef38499R271

state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
27 changes: 24 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -18,15 +18,18 @@
package org.apache.spark.deploy.worker

import java.io.File
+import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.commons.io.FileUtils

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -202,9 +205,20 @@ private[spark] class Worker(
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
val cleanupFuture = concurrent.future {
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
Contributor: Let's get rid of this unconditional periodic log message.

Contributor: Also, perhaps add a logInfo on line 196 above when CLEANUP_ENABLED to say something like "Worker cleanup is enabled, so old application directories will be deleted." I think this message on startup, plus the message on actual directory deletion, is ideal.
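As a rough illustration of the suggested startup message (a sketch only; the exact placement and wording are assumptions based on the comment above):

```scala
// Sketch: log once at worker startup when cleanup is enabled, rather than
// logging unconditionally on every periodic cleanup tick.
if (CLEANUP_ENABLED) {
  logInfo("Worker cleanup enabled; old application directories will be deleted in: " + workDir)
}
```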

-        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
-          .foreach(Utils.deleteRecursively)
+        val appDirs = workDir.listFiles()
+        if (appDirs == null) {
+          throw new IOException("ERROR: Failed to list files in " + appDirs)
+        }
+        appDirs.filter { dir => {
Contributor: You do not need the extra bracket after the "dir =>". We use the enclosing bracket's scope.
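For illustration, the filter could drop the inner brace and rely on the enclosing one (a sketch of the suggestion, not the code as submitted):

```scala
// Sketch: a single brace after "dir =>"; the closure body is already delimited
// by the enclosing brace, so the extra pair is unnecessary.
appDirs.filter { dir =>
  val appIdFromDir = dir.getName
  val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
  dir.isDirectory && !isAppStillRunning &&
    !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
}.foreach(Utils.deleteRecursively)
```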

+          // the directory is used by an application - check that the application is not running
+          // when cleaning up
+          val appIdFromDir = dir.getName
+          val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+          dir.isDirectory && !isAppStillRunning &&
+            !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+        } }.foreach(Utils.deleteRecursively)
}

cleanupFuture onFailure {
case e: Throwable =>
logError("App dir cleanup failed: " + e.getMessage, e)
@@ -233,8 +247,15 @@ private[spark] class Worker(
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

+          // Create the executor's working directory
+          val executorDir = new File(workDir, appId + "/" + execId)
+          if (!executorDir.mkdirs()) {
+            throw new IOException("Failed to create directory " + executorDir)
Contributor: I just realized that the ExecutorStateChanged here does not give any indication of what happened. Would you mind setting the message to Some(e.toString)? This will include the Exception's class and message, but not the stack trace, which seems reasonable.

Contributor: To be clear, I'm referring to the catch clause of this try.
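A sketch of what the requested change to that catch clause might look like (the surrounding log message and the ExecutorStateChanged argument order are assumptions, not part of this diff):

```scala
// Hypothetical sketch: report the exception's class and message to the master
// so the ExecutorStateChanged event says why the launch failed.
case e: Exception =>
  logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name), e)
  master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(e.toString), None)
```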

+          }

val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
+            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}

+import org.apache.commons.io.FileUtils
import org.apache.spark._
import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -703,17 +704,20 @@ private[spark] object Utils extends Logging {
}

/**
-   * Finds all the files in a directory whose last modified time is older than cutoff seconds.
-   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
-   * @param cutoff measured in seconds. Files older than this are returned.
+   * Determines if a directory contains any files newer than cutoff seconds.
+   *
+   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+   * @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
    */
-  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+  def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long) : Boolean = {
Contributor: nit: style is to not put a space before the colon
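That is, the signature would read (no space before the result-type colon):

```scala
// Sketch of the style fix only; body unchanged.
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
```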

val currentTimeMillis = System.currentTimeMillis
-    if (dir.isDirectory) {
-      val files = listFilesSafely(dir)
-      files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+    if (!dir.isDirectory) {
+      throw new IllegalArgumentException (dir + " is not a directory!")
     } else {
-      throw new IllegalArgumentException(dir + " is not a directory!")
+      val files = FileUtils.listFiles(dir, null, true)
+      val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000))
+      val newFiles = files.filter { file => file.lastModified > cutoffTimeInMillis }
Contributor: nit: could be simplified to files.filter(_.lastModified > cutoffTimeInMillis)

+      (dir.lastModified > cutoffTimeInMillis) || (!newFiles.isEmpty)
Contributor: nit: newFiles.nonEmpty

Contributor: FileUtils.listFilesAndDirs() appears to include the top-level directory as well, so I don't think we need to special-case it.
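A sketch of the simplification being suggested, assuming FileUtils.listFilesAndDirs() is wired up with TrueFileFilter to walk the whole tree (the filter arguments and exact shape are assumptions, not the code in this PR):

```scala
import java.io.File
import scala.collection.JavaConversions._
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.TrueFileFilter

// Sketch only: listFilesAndDirs already returns the top-level directory along with
// everything beneath it, so no separate check of dir.lastModified is needed.
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
  if (!dir.isDirectory) {
    throw new IllegalArgumentException(dir + " is not a directory!")
  }
  val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000)
  FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
    .exists(_.lastModified > cutoffTimeInMillis)
}
```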

}
}

23 changes: 17 additions & 6 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite {
assert(Utils.getIteratorSize(iterator) === 5L)
}

test("findOldFiles") {
test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
val child2: File = Utils.createTempDir(parent.getCanonicalPath)
-    // set the last modified time of child1 to 10 secs old
-    child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+    val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+    // set the last modified time of child1 to 30 secs old
+    child1.setLastModified(System.currentTimeMillis() - (1000 * 30))

-    val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
-    assert(result.size.equals(1))
-    assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+    // although child1 is old, child2 is still new so return true
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    child2.setLastModified(System.currentTimeMillis - (1000 * 30))
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+    // although parent and its immediate children are now old, child3 is still new,
+    // so we expect a full recursive search for new files to return true
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    child3.setLastModified(System.currentTimeMillis - (1000 * 30))
+    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
}

test("resolveURI") {