Shuffle biased task scheduling #447

Merged 6 commits on Nov 28, 2018
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -656,8 +656,8 @@ private[spark] class MapOutputTrackerMaster(
def getExecutorShuffleStatus: scala.collection.Map[String, ExecutorShuffleStatus] = {
shuffleStatuses.values
.flatMap(status => status.executorsWithOutputs().map(_ -> status.isActive))
.groupBy(_._1)
.mapValues(_.exists(_._2))
.groupBy(_._1) // group by executor ID
.mapValues(_.exists(_._2)) // true if any are Active
.mapValues(if (_) ExecutorShuffleStatus.Active else ExecutorShuffleStatus.Inactive)
}
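The chained groupBy/mapValues above folds one boolean per (shuffle, executor) pair into a single status per executor. Below is a minimal standalone sketch of that aggregation, using plain Scala collections and a stand-in enumeration rather than the real MapOutputTrackerMaster state; all names here are illustrative only.

```scala
object ExecutorShuffleStatusSketch {
  // Stand-in for the ExecutorShuffleStatus enumeration referenced in the diff.
  object ExecutorShuffleStatus extends Enumeration {
    val Active, Inactive, Unknown = Value
  }

  def main(args: Array[String]): Unit = {
    // One (executorId, isActive) pair per shuffle that still has outputs on that executor:
    // exec1 only holds outputs of an inactive shuffle, exec2 holds outputs of both kinds.
    val perShuffle = Seq("exec1" -> false, "exec2" -> false, "exec2" -> true)

    val statusByExecutor = perShuffle
      .groupBy(_._1)                          // group by executor ID
      .mapValues(pairs => pairs.exists(_._2)) // true if any shuffle there is still active
      .mapValues(anyActive =>
        if (anyActive) ExecutorShuffleStatus.Active else ExecutorShuffleStatus.Inactive)
      .toMap

    println(statusByExecutor) // Map(exec1 -> Inactive, exec2 -> Active)
  }
}
```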

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -81,6 +81,10 @@ private[spark] class TaskSchedulerImpl(
private val speculationScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

// whether to prefer assigning tasks to executors that contain shuffle files
val shuffleBiasedTaskSchedulingEnabled =
conf.getBoolean("spark.scheduler.shuffleBiasedTaskScheduling.enabled", false)

// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

@@ -377,11 +381,7 @@ private[spark] class TaskSchedulerImpl(
}
}.getOrElse(offers)

val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
var tasks: Seq[Seq[TaskDescription]] = Nil
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -391,11 +391,36 @@
}
}

// If shuffle-biased task scheduling is enabled, then first assign as many tasks as possible to
// executors containing active shuffle files, followed by assigning to executors with inactive
// shuffle files, and then finally to those without shuffle files. This bin packing allows for
// more efficient dynamic allocation in the absence of an external shuffle service.
val partitionedAndShuffledOffers = partitionAndShuffleOffers(filteredOffers)
for (shuffledOffers <- partitionedAndShuffledOffers.map(_._2)) {
tasks ++= doResourceOffers(shuffledOffers, sortedTaskSets)
}

// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
// launched within a configured time.
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}

private def doResourceOffers(
shuffledOffers: IndexedSeq[WorkerOffer],
sortedTaskSets: IndexedSeq[TaskSetManager]): Seq[Seq[TaskDescription]] = {
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
// Skip the barrier taskSet if the available slots are less than the number of pending tasks.
// Skip the barrier taskSet if the available slots are less than the number of pending tasks
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
// Skip the launch process.
// TODO SPARK-24819 If the job requires more slots than available (both busy and free
@@ -439,25 +464,40 @@
.mkString(",")
addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
s"stage ${taskSet.stageId}.")
logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for " +
s"barrier stage ${taskSet.stageId}.")
}
}
}
tasks
}

// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
// launched within a configured time.
if (tasks.size > 0) {
hasLaunchedTask = true
/**
* Shuffle offers around to avoid always placing tasks on the same workers.
* If shuffle-biased task scheduling is enabled, this function partitions the offers based on
* whether they have active/inactive/no shuffle files present.
*/
def partitionAndShuffleOffers(offers: IndexedSeq[WorkerOffer])
: IndexedSeq[(ExecutorShuffleStatus.Value, IndexedSeq[WorkerOffer])] = {
if (shuffleBiasedTaskSchedulingEnabled && offers.length > 1) {
// bias towards executors that have active shuffle outputs
val execShuffles = mapOutputTracker.getExecutorShuffleStatus
offers
.groupBy(offer => execShuffles.getOrElse(offer.executorId, ExecutorShuffleStatus.Unknown))
.mapValues(doShuffleOffers)
.toStream
.sortBy(_._1) // order: Active, Inactive, Unknown
.toIndexedSeq
} else {
IndexedSeq((ExecutorShuffleStatus.Unknown, doShuffleOffers(offers)))
}
return tasks
}
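For readers following the new scheduling path, here is a self-contained sketch of what partitionAndShuffleOffers does, assuming ExecutorShuffleStatus is an Enumeration that declares Active, Inactive, Unknown in that order (so sorting by the enum value yields the Active/Inactive/Unknown priority noted in the comment). The Offer case class is a simplified stand-in for WorkerOffer; nothing below is the real Spark implementation.

```scala
import scala.util.Random

object PartitionAndShuffleSketch {
  object ExecutorShuffleStatus extends Enumeration {
    val Active, Inactive, Unknown = Value
  }
  import ExecutorShuffleStatus._

  // Simplified stand-in for WorkerOffer.
  final case class Offer(executorId: String, cores: Int)

  def partitionAndShuffle(
      offers: IndexedSeq[Offer],
      statusOf: Map[String, ExecutorShuffleStatus.Value])
    : IndexedSeq[(ExecutorShuffleStatus.Value, IndexedSeq[Offer])] = {
    offers
      .groupBy(o => statusOf.getOrElse(o.executorId, Unknown))
      .mapValues(bucket => Random.shuffle(bucket)) // randomize only within each bucket
      .toIndexedSeq
      .sortBy(_._1)                                // Active first, then Inactive, then Unknown
  }

  def main(args: Array[String]): Unit = {
    val offers = IndexedSeq(Offer("exec1", 2), Offer("exec2", 2), Offer("exec3", 2))
    val status = Map("exec1" -> Inactive, "exec2" -> Active)
    // Bucket order is deterministic even though each bucket is shuffled internally:
    // (Active, [exec2]), (Inactive, [exec1]), (Unknown, [exec3])
    partitionAndShuffle(offers, status).foreach(println)
  }
}
```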

/**
* Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
* overriding in tests, so it can be deterministic.
* Does the shuffling for [[partitionAndShuffleOffers()]]. Exposed to allow overriding in tests,
* so that it can be deterministic.
*/
protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
protected def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
Random.shuffle(offers)
}

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -27,8 +27,10 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.ExecutorShuffleStatus._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.ManualClock

class FakeSchedulerBackend extends SchedulerBackend {
@@ -836,7 +838,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// We customize the task scheduler just to let us control the way offers are shuffled, so we
// can be sure we try both permutations, and to control the clock on the tasksetmanager.
val taskScheduler = new TaskSchedulerImpl(sc) {
override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
override def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
// Don't shuffle the offers around for this test. Instead, we'll just pass in all
// the permutations we care about directly.
offers
@@ -873,6 +875,40 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
}

test("Shuffle-biased task scheduling enabled should lead to non-random offer shuffling") {
setupScheduler("spark.scheduler.shuffleBiasedTaskScheduling.enabled" -> "true")

// Make offers in different executors, so they can be a mix of active, inactive, unknown
val offers = IndexedSeq(
WorkerOffer("exec1", "host1", 2), // inactive
WorkerOffer("exec2", "host2", 2), // active
WorkerOffer("exec3", "host3", 2) // unknown
)
val makeMapStatus = (offer: WorkerOffer) =>
MapStatus(BlockManagerId(offer.executorId, offer.host, 1), Array(10))
val mapOutputTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
mapOutputTracker.registerShuffle(0, 2)
mapOutputTracker.registerShuffle(1, 1)
mapOutputTracker.registerMapOutput(0, 0, makeMapStatus(offers(0)))
mapOutputTracker.registerMapOutput(0, 1, makeMapStatus(offers(1)))
mapOutputTracker.registerMapOutput(1, 0, makeMapStatus(offers(1)))
mapOutputTracker.markShuffleInactive(0)

val execStatus = mapOutputTracker.getExecutorShuffleStatus
assert(execStatus.equals(Map("exec1" -> Inactive, "exec2" -> Active)))

assert(taskScheduler.partitionAndShuffleOffers(offers).map(_._1)
.equals(IndexedSeq(Active, Inactive, Unknown)))
assert(taskScheduler.partitionAndShuffleOffers(offers).flatMap(_._2).map(offers.indexOf(_))
.equals(IndexedSeq(1, 0, 2)))

taskScheduler.submitTasks(FakeTask.createTaskSet(3, stageId = 1, stageAttemptId = 0))
// should go to active first, then inactive
val taskDescs = taskScheduler.resourceOffers(offers).flatten
assert(taskDescs.size === 3)
assert(taskDescs.map(_.executorId).equals(Seq("exec2", "exec2", "exec1")))
}

test("With delay scheduling off, tasks can be run at any locality level immediately") {
val conf = new SparkConf()
.set("spark.locality.wait", "0")
@@ -83,6 +83,7 @@ private[spark] trait DynamicAllocationTestsSuite { k8sSuite: KubernetesSuite =>
.addToArgs("--conf", "spark.dynamicAllocation.enabled=true")
.addToArgs("--conf", "spark.dynamicAllocation.minExecutors=0")
.addToArgs("--conf", "spark.dynamicAllocation.maxExecutors=1")
.addToArgs("--conf", "spark.scheduler.shuffleBiasedTaskScheduling.enabled=true")
.addToArgs("--conf",
s"spark.driver.host=" +
s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc")
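The integration test above turns the feature on with --conf; in an ordinary application the same property can be set programmatically. A hypothetical, minimal driver sketch follows (only the property key spark.scheduler.shuffleBiasedTaskScheduling.enabled comes from this PR; the app name and job are made up).

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleBiasedSchedulingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("shuffle-biased-scheduling-demo")
      .set("spark.scheduler.shuffleBiasedTaskScheduling.enabled", "true")
    val sc = new SparkContext(conf)
    try {
      // Any job with a shuffle boundary exercises the new offer-ordering path.
      val counts = sc.parallelize(1 to 1000, 8)
        .map(i => (i % 10, 1))
        .reduceByKey(_ + _)
        .collect()
      println(counts.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

Because the flag defaults to false, existing jobs keep the fully random offer shuffling unless they opt in.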