Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Commit

Permalink
[SPARK-21219][CORE] Task retry occurs on same executor due to race co…
Browse files Browse the repository at this point in the history
…ndition with blacklisting

There's a race condition in the current TaskSetManager where a failed task is added for retry (addPendingTask), and can asynchronously be assigned to an executor *prior* to the blacklist state (updateBlacklistForFailedTask), the result is the task might re-execute on the same executor.  This is particularly problematic if the executor is shutting down since the retry task immediately becomes a lost task (ExecutorLostFailure).  Another side effect is that the actual failure reason gets obscured by the retry task which never actually executed.  There are sample logs showing the issue in the https://issues.apache.org/jira/browse/SPARK-21219

The fix is to change the ordering of the addPendingTask and updatingBlackListForFailedTask calls in TaskSetManager.handleFailedTask

Implemented a unit test that verifies the task is black listed before it is added to the pending task.  Ran the unit test without the fix and it fails.  Ran the unit test with the fix and it passes.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Eric Vandenberg <ericvandenbergfb.com>

Closes apache#18427 from ericvandenbergfb/blacklistFix.

## What changes were proposed in this pull request?

This is a backport of the fix to SPARK-21219, already checked in as 96d58f2.

## How was this patch tested?

Ran TaskSetManagerSuite tests locally.

Author: Eric Vandenberg <[email protected]>

Closes apache#18604 from jsoltren/branch-2.2.
  • Loading branch information
Eric Vandenberg authored and MatthewRBruce committed Jul 31, 2018
1 parent 5a6129e commit 512b196
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 11 deletions.
21 changes: 11 additions & 10 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
private[scheduler] var emittedTaskSizeWarning = false

/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
private[spark] def addPendingTask(index: Int) {
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
Expand Down Expand Up @@ -826,15 +826,6 @@ private[spark] class TaskSetManager(

sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)

if (successful(index)) {
logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
s" be re-executed (either because the task failed with a shuffle data fetch failure," +
s" so the previous stage needs to be re-run, or because a different copy of the task" +
s" has already succeeded).")
} else {
addPendingTask(index)
}

if (!isZombie && reason.countTowardsTaskFailures) {
taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
info.host, info.executorId, index))
Expand All @@ -848,6 +839,16 @@ private[spark] class TaskSetManager(
return
}
}

if (successful(index)) {
logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
s" be re-executed (either because the task failed with a shuffle data fetch failure," +
s" so the previous stage needs to be re-run, or because a different copy of the task" +
s" has already succeeded).")
} else {
addPendingTask(index)
}

maybeFinishTaskSet()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.mockito.Matchers.{any, anyInt, anyString}
import org.mockito.Mockito.{mock, never, spy, verify, when}
import org.mockito.Mockito.{mock, never, spy, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

Expand Down Expand Up @@ -1140,6 +1140,48 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
.updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
}

test("update blacklist before adding pending task to avoid race condition") {
// When a task fails, it should apply the blacklist policy prior to
// retrying the task otherwise there's a race condition where run on
// the same executor that it was intended to be black listed from.
val conf = new SparkConf().
set(config.BLACKLIST_ENABLED, true)

// Create a task with two executors.
sc = new SparkContext("local", "test", conf)
val exec = "executor1"
val host = "host1"
val exec2 = "executor2"
val host2 = "host2"
sched = new FakeTaskScheduler(sc, (exec, host), (exec2, host2))
val taskSet = FakeTask.createTaskSet(1)

val clock = new ManualClock
val mockListenerBus = mock(classOf[LiveListenerBus])
val blacklistTracker = new BlacklistTracker(mockListenerBus, conf, None, clock)
val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker))
val taskSetManagerSpy = spy(taskSetManager)

val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)

// Assert the task has been black listed on the executor it was last executed on.
when(taskSetManagerSpy.addPendingTask(anyInt())).thenAnswer(
new Answer[Unit] {
override def answer(invocationOnMock: InvocationOnMock): Unit = {
val task = invocationOnMock.getArgumentAt(0, classOf[Int])
assert(taskSetManager.taskSetBlacklistHelperOpt.get.
isExecutorBlacklistedForTask(exec, task))
}
}
)

// Simulate a fake exception
val e = new ExceptionFailure("a", "b", Array(), "c", None)
taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e)

verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
}

private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
Expand Down

0 comments on commit 512b196

Please sign in to comment.