Skip to content

Commit

Permalink
[SPARK-20358][CORE] Executors failing stage on interrupted exception …
Browse files Browse the repository at this point in the history
…thrown by cancelled tasks

## What changes were proposed in this pull request?

This was a regression introduced by my earlier PR here: apache#17531

It turns out the `NonFatal(_)` extractor does not, in fact, match `InterruptedException` — interrupts are classified as fatal by `scala.util.control.NonFatal`, so they escaped the kill-handling case.

## How was this patch tested?

Extended cancellation unit test coverage. The first test fails before this patch.

cc JoshRosen mridulm

Author: Eric Liang <[email protected]>

Closes apache#17659 from ericl/spark-20358.
  • Loading branch information
ericl authored and Mingjie Tang committed Apr 24, 2017
1 parent 9280019 commit b5b5dda
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ private[spark] class Executor(
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))

case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
case _: InterruptedException | NonFatal(_) if
task != null && task.reasonIfKilled.isDefined =>
val killReason = task.reasonIfKilled.getOrElse("unknown reason")
logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
setTaskFinishedAndClearInterruptStatus()
Expand Down
26 changes: 17 additions & 9 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}

// Launches one task that will run forever. Once the SparkListener detects the task has
// SPARK-20358 regression test: a task killed while blocked in Thread.sleep
// receives an InterruptedException directly. The executor's kill-handling
// `case` must match it (NonFatal alone does not), otherwise the interrupt
// is reported as a task failure and the whole stage is failed.
testCancellingTasks("that raise interrupted exception on cancel") {
Thread.sleep(9999999)
}

// SPARK-20217 should not fail stage if task throws non-interrupted exception
// Here the interrupt delivered by the kill is caught inside the task body and
// re-thrown as a plain RuntimeException. The executor then sees a NonFatal
// error while `task.reasonIfKilled` is set, and must still report the task as
// KILLED rather than FAILED.
testCancellingTasks("that raise runtime exception on cancel") {
try {
Thread.sleep(9999999)
} catch {
case t: Throwable =>
throw new RuntimeException("killed")
}
}

// Launches one task that will block forever. Once the SparkListener detects the task has
// started, kill and re-schedule it. The second run of the task will complete immediately.
// If this test times out, then the first version of the task wasn't killed successfully.
test("Killing tasks") {
def testCancellingTasks(desc: String)(blockFn: => Unit): Unit = test(s"Killing tasks $desc") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

SparkContextSuite.isTaskStarted = false
Expand Down Expand Up @@ -572,13 +586,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
// first attempt will hang
if (!SparkContextSuite.isTaskStarted) {
SparkContextSuite.isTaskStarted = true
try {
Thread.sleep(9999999)
} catch {
case t: Throwable =>
// SPARK-20217 should not fail stage if task throws non-interrupted exception
throw new RuntimeException("killed")
}
blockFn
}
// second attempt succeeds immediately
}
Expand Down

0 comments on commit b5b5dda

Please sign in to comment.