diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 44744a6d4e596..9a6e1b8bcc579 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1049,27 +1049,25 @@ class DAGScheduler(
         val mapStage = shuffleToMapStage(shuffleId)
         // It is likely that we receive multiple FetchFailed for a single stage (because we have
         // multiple tasks running concurrently on different executors). In that case, it is possible
-        // the fetch failure has already been handled by the executor.
+        // the fetch failure has already been handled by the scheduler.
         if (runningStages.contains(failedStage)) {
           markStageAsFinished(failedStage, Some("Fetch failure"))
           runningStages -= failedStage
           // TODO: Cancel running tasks in the stage
-          logInfo("Marking " + failedStage + " (" + failedStage.name +
-            ") for resubmision due to a fetch failure")
-
-          logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
-            "); marking it for resubmission")
-          if (failedStages.isEmpty && eventProcessActor != null) {
-            // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-            // in that case the event will already have been scheduled. eventProcessActor may be
-            // null during unit tests.
-            import env.actorSystem.dispatcher
-            env.actorSystem.scheduler.scheduleOnce(
-              RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
-          }
-          failedStages += failedStage
-          failedStages += mapStage
+          logInfo(s"Marking $failedStage (${failedStage.name}) for resubmission " +
+            s"due to a fetch failure from $mapStage (${mapStage.name})")
+        }
+
+        if (failedStages.isEmpty && eventProcessActor != null) {
+          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+          // in that case the event will already have been scheduled. eventProcessActor may be
+          // null during unit tests.
+          import env.actorSystem.dispatcher
+          env.actorSystem.scheduler.scheduleOnce(
+            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
         }
+        failedStages += failedStage
+        failedStages += mapStage
         // Mark the map whose fetch failed as broken in the map stage
         if (mapId != -1) {