From b5b5dda79eeffad55ebaa98704ad73e2c5086f79 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 20 Apr 2017 09:55:10 -0700 Subject: [PATCH] [SPARK-20358][CORE] Executors failing stage on interrupted exception thrown by cancelled tasks ## What changes were proposed in this pull request? This was a regression introduced by my earlier PR here: https://github.com/apache/spark/pull/17531 It turns out NonFatal() does not in fact catch InterruptedException. ## How was this patch tested? Extended cancellation unit test coverage. The first test fails before this patch. cc JoshRosen mridulm Author: Eric Liang Closes #17659 from ericl/spark-20358. --- .../org/apache/spark/executor/Executor.scala | 3 ++- .../org/apache/spark/SparkContextSuite.scala | 26 ++++++++++++------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 83469c5ff0600..18f04391d64c3 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -432,7 +432,8 @@ private[spark] class Executor( setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) - case NonFatal(_) if task != null && task.reasonIfKilled.isDefined => + case _: InterruptedException | NonFatal(_) if + task != null && task.reasonIfKilled.isDefined => val killReason = task.reasonIfKilled.getOrElse("unknown reason") logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 735f4454e299e..7e26139a2bead 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -540,10 +540,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } } - // Launches one task that will run forever. Once the SparkListener detects the task has + testCancellingTasks("that raise interrupted exception on cancel") { + Thread.sleep(9999999) + } + + // SPARK-20217 should not fail stage if task throws non-interrupted exception + testCancellingTasks("that raise runtime exception on cancel") { + try { + Thread.sleep(9999999) + } catch { + case t: Throwable => + throw new RuntimeException("killed") + } + } + + // Launches one task that will block forever. Once the SparkListener detects the task has // started, kill and re-schedule it. The second run of the task will complete immediately. // If this test times out, then the first version of the task wasn't killed successfully. - test("Killing tasks") { + def testCancellingTasks(desc: String)(blockFn: => Unit): Unit = test(s"Killing tasks $desc") { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) SparkContextSuite.isTaskStarted = false @@ -572,13 +586,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu // first attempt will hang if (!SparkContextSuite.isTaskStarted) { SparkContextSuite.isTaskStarted = true - try { - Thread.sleep(9999999) - } catch { - case t: Throwable => - // SPARK-20217 should not fail stage if task throws non-interrupted exception - throw new RuntimeException("killed") - } + blockFn } // second attempt succeeds immediately }