SPARK-1104: kill Process in workerThread of ExecutorRunner
As reported in https://spark-project.atlassian.net/browse/SPARK-1104

By @pwendell: "Sometimes due to large shuffles executors will take a long time shutting down. In particular this can happen if large numbers of shuffle files are around (this will be alleviated by SPARK-1103, but nonetheless...).

The symptom is you have DEAD workers sitting around in the UI and the existing workers keep trying to re-register but can't because they've been assumed dead."

In this patch, I add lines to the InterruptedException handler in the workerThread of ExecutorRunner, so that process.destroy() and process.waitFor() block only the workerThread instead of blocking the Worker actor...
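As a minimal sketch of this pattern (the ChildRunner class below is illustrative, not Spark's actual ExecutorRunner code): the caller only interrupts the worker thread, and the worker thread itself destroys the child process inside its InterruptedException handler, so any blocking in destroy()/waitFor() is confined to the worker thread.

    // Illustrative sketch, not Spark's actual code: names are made up.
    class ChildRunner(command: Seq[String]) {
      @volatile private var process: Process = null
      private var workerThread: Thread = null

      def start() {
        workerThread = new Thread("child-runner") {
          override def run() {
            try {
              import scala.collection.JavaConverters._
              process = new ProcessBuilder(command.asJava).start()
              // Normal path: block this thread (only) until the child exits.
              val exitCode = process.waitFor()
              println("child exited with code " + exitCode)
            } catch {
              case _: InterruptedException =>
                // Interrupted by kill(): destroy the child here, so the
                // potentially slow destroy()/waitFor() blocks this thread only.
                killProcess()
            }
          }
        }
        workerThread.start()
      }

      private def killProcess() {
        if (process != null) {
          process.destroy()
          process.waitFor()
          process = null
        }
      }

      // Non-blocking for the caller: interrupt the worker thread and return.
      def kill() {
        if (workerThread != null) {
          workerThread.interrupt()
          workerThread = null
        }
      }
    }

With this shape, an actor (or any other caller) that invokes kill() returns immediately; only the dedicated worker thread waits out the child process's shutdown.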

---------

Analysis: process.destroy() is a blocking method, i.e. it returns only after all shutdown-hook threads have returned, so calling it in the Worker thread will make the Worker block for a long while...

For background on what happens to shutdown hooks when the JVM process is killed, see: http://www.tutorialspoint.com/java/lang/runtime_addshutdownhook.htm
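A minimal illustration of the hook mechanics that page describes (the hook body here is made up for the example): a hook registered with addShutdownHook runs on normal exit or SIGTERM, never runs on SIGKILL, and can be deregistered with removeShutdownHook before shutdown begins, which is what kill() does in the patch below.

    object ShutdownHookDemo {
      def main(args: Array[String]) {
        val hook = new Thread() {
          override def run() {
            // Runs on normal exit or SIGTERM, but never on SIGKILL (kill -9).
            println("shutdown hook: cleaning up child process")
          }
        }
        Runtime.getRuntime.addShutdownHook(hook)

        // If cleanup has already been done elsewhere, deregister the hook
        // so it cannot fire a second time at JVM exit.
        Runtime.getRuntime.removeShutdownHook(hook)
      }
    }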

Author: CodingCat <[email protected]>

Closes #35 from CodingCat/SPARK-1104 and squashes the following commits:

85767da [CodingCat] add null checking and remove unnecessary killProce
3107aeb [CodingCat] address Aaron's comments
eb615ba [CodingCat] kill the process when the error happens
0accf2f [CodingCat] set process to null after killed it
1d511c8 [CodingCat] kill Process in workerThread
CodingCat authored and aarondav committed Apr 24, 2014
1 parent a03ac22 commit f99af85
Showing 1 changed file with 14 additions and 17 deletions.
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala:

@@ -58,30 +58,29 @@ private[spark] class ExecutorRunner(
       override def run() { fetchAndRunExecutor() }
     }
     workerThread.start()
-
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = new Thread() {
       override def run() {
-        if (process != null) {
-          logInfo("Shutdown hook killing child process.")
-          process.destroy()
-          process.waitFor()
-        }
+        killProcess()
       }
     }
     Runtime.getRuntime.addShutdownHook(shutdownHook)
   }
 
+  private def killProcess() {
+    if (process != null) {
+      logInfo("Killing process!")
+      process.destroy()
+      process.waitFor()
+    }
+  }
+
   /** Stop this executor runner, including killing the process it launched */
   def kill() {
     if (workerThread != null) {
+      // the workerThread will kill the child process when interrupted
       workerThread.interrupt()
       workerThread = null
-      if (process != null) {
-        logInfo("Killing process!")
-        process.destroy()
-        process.waitFor()
-      }
       state = ExecutorState.KILLED
       worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
@@ -128,7 +127,6 @@ private[spark] class ExecutorRunner(
       // parent process for the executor command
       env.put("SPARK_LAUNCH_WITH_SCALA", "0")
       process = builder.start()
-
       val header = "Spark Executor Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
 
@@ -148,14 +146,13 @@ private[spark] class ExecutorRunner(
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {
-      case interrupted: InterruptedException =>
+      case interrupted: InterruptedException => {
         logInfo("Runner thread for executor " + fullId + " interrupted")
-
+        killProcess()
+      }
       case e: Exception => {
         logError("Error running executor", e)
-        if (process != null) {
-          process.destroy()
-        }
+        killProcess()
         state = ExecutorState.FAILED
         val message = e.getClass + ": " + e.getMessage
         worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
