Skip to content

Commit

Permalink
Kay's feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Aug 27, 2014
1 parent 3f01847 commit 49282b3
Showing 1 changed file with 14 additions and 16 deletions.
30 changes: 14 additions & 16 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1049,27 +1049,25 @@ class DAGScheduler(
val mapStage = shuffleToMapStage(shuffleId)
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is possible
// the fetch failure has already been handled by the executor.
// the fetch failure has already been handled by the scheduler.
if (runningStages.contains(failedStage)) {
markStageAsFinished(failedStage, Some("Fetch failure"))
runningStages -= failedStage
// TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " (" + failedStage.name +
") for resubmision due to a fetch failure")

logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
"); marking it for resubmission")
if (failedStages.isEmpty && eventProcessActor != null) {
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled. eventProcessActor may be
// null during unit tests.
import env.actorSystem.dispatcher
env.actorSystem.scheduler.scheduleOnce(
RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
}
failedStages += failedStage
failedStages += mapStage
logInfo(s"Marking $failedStage (${failedStage.name}) for resubmision " +
s"due to a fetch failure from $mapStage (${mapStage.name}")
}

if (failedStages.isEmpty && eventProcessActor != null) {
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled. eventProcessActor may be
// null during unit tests.
import env.actorSystem.dispatcher
env.actorSystem.scheduler.scheduleOnce(
RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
}
failedStages += failedStage
failedStages += mapStage

// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
Expand Down

0 comments on commit 49282b3

Please sign in to comment.