Track active shuffles by stage (#446)
rynorris authored and Will Manning committed Mar 13, 2019
1 parent 8473d16 commit 3e8b1f4
Showing 3 changed files with 142 additions and 2 deletions.
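In short, the scheduler now keeps, per shuffle, the set of stage IDs that still depend on that shuffle's output; once the set drains, the shuffle is marked inactive so executors held only for its data can be released. A minimal, self-contained sketch of that bookkeeping (the ActiveShuffleTracker name and its two methods are illustrative stand-ins, not the actual Spark types):

import scala.collection.mutable

// Illustrative model of the bookkeeping this commit adds: each shuffle ID
// maps to the set of stage IDs that still need its map output.
class ActiveShuffleTracker {
  private val dependents = mutable.HashMap.empty[Int, mutable.Set[Int]]

  // Record that a newly created stage reads the given shuffle.
  def register(shuffleId: Int, stageId: Int): Unit =
    dependents.getOrElseUpdate(shuffleId, mutable.Set.empty) += stageId

  // Remove a finished stage; returns true when the shuffle has no dependents left.
  def complete(shuffleId: Int, stageId: Int): Boolean =
    dependents.get(shuffleId) match {
      case Some(stages) =>
        stages -= stageId
        val inactive = stages.isEmpty
        if (inactive) dependents.remove(shuffleId)
        inactive
      case None => false // untracked shuffle, nothing to do
    }
}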
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -683,6 +683,7 @@ private[spark] class ExecutorAllocationManager(
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
// At the end of a job, trigger the callbacks for idle executors again to clean up executors
// which we were keeping around only because they held active shuffle blocks.
logDebug("Checking for idle executors at end of job")
allocationManager.checkForIdleExecutors()
}

@@ -733,6 +734,11 @@ private[spark] class ExecutorAllocationManager(
if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
}

// Trigger the callbacks for idle executors again to clean up executors
// which we were keeping around only because they held active shuffle blocks.
logDebug("Checking for idle executors at end of stage")
allocationManager.checkForIdleExecutors()
}
}

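Both hooks re-run the idle-executor check at a natural boundary (job end, stage end) so executors kept alive only for shuffle data that has just gone inactive can be reclaimed. For orientation, a sketch of equivalent wiring against the public SparkListener API, with a cleanup callback standing in for the internal checkForIdleExecutors():

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

// Sketch only: reacts to the same events as the hooks above; `cleanup` is a
// stand-in for allocationManager.checkForIdleExecutors(), which is internal.
class IdleExecutorCleanupListener(cleanup: () => Unit) extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = cleanup()
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = cleanup()
}

In the actual change the allocation manager's own listener calls the manager directly, so no extra listener registration is needed.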
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -25,7 +25,7 @@ import java.util.function.BiFunction

import scala.annotation.tailrec
import scala.collection.Map
-import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
+import scala.collection.mutable.{ArrayStack, HashMap, HashSet, Set}
import scala.concurrent.duration._
import scala.language.existentials
import scala.language.postfixOps
@@ -150,6 +150,13 @@ private[spark] class DAGScheduler(
* the shuffle data will be in the MapOutputTracker).
*/
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]

/**
* Mapping from shuffle dependency ID to the IDs of the stages which depend on the shuffle data.
* Used to determine when shuffle data is no longer active.
*/
private[scheduler] val shuffleIdToDependentStages = new HashMap[Int, Set[Int]]

private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
@@ -397,6 +404,7 @@ private[spark] class DAGScheduler(
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)
updateShuffleDependenciesMap(stage)

if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
mapOutputTracker.markShuffleActive(shuffleDep.shuffleId)
@@ -456,6 +464,7 @@ private[spark] class DAGScheduler(
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
updateShuffleDependenciesMap(stage)
stage
}

@@ -602,6 +611,17 @@ private[spark] class DAGScheduler(
updateJobIdStageIdMapsList(List(stage))
}

/**
* Registers the shuffle dependencies of the given stage.
*/
private def updateShuffleDependenciesMap(stage: Stage): Unit = {
getShuffleDependencies(stage.rdd).foreach { shuffleDep =>
val shuffleId = shuffleDep.shuffleId
logDebug("Tracking that stage " + stage.id + " depends on shuffle " + shuffleId)
shuffleIdToDependentStages.getOrElseUpdate(shuffleId, Set.empty) += stage.id
}
}

/**
* Removes state for job and any stages that are not needed by any other job. Does not
* handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks.
@@ -1905,6 +1925,24 @@ private[spark] class DAGScheduler(
case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
}

getShuffleDependencies(stage.rdd).foreach { shuffleDep =>
val shuffleId = shuffleDep.shuffleId
if (!shuffleIdToDependentStages.contains(shuffleId)) {
logDebug(s"Stage finished with untracked shuffle dependency $shuffleId")
} else {
val dependentStages = shuffleIdToDependentStages(shuffleId)
dependentStages -= stage.id
logDebug(s"Stage ${stage.id} finished. " +
s"Shuffle $shuffleId now has dependencies $dependentStages")
if (dependentStages.isEmpty) {
logDebug(s"Shuffle $shuffleId is no longer needed. Marking it inactive.")
shuffleIdToDependentStages.remove(shuffleId)
mapOutputTracker.markShuffleInactive(shuffleId)
}
}
}

if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.latestInfo.completionTime = Some(clock.getTimeMillis())
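Taken together, updateShuffleDependenciesMap registers a stage against every shuffle it reads when the stage is created, and the completion path above removes the finished stage and marks a shuffle inactive once its last dependent is gone. Using the illustrative tracker sketched at the top of this page, the diamond scenario from the "multiple dependents" test below plays out as:

val tracker = new ActiveShuffleTracker
tracker.register(shuffleId = 0, stageId = 1) // first reader of shuffle 0
tracker.register(shuffleId = 0, stageId = 2) // second reader (diamond)

assert(!tracker.complete(shuffleId = 0, stageId = 1)) // one dependent remains: still active
assert(tracker.complete(shuffleId = 0, stageId = 2))  // last dependent done: mark inactive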
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.rdd.{DeterministicLevel, RDD}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
+import org.apache.spark.util._

class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -2195,6 +2195,102 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
assertDataStructuresEmpty()
}

test("stage level active shuffle tracking") {
// We will have three stages, each depending on the previous one.
// The second stage is composed of 2 RDDs, to check that we track shuffles up the dependency chain.
val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(1))
val shuffleId1 = shuffleDep1.shuffleId
val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(1))
val shuffleId2 = shuffleDep2.shuffleId
val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep2), tracker = mapOutputTracker)
val intermediateDep = new OneToOneDependency(intermediateRdd)
val reduceRdd = new MyRDD(sc, 1, List(intermediateDep), tracker = mapOutputTracker)

// Submit the job.
// Both shuffles should become active.
submit(reduceRdd, Array(0))
assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === true)
assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === true)

// Complete the first stage.
// Both shuffles remain active.
completeShuffleMapStageSuccessfully(0, 0, 2)
assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === true)
assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === true)

// Complete the second stage.
// Shuffle 1 is no longer needed and should become inactive.
completeShuffleMapStageSuccessfully(1, 0, 1)
assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === false)
assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === true)

// Complete the results stage.
// Both shuffles are no longer needed and should become inactive.
completeNextResultStageWithSuccess(2, 0)
assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === false)
assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === false)

// Double check results.
assert(results === Map(0 -> 42))
results.clear()
assertDataStructuresEmpty()
}

test("stage level active shuffle tracking with multiple dependents") {
// We will have a diamond-shaped dependency graph.
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
val intermediateRdd1 = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
val intermediateRdd2 = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
val intermediateDep1 = new ShuffleDependency(intermediateRdd1, new HashPartitioner(1))
val intermediateDep2 = new ShuffleDependency(intermediateRdd2, new HashPartitioner(1))
val reduceRdd =
new MyRDD(sc, 1, List(intermediateDep1, intermediateDep2), tracker = mapOutputTracker)

// Submit the job.
// Shuffle becomes active.
submit(reduceRdd, Array(0))
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === true)

// Complete the shuffle stage.
// Shuffle remains active.
completeShuffleMapStageSuccessfully(0, 0, 2)
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === true)

// Complete first intermediate stage.
// Shuffle is still active.
val stageAttempt = taskSets(1)
checkStageId(1, 0, stageAttempt)
complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
case (task, idx) =>
(Success, makeMapStatus("host" + ('A' + idx).toChar, 1))
}.toSeq)
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === true)

// Complete second intermediate stage.
// Shuffle is no longer active.
val stageAttempt2 = taskSets(2)
checkStageId(2, 0, stageAttempt2)
complete(stageAttempt2, stageAttempt2.tasks.zipWithIndex.map {
case (task, idx) =>
(Success, makeMapStatus("host" + ('A' + idx).toChar, 1))
}.toSeq)
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === false)

// Complete the results stage.
// Shuffle is still inactive.
completeNextResultStageWithSuccess(3, 0)
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === false)

// Double check results.
assert(results === Map(0 -> 42))
results.clear()
assertDataStructuresEmpty()
}

test("map stage submission with fetch failure") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
