-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-1860] More conservative app directory cleanup. #2609
Changes from 2 commits
77a9de0
e0a1f2e
802473e
87b5d03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,15 +18,18 @@ | |
package org.apache.spark.deploy.worker | ||
|
||
import java.io.File | ||
import java.io.IOException | ||
import java.text.SimpleDateFormat | ||
import java.util.Date | ||
|
||
import scala.collection.JavaConversions._ | ||
import scala.collection.mutable.HashMap | ||
import scala.concurrent.duration._ | ||
import scala.language.postfixOps | ||
|
||
import akka.actor._ | ||
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} | ||
import org.apache.commons.io.FileUtils | ||
|
||
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} | ||
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} | ||
|
@@ -202,9 +205,20 @@ private[spark] class Worker( | |
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor | ||
val cleanupFuture = concurrent.future { | ||
logInfo("Cleaning up oldest application directories in " + workDir + " ...") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's get rid of this unconditional periodic log message. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, perhaps add a logInfo on line 196 above when CLEANUP_ENABLED to say like "Worker cleanup is enabled, so old application directories will be deleted." I think this message on startup, plus the message on actual directory deletion, is ideal. |
||
Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS) | ||
.foreach(Utils.deleteRecursively) | ||
val appDirs = workDir.listFiles() | ||
if (appDirs == null) { | ||
throw new IOException("ERROR: Failed to list files in " + appDirs) | ||
} | ||
appDirs.filter { dir => | ||
// the directory is used by an application - check that the application is not running | ||
// when cleaning up | ||
val appIdFromDir = dir.getName | ||
val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir) | ||
dir.isDirectory && !isAppStillRunning && | ||
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS) | ||
}.foreach(Utils.deleteRecursively) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a logInfo here that we're cleaning up a particular directory right before the deleteRecursively. |
||
} | ||
|
||
cleanupFuture onFailure { | ||
case e: Throwable => | ||
logError("App dir cleanup failed: " + e.getMessage, e) | ||
|
@@ -233,8 +247,15 @@ private[spark] class Worker( | |
} else { | ||
try { | ||
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) | ||
|
||
// Create the executor's working directory | ||
val executorDir = new File(workDir, appId + "/" + execId) | ||
if (!executorDir.mkdirs()) { | ||
throw new IOException("Failed to create directory " + executorDir) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just realized that the ExecutorStateChanged here does not give any indication of what happened. Would you mind setting the message to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be clear, I'm referring to the |
||
} | ||
|
||
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, | ||
self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING) | ||
self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING) | ||
executors(appId + "/" + execId) = manager | ||
manager.start() | ||
coresUsed += cores_ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,8 @@ import scala.util.control.{ControlThrowable, NonFatal} | |
|
||
import com.google.common.io.Files | ||
import com.google.common.util.concurrent.ThreadFactoryBuilder | ||
import org.apache.commons.io.FileUtils | ||
import org.apache.commons.io.filefilter.TrueFileFilter | ||
import org.apache.commons.lang3.SystemUtils | ||
import org.apache.hadoop.conf.Configuration | ||
import org.apache.log4j.PropertyConfigurator | ||
|
@@ -703,17 +705,20 @@ private[spark] object Utils extends Logging { | |
} | ||
|
||
/** | ||
* Finds all the files in a directory whose last modified time is older than cutoff seconds. | ||
* @param dir must be the path to a directory, or IllegalArgumentException is thrown | ||
* @param cutoff measured in seconds. Files older than this are returned. | ||
* Determines if a directory contains any files newer than cutoff seconds. | ||
* | ||
* @param dir must be the path to a directory, or IllegalArgumentException is thrown | ||
* @param cutoff measured in seconds. Returns true if there are any files in dir newer than this. | ||
*/ | ||
def findOldFiles(dir: File, cutoff: Long): Seq[File] = { | ||
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long) : Boolean = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: style is to not put a space before the colon |
||
val currentTimeMillis = System.currentTimeMillis | ||
if (dir.isDirectory) { | ||
val files = listFilesSafely(dir) | ||
files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) } | ||
if (!dir.isDirectory) { | ||
throw new IllegalArgumentException (dir + " is not a directory!") | ||
} else { | ||
throw new IllegalArgumentException(dir + " is not a directory!") | ||
val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE) | ||
val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000)) | ||
val newFiles = files.filter { file => file.lastModified > cutoffTimeInMillis } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: could be simplified to |
||
(dir.lastModified > cutoffTimeInMillis) || (!newFiles.isEmpty) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: newFiles.nonEmpty There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FileUtils.listFilesAndDirs() appears to include the top-level directory as well, so I don't think we need to special-case it. |
||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine as it was -- I was referring to this line:
https://github.com/apache/spark/pull/2609/files#diff-916ca56b663f178f302c265b7ef38499R271