Track active shuffle by stage #446

Merged 5 commits on Nov 27, 2018
ExecutorAllocationManager.scala

@@ -674,6 +674,7 @@ private[spark] class ExecutorAllocationManager(
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
// At the end of a job, trigger the callbacks for idle executors again to clean up executors
// which we were keeping around only because they held active shuffle blocks.
logDebug("Checking for idle executors at end of job")
allocationManager.checkForIdleExecutors()
}

@@ -724,6 +725,11 @@ private[spark] class ExecutorAllocationManager(
if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
}

// Trigger the callbacks for idle executors again to clean up executors
// which we were keeping around only because they held active shuffle blocks.
logDebug("Checking for idle executors at end of stage")
allocationManager.checkForIdleExecutors()
}
}
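
Both of these hooks funnel into allocationManager.checkForIdleExecutors(), whose body is not part of this diff. A minimal sketch of what it might look like, assuming it simply re-runs the existing idle bookkeeping for every registered executor (executorIds and onExecutorIdle are pre-existing members of ExecutorAllocationManager; the details here are assumptions, not the merged implementation):

def checkForIdleExecutors(): Unit = synchronized {
  // Re-evaluate each known executor. onExecutorIdle re-checks whether the
  // executor still holds active shuffle blocks or cached data before
  // scheduling it for removal after the idle timeout.
  executorIds.foreach(onExecutorIdle)
}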

DAGScheduler.scala

@@ -25,7 +25,7 @@ import java.util.function.BiFunction

import scala.annotation.tailrec
import scala.collection.Map
-import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
+import scala.collection.mutable.{ArrayStack, HashMap, HashSet, Set}
import scala.concurrent.duration._
import scala.language.existentials
import scala.language.postfixOps
@@ -149,6 +149,13 @@ private[spark] class DAGScheduler(
* the shuffle data will be in the MapOutputTracker).
*/
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]

/**
* Mapping from shuffle dependency ID to the IDs of the stages that depend on the shuffle data.
* Used to track when shuffle data is no longer active.
*/
private[scheduler] val shuffleIdToDependentStages = new HashMap[Int, Set[Int]]
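
As a hypothetical illustration of this map's lifecycle (assuming stage and shuffle IDs are assigned sequentially from 0, as in the chained test added below): a stage is added to a shuffle's entry when it is registered as a reader of that shuffle and removed when it completes; once an entry empties, the shuffle can be marked inactive.

import scala.collection.mutable.{HashMap, Set}

val shuffleIdToDependentStages = new HashMap[Int, Set[Int]]
// Registration (mirrors updateShuffleDependenciesMap later in this diff):
shuffleIdToDependentStages.getOrElseUpdate(0, Set.empty) += 1 // stage 1 reads shuffle 0
shuffleIdToDependentStages.getOrElseUpdate(1, Set.empty) += 2 // stage 2 reads shuffle 1
// Completion (mirrors the cleanup later in this diff): stage 1 finishes,
// leaving shuffle 0 with no dependents, so it can be marked inactive.
shuffleIdToDependentStages(0) -= 1
assert(shuffleIdToDependentStages(0).isEmpty)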

private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
@@ -396,6 +403,7 @@ private[spark] class DAGScheduler(
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)
updateShuffleDependenciesMap(stage)

if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
mapOutputTracker.markShuffleActive(shuffleDep.shuffleId)
@@ -455,6 +463,7 @@ private[spark] class DAGScheduler(
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
updateShuffleDependenciesMap(stage)
stage
}

@@ -601,6 +610,17 @@ private[spark] class DAGScheduler(
updateJobIdStageIdMapsList(List(stage))
}

/**
* Registers the shuffle dependencies of the given stage.
*/
private def updateShuffleDependenciesMap(stage: Stage): Unit = {
getShuffleDependencies(stage.rdd).foreach { shuffleDep =>
val shuffleId = shuffleDep.shuffleId
logDebug("Tracking that stage " + stage.id + " depends on shuffle " + shuffleId)
shuffleIdToDependentStages.getOrElseUpdate(shuffleId, Set.empty) += stage.id
}
}
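
updateShuffleDependenciesMap relies on the pre-existing getShuffleDependencies helper, which returns only the direct shuffle dependencies of an RDD: it walks narrow dependencies (such as OneToOneDependency) but does not traverse past a shuffle boundary. This is why the chained test added below routes the result stage through an intermediate one-to-one RDD. For reference, the helper looks roughly like this in DAGScheduler (paraphrased from the existing code, not part of this diff):

private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          // Record the dependency but stop at the shuffle boundary.
          parents += shuffleDep
        case dependency =>
          // Narrow dependency: keep walking up the lineage.
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}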

/**
* Removes state for job and any stages that are not needed by any other job. Does not
* handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks.
@@ -1857,6 +1877,24 @@ private[spark] class DAGScheduler(
case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
}

getShuffleDependencies(stage.rdd).foreach { shuffleDep =>
val shuffleId = shuffleDep.shuffleId
if (!shuffleIdToDependentStages.contains(shuffleId)) {
logDebug("Stage finished with untracked shuffle dependency " + shuffleId)
} else {
val dependentStages = shuffleIdToDependentStages(shuffleId)
dependentStages -= stage.id
logDebug("Stage " + stage.id + " finished. " +
"Shuffle " + shuffleId + " now has dependencies " + dependentStages)
if (dependentStages.isEmpty) {
logDebug("Shuffle " + shuffleId + " is no longer needed. Marking it inactive.")
shuffleIdToDependentStages.remove(shuffleId)
mapOutputTracker.markShuffleInactive(shuffleId)
}
}
}

if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.latestInfo.completionTime = Some(clock.getTimeMillis())
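
The markShuffleActive and markShuffleInactive calls above target the MapOutputTracker, and their bodies are not included in this diff. A sketch of what the MapOutputTrackerMaster side might look like, assuming ShuffleStatus carries a mutable isActive flag (the tests below assert against exactly such a flag; everything else here is an assumption):

def markShuffleActive(shuffleId: Int): Unit = {
  // shuffleStatuses maps shuffle IDs to their registered ShuffleStatus.
  shuffleStatuses.get(shuffleId).foreach { status => status.isActive = true }
}

def markShuffleInactive(shuffleId: Int): Unit = {
  shuffleStatuses.get(shuffleId).foreach { status => status.isActive = false }
}

With such a flag in place, the ExecutorAllocationManager can treat executors whose shuffle blocks all belong to inactive shuffles as idle and eligible for removal.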
DAGSchedulerSuite.scala

@@ -36,7 +36,7 @@ import org.apache.spark.rdd.{DeterministicLevel, RDD}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
+import org.apache.spark.util._

class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -2172,6 +2172,102 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
assertDataStructuresEmpty()
}

test("stage level active shuffle tracking") {
// The job has three stages that depend on each other in a chain.
// The second stage is composed of two RDDs, to check that shuffle dependencies are tracked up the chain.
val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(1))
val shuffleId1 = shuffleDep1.shuffleId
val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(1))
val shuffleId2 = shuffleDep2.shuffleId
val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep2), tracker = mapOutputTracker)
val intermediateDep = new OneToOneDependency(intermediateRdd)
val reduceRdd = new MyRDD(sc, 1, List(intermediateDep), tracker = mapOutputTracker)

// Submit the job.
// Both shuffles should become active.
submit(reduceRdd, Array(0))
assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === true)
assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === true)

// Complete the first stage.
// Both shuffles remain active.
completeShuffleMapStageSuccessfully(0, 0, 2)
assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === true)
assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === true)

// Complete the second stage.
// Shuffle 1 is no longer needed and should become inactive.
completeShuffleMapStageSuccessfully(1, 0, 1)
assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === false)
assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === true)

// Complete the results stage.
// Both shuffles are no longer needed and should become inactive.
completeNextResultStageWithSuccess(2, 0)
assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === false)
assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === false)

// Double check results.
assert(results === Map(0 -> 42))
results.clear()
assertDataStructuresEmpty()
}

test("stage level active shuffle tracking with multiple dependents") {
// The job has a diamond-shaped dependency graph.
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
val intermediateRdd1 = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
val intermediateRdd2 = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
val intermediateDep1 = new ShuffleDependency(intermediateRdd1, new HashPartitioner(1))
val intermediateDep2 = new ShuffleDependency(intermediateRdd2, new HashPartitioner(1))
val reduceRdd =
new MyRDD(sc, 1, List(intermediateDep1, intermediateDep2), tracker = mapOutputTracker)

// Submit the job.
// Shuffle becomes active.
submit(reduceRdd, Array(0))
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === true)

// Complete the shuffle stage.
// Shuffle remains active.
completeShuffleMapStageSuccessfully(0, 0, 2)
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === true)

// Complete first intermediate stage.
// Shuffle is still active.
val stageAttempt = taskSets(1)
checkStageId(1, 0, stageAttempt)
complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
case (task, idx) =>
(Success, makeMapStatus("host" + ('A' + idx).toChar, 1))
}.toSeq)
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === true)

// Complete second intermediate stage.
// Shuffle is no longer active.
val stageAttempt2 = taskSets(2)
checkStageId(2, 0, stageAttempt2)
complete(stageAttempt2, stageAttempt2.tasks.zipWithIndex.map {
case (task, idx) =>
(Success, makeMapStatus("host" + ('A' + idx).toChar, 1))
}.toSeq)
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === false)

// Complete the results stage.
// Shuffle is still inactive.
completeNextResultStageWithSuccess(3, 0)
assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === false)

// Double check results.
assert(results === Map(0 -> 42))
results.clear()
assertDataStructuresEmpty()
}

test("map stage submission with fetch failure") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))