[SPARK-23433][CORE] Late zombie task completions update all tasksets
A fetch failure can lead to multiple tasksets being active for a given
stage. While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully. A task completion therefore needs to update every taskset
for the stage, so that each one knows the partition is completed. That
way the final active taskset does not try to submit another task for the
same partition, and each taskset knows when all of its tasks are complete
and when it should be marked as a "zombie".
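
To make the interaction concrete, here is a small self-contained sketch of the intended behavior. It is illustrative only: `Attempt`, its `markPartitionCompleted`, and the broadcast helper are simplified stand-ins, not the real Spark classes.

import scala.collection.mutable

// Stand-in for a TaskSetManager: one attempt of a stage.
final class Attempt(val stageAttemptId: Int, numPartitions: Int, var isZombie: Boolean = false) {
  private val successful = mutable.Set.empty[Int]

  // A completion (from any attempt) lands here; once every partition is
  // accounted for, the attempt marks itself as a zombie.
  def markPartitionCompleted(partition: Int): Unit = {
    successful += partition
    if (successful.size == numPartitions) isZombie = true
  }

  def pendingPartitions: Set[Int] = (0 until numPartitions).toSet.diff(successful.toSet)
}

object LateZombieCompletionSketch extends App {
  val numPartitions = 3

  // Attempt 0 hit a fetch failure and is a zombie, but its tasks are still
  // running; attempt 1 is the active retry for the same stage.
  val zombie = new Attempt(0, numPartitions, isZombie = true)
  val active = new Attempt(1, numPartitions)
  val allAttempts = Seq(zombie, active)

  // The fix: a completion is broadcast to every attempt of the stage,
  // mirroring TaskSchedulerImpl.markPartitionCompletedInAllTaskSets below.
  def markPartitionCompletedInAllTaskSets(partition: Int): Unit =
    allAttempts.foreach(_.markPartitionCompleted(partition))

  // Late completions arrive from the zombie's still-running tasks...
  (0 until numPartitions).foreach(markPartitionCompletedInAllTaskSets)

  // ...so the active attempt has nothing left to resubmit, and it also
  // learns that it is finished.
  assert(active.pendingPartitions.isEmpty)
  assert(active.isZombie)
}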

Added a regression test.

Author: Imran Rashid <[email protected]>

Closes apache#21131 from squito/SPARK-23433.
squito committed May 3, 2018
1 parent 96a5001 commit 94641fe
Showing 3 changed files with 137 additions and 1 deletion.
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
}
}

/**
 * Marks the task as completed in all TaskSetManagers for the given stage.
 *
 * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
 * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
 * do not also submit those same tasks. That also means that a task completion from an earlier
 * attempt can lead to the entire stage getting marked as successful.
 */
private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
  taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
    tsm.markPartitionCompleted(partitionId)
  }
}

}
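
For reference, a minimal model of the registry this method walks. The name `taskSetsByStageIdAndAttempt` mirrors the diff, but `Tsm` and the element types here are assumptions for illustration; the `getOrElse(stageId, Map())` fallback is what makes the broadcast a harmless no-op for a stage whose tasksets have already been removed.

import scala.collection.mutable.HashMap

object StageRegistrySketch extends App {
  final class Tsm {
    var completed = Set.empty[Int]
    def markPartitionCompleted(p: Int): Unit = completed += p
  }

  // Assumed shape of the registry: stageId -> (stageAttemptId -> manager).
  val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Tsm]]

  def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int): Unit = {
    taskSetsByStageIdAndAttempt.getOrElse(stageId, HashMap.empty).values.foreach { tsm =>
      tsm.markPartitionCompleted(partitionId)
    }
  }

  taskSetsByStageIdAndAttempt(0) = HashMap(0 -> new Tsm, 1 -> new Tsm)
  markPartitionCompletedInAllTaskSets(0, 7)   // reaches every attempt of stage 0
  markPartitionCompletedInAllTaskSets(99, 7)  // unknown stage: a no-op, not an error
  assert(taskSetsByStageIdAndAttempt(0).values.forall(_.completed.contains(7)))
}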


core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -73,6 +73,8 @@ private[spark] class TaskSetManager(
val ser = env.closureSerializer.newInstance()

val tasks = taskSet.tasks
private[scheduler] val partitionToIndex = tasks.zipWithIndex
  .map { case (t, idx) => t.partitionId -> idx }.toMap
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)

@@ -153,7 +155,7 @@
private[scheduler] val speculatableTasks = new HashSet[Int]

// Task index, start and finish time for each task attempt (indexed by task ID)
-private val taskInfos = new HashMap[Long, TaskInfo]
+private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]

// Use a MedianHeap to record durations of successful tasks so we know when to launch
// speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -754,6 +756,9 @@ private[spark] class TaskSetManager(
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
  " because task " + index + " has already completed successfully")
}
// There may be multiple tasksets for this stage -- we let all of them know that the partition
// was completed. This may result in some of the tasksets getting completed.
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -764,6 +769,19 @@
maybeFinishTaskSet()
}

private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
  partitionToIndex.get(partitionId).foreach { index =>
    if (!successful(index)) {
      tasksSuccessful += 1
      successful(index) = true
      if (tasksSuccessful == numTasks) {
        isZombie = true
      }
      maybeFinishTaskSet()
    }
  }
}

/**
 * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
 * DAG Scheduler.
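
One subtlety worth noting: the attempt that actually ran the task sees the completion twice, once directly in `handleSuccessfulTask` and once more when the scheduler-wide broadcast loops back to it. The `if (!successful(index))` guard above is what keeps `tasksSuccessful` from being double-counted. A tiny hedged sketch of just that guard (stand-in state, not the real class):

object DoubleMarkingGuardSketch extends App {
  val numTasks = 2
  val successful = Array.fill(numTasks)(false)
  var tasksSuccessful = 0
  var isZombie = false

  // Mirrors the guard in markPartitionCompleted: a repeated notification
  // for the same task index must be a no-op.
  def markCompleted(index: Int): Unit = {
    if (!successful(index)) {
      tasksSuccessful += 1
      successful(index) = true
      if (tasksSuccessful == numTasks) isZombie = true
    }
  }

  markCompleted(0)  // direct success path in this manager
  markCompleted(0)  // the scheduler-wide broadcast revisits the same manager
  assert(tasksSuccessful == 1)  // not double-counted

  markCompleted(1)  // last partition arrives, from whichever attempt
  assert(isZombie)
}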
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -917,4 +917,108 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
taskScheduler.initialize(new FakeSchedulerBackend)
}
}

test("Completions in zombie tasksets update status of non-zombie taskset") {
val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
val valueSer = SparkEnv.get.serializer.newInstance()

def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
val indexInTsm = tsm.partitionToIndex(partition)
val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
}

// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
// two times, so we have three active task sets for one stage. (For this to really happen,
// you'd need the previous stage to also get restarted, and then succeed, in between each
// attempt, but that happens outside what we're mocking here.)
val zombieAttempts = (0 until 2).map { stageAttempt =>
val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
taskScheduler.submitTasks(attempt)
val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
taskScheduler.resourceOffers(offers)
assert(tsm.runningTasks === 10)
// fail attempt
tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
FetchFailed(null, 0, 0, 0, "fetch failed"))
// the attempt is a zombie, but the tasks are still running (this could be true even if
// we actively killed those tasks, as killing is best-effort)
assert(tsm.isZombie)
assert(tsm.runningTasks === 9)
tsm
}

// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
// the stage, but this time with insufficient resources so not all tasks are active.

val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
taskScheduler.submitTasks(finalAttempt)
val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
finalAttempt.tasks(task.index).partitionId
}.toSet
assert(finalTsm.runningTasks === 5)
assert(!finalTsm.isZombie)

// We simulate late completions from our zombie tasksets, corresponding to all the pending
// partitions in our final attempt. This means we're only waiting on the tasks we've already
// launched.
val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
finalAttemptPendingPartitions.foreach { partition =>
completeTaskSuccessfully(zombieAttempts(0), partition)
}

// If there is another resource offer, we shouldn't run anything. Though our final attempt
// used to have pending tasks, now those tasks have been completed by zombie attempts. The
// remaining tasks to compute are already active in the non-zombie attempt.
assert(
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)

val remainingTasks = finalAttemptLaunchedPartitions.toIndexedSeq.sorted

// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
// marked as zombie.
// for each of the remaining tasks, find the tasksets with an active copy of the task, and
// finish the task.
remainingTasks.foreach { partition =>
val tsm = if (partition == 0) {
// we failed this task on both zombie attempts, this one is only present in the latest
// taskset
finalTsm
} else {
// should be active in every taskset. We choose a zombie taskset just to make sure that
// we transition the active taskset correctly even if the final completion comes
// from a zombie.
zombieAttempts(partition % 2)
}
completeTaskSuccessfully(tsm, partition)
}

assert(finalTsm.isZombie)

// no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())

// finally, lets complete all the tasks. We simulate failures in attempt 1, but everything
// else succeeds, to make sure we get the right updates to the blacklist in all cases.
(zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
val stageAttempt = tsm.taskSet.stageAttemptId
tsm.runningTasksSet.foreach { index =>
if (stageAttempt == 1) {
tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
} else {
val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
}
}

// we update the blacklist for the stage attempts with all successful tasks. Even though
// some tasksets had failures, we still consider them all successful from a blacklisting
// perspective, as the failures weren't from a problem w/ the tasks themselves.
verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
}
}
}
