From 61ed95216db49f9d86ed4a5a05a9d91a43d5c9d1 Mon Sep 17 00:00:00 2001 From: Ryan Norris Date: Tue, 27 Nov 2018 22:14:18 +0800 Subject: [PATCH 1/6] Re-introduce shuffle bias task scheduling code --- .../spark/scheduler/TaskSchedulerImpl.scala | 149 +++++++++++------- .../scala/org/apache/spark/util/Utils.scala | 4 + .../scheduler/TaskSchedulerImplSuite.scala | 38 ++++- .../DynamicAllocationTestsSuite.scala | 1 + 4 files changed, 131 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 4f870e85ad38d..2498a7f12c5ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -81,6 +81,9 @@ private[spark] class TaskSchedulerImpl( private val speculationScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation") + // whether to prefer assigning tasks to executors that contain shuffle files + val SHUFFLE_BIASED_SCHEDULING_ENABLED = Utils.isShuffleBiasedTaskSchedulingEnabled(conf) + // Threshold above which we warn user initial TaskSet may be starved val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s") @@ -377,11 +380,8 @@ private[spark] class TaskSchedulerImpl( } }.getOrElse(offers) - val shuffledOffers = shuffleOffers(filteredOffers) - // Build a list of tasks to assign to each worker. - val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) - val availableCpus = shuffledOffers.map(o => o.cores).toArray - val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum + val partitionedAndShuffledOffers = partitionShuffleOffers(filteredOffers) + var allTasks: Seq[Seq[TaskDescription]] = Nil val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( @@ -391,73 +391,102 @@ private[spark] class TaskSchedulerImpl( } } - // Take each TaskSet in our scheduling order, and then offer it each node in increasing order - // of locality levels so that it gets a chance to launch local tasks on all of them. - // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY - for (taskSet <- sortedTaskSets) { - // Skip the barrier taskSet if the available slots are less than the number of pending tasks. - if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { - // Skip the launch process. - // TODO SPARK-24819 If the job requires more slots than available (both busy and free - // slots), fail the job on submit. - logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + - s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + - s"number of available slots is $availableSlots.") - } else { - var launchedAnyTask = false - // Record all the executor IDs assigned barrier tasks on. 
- val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() - for (currentMaxLocality <- taskSet.myLocalityLevels) { - var launchedTaskAtCurrentMaxLocality = false - do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, - currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality - } while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) - } - if (launchedAnyTask && taskSet.isBarrier) { - // Check whether the barrier tasks are partially launched. - // TODO SPARK-24818 handle the assert failure case (that can happen when some locality - // requirements are not fulfilled, and we should revert the launched tasks). - require(addressesWithDescs.size == taskSet.numTasks, - s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + - s"because only ${addressesWithDescs.size} out of a total number of " + - s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " + - "been blacklisted or cannot fulfill task locality requirements.") - - // materialize the barrier coordinator. - maybeInitBarrierCoordinator() - - // Update the taskInfos into all the barrier task properties. - val addressesStr = addressesWithDescs - // Addresses ordered by partitionId - .sortBy(_._2.partitionId) - .map(_._1) - .mkString(",") - addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr)) - - logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " + - s"stage ${taskSet.stageId}.") + for (shuffledOffers <- partitionedAndShuffledOffers.map(_._2)) { + // Build a list of tasks to assign to each worker. + val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) + val availableCpus = shuffledOffers.map(o => o.cores).toArray + val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum + + // Take each TaskSet in our scheduling order, and then offer it each node in increasing order + // of locality levels so that it gets a chance to launch local tasks on all of them. + // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY + for (taskSet <- sortedTaskSets) { + // Skip the barrier taskSet if the available slots are less than the number of pending tasks + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { + // Skip the launch process. + // TODO SPARK-24819 If the job requires more slots than available (both busy and free + // slots), fail the job on submit. + logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is $availableSlots.") + } else { + var launchedAnyTask = false + // Record all the executor IDs assigned barrier tasks on. 
+ val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() + for (currentMaxLocality <- taskSet.myLocalityLevels) { + var launchedTaskAtCurrentMaxLocality = false + do { + launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, + currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs) + launchedAnyTask |= launchedTaskAtCurrentMaxLocality + } while (launchedTaskAtCurrentMaxLocality) + } + if (!launchedAnyTask) { + taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + } + if (launchedAnyTask && taskSet.isBarrier) { + // Check whether the barrier tasks are partially launched. + // TODO SPARK-24818 handle the assert failure case (that can happen when some locality + // requirements are not fulfilled, and we should revert the launched tasks). + require(addressesWithDescs.size == taskSet.numTasks, + s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because only ${addressesWithDescs.size} out of a total number of " + + s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " + + "been blacklisted or cannot fulfill task locality requirements.") + + // materialize the barrier coordinator. + maybeInitBarrierCoordinator() + + // Update the taskInfos into all the barrier task properties. + val addressesStr = addressesWithDescs + // Addresses ordered by partitionId + .sortBy(_._2.partitionId) + .map(_._1) + .mkString(",") + addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr)) + + logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for " + + s"barrier stage ${taskSet.stageId}.") + } } } + allTasks ++= tasks } // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get // launched within a configured time. - if (tasks.size > 0) { + if (allTasks.size > 0) { hasLaunchedTask = true } - return tasks + return allTasks + } + + /** + * Shuffle offers around to avoid always placing tasks on the same workers. + * If shuffle-biased task scheduling is enabled, this function will bias tasks + * towards executors with active shuffles. + */ + def partitionShuffleOffers(offers: IndexedSeq[WorkerOffer]) + : IndexedSeq[(ExecutorShuffleStatus.Value, IndexedSeq[WorkerOffer])] = { + if (SHUFFLE_BIASED_SCHEDULING_ENABLED && offers.length > 1) { + // bias towards executors that have active shuffle outputs + val execShuffles = mapOutputTracker.getExecutorShuffleStatus + offers + .groupBy(offer => execShuffles.getOrElse(offer.executorId, ExecutorShuffleStatus.Unknown)) + .mapValues(doShuffleOffers) + .toStream + .sortBy(_._1) // order: Active, Inactive, Unknown + .toIndexedSeq + } else { + IndexedSeq((ExecutorShuffleStatus.Unknown, doShuffleOffers(offers))) + } } /** - * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow - * overriding in tests, so it can be deterministic. + * Does the shuffling for [[partitionShuffleOffers()]]. Exposed to allow overriding in tests, + * so that it can be deterministic. 
*/ - protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { + protected def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { Random.shuffle(offers) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 93b5826f8a74b..53e550860c116 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2511,6 +2511,10 @@ private[spark] object Utils extends Logging { (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false)) } + def isShuffleBiasedTaskSchedulingEnabled(conf: SparkConf): Boolean = { + conf.getBoolean("spark.scheduler.shuffleBiasedTaskScheduling.enabled", false) + } + /** * Return the initial number of executors for dynamic allocation. */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 9e1d13e369ad9..fb66527e7f645 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -29,6 +29,7 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config +import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.ManualClock class FakeSchedulerBackend extends SchedulerBackend { @@ -836,7 +837,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // We customize the task scheduler just to let us control the way offers are shuffled, so we // can be sure we try both permutations, and to control the clock on the tasksetmanager. val taskScheduler = new TaskSchedulerImpl(sc) { - override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { + override def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { // Don't shuffle the offers around for this test. Instead, we'll just pass in all // the permutations we care about directly. 
offers @@ -873,6 +874,41 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } } + test("Shuffle-biased task scheduling enabled should lead to non-random offer shuffling") { + setupScheduler("spark.scheduler.shuffleBiasedTaskScheduling.enabled" -> "true") + + // Make offers in different executors, so they can be a mix of active, inactive, unknown + val offers = IndexedSeq( + WorkerOffer("exec1", "host1", 2), // inactive + WorkerOffer("exec2", "host2", 2), // active + WorkerOffer("exec3", "host3", 2) // unknown + ) + val makeMapStatus = (offer: WorkerOffer) => + MapStatus(BlockManagerId(offer.executorId, offer.host, 1), Array(10)) + val mapOutputTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + mapOutputTracker.registerShuffle(0, 2) + mapOutputTracker.registerShuffle(1, 1) + mapOutputTracker.registerMapOutput(0, 0, makeMapStatus(offers(0))) + mapOutputTracker.registerMapOutput(0, 1, makeMapStatus(offers(1))) + mapOutputTracker.registerMapOutput(1, 0, makeMapStatus(offers(1))) + mapOutputTracker.markShuffleInactive(0) + + import ExecutorShuffleStatus._ + val execStatus = mapOutputTracker.getExecutorShuffleStatus + assert(execStatus.equals(Map("exec1" -> Inactive, "exec2" -> Active))) + + assert(taskScheduler.partitionShuffleOffers(offers).map(_._1) + .equals(IndexedSeq(Active, Inactive, Unknown))) + assert(taskScheduler.partitionShuffleOffers(offers).flatMap(_._2).map(offers.indexOf(_)) + .equals(IndexedSeq(1, 0, 2))) + + taskScheduler.submitTasks(FakeTask.createTaskSet(3, stageId = 1, stageAttemptId = 0)) + // should go to active first, then inactive + val taskDescs = taskScheduler.resourceOffers(offers).flatten + assert(taskDescs.size === 3) + assert(taskDescs.map(_.executorId).equals(Seq("exec2", "exec2", "exec1"))) + } + test("With delay scheduling off, tasks can be run at any locality level immediately") { val conf = new SparkConf() .set("spark.locality.wait", "0") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala index 9bcec1fe9ec21..c74b9a803b43b 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala @@ -83,6 +83,7 @@ private[spark] trait DynamicAllocationTestsSuite { k8sSuite: KubernetesSuite => .addToArgs("--conf", "spark.dynamicAllocation.enabled=true") .addToArgs("--conf", "spark.dynamicAllocation.minExecutors=0") .addToArgs("--conf", "spark.dynamicAllocation.maxExecutors=1") + .addToArgs("--conf", "spark.scheduler.shuffleBiasedTaskScheduling.enabled=true") .addToArgs("--conf", s"spark.driver.host=" + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") From 3c0d3e975655e42ccf873054351723994542b84e Mon Sep 17 00:00:00 2001 From: Will Manning Date: Tue, 27 Nov 2018 14:50:36 +0000 Subject: [PATCH 2/6] pull inner loop into its own function for legibility --- .../spark/scheduler/TaskSchedulerImpl.scala | 124 +++++++++--------- 1 file changed, 65 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2498a7f12c5ae..54aba77e78f29 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -392,65 +392,7 @@ private[spark] class TaskSchedulerImpl( } for (shuffledOffers <- partitionedAndShuffledOffers.map(_._2)) { - // Build a list of tasks to assign to each worker. - val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) - val availableCpus = shuffledOffers.map(o => o.cores).toArray - val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum - - // Take each TaskSet in our scheduling order, and then offer it each node in increasing order - // of locality levels so that it gets a chance to launch local tasks on all of them. - // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY - for (taskSet <- sortedTaskSets) { - // Skip the barrier taskSet if the available slots are less than the number of pending tasks - if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { - // Skip the launch process. - // TODO SPARK-24819 If the job requires more slots than available (both busy and free - // slots), fail the job on submit. - logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + - s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + - s"number of available slots is $availableSlots.") - } else { - var launchedAnyTask = false - // Record all the executor IDs assigned barrier tasks on. - val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() - for (currentMaxLocality <- taskSet.myLocalityLevels) { - var launchedTaskAtCurrentMaxLocality = false - do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, - currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality - } while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) - } - if (launchedAnyTask && taskSet.isBarrier) { - // Check whether the barrier tasks are partially launched. - // TODO SPARK-24818 handle the assert failure case (that can happen when some locality - // requirements are not fulfilled, and we should revert the launched tasks). - require(addressesWithDescs.size == taskSet.numTasks, - s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + - s"because only ${addressesWithDescs.size} out of a total number of " + - s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " + - "been blacklisted or cannot fulfill task locality requirements.") - - // materialize the barrier coordinator. - maybeInitBarrierCoordinator() - - // Update the taskInfos into all the barrier task properties. 
- val addressesStr = addressesWithDescs - // Addresses ordered by partitionId - .sortBy(_._2.partitionId) - .map(_._1) - .mkString(",") - addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr)) - - logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for " + - s"barrier stage ${taskSet.stageId}.") - } - } - } - allTasks ++= tasks + allTasks ++= doResourceOffers(shuffledOffers, sortedTaskSets) } // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get @@ -461,6 +403,70 @@ private[spark] class TaskSchedulerImpl( return allTasks } + private def doResourceOffers( + shuffledOffers: IndexedSeq[WorkerOffer], + sortedTaskSets: IndexedSeq[TaskSetManager]): Seq[Seq[TaskDescription]] = { + // Build a list of tasks to assign to each worker. + val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) + val availableCpus = shuffledOffers.map(o => o.cores).toArray + val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum + + // Take each TaskSet in our scheduling order, and then offer it each node in increasing order + // of locality levels so that it gets a chance to launch local tasks on all of them. + // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY + for (taskSet <- sortedTaskSets) { + // Skip the barrier taskSet if the available slots are less than the number of pending tasks + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { + // Skip the launch process. + // TODO SPARK-24819 If the job requires more slots than available (both busy and free + // slots), fail the job on submit. + logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is $availableSlots.") + } else { + var launchedAnyTask = false + // Record all the executor IDs assigned barrier tasks on. + val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() + for (currentMaxLocality <- taskSet.myLocalityLevels) { + var launchedTaskAtCurrentMaxLocality = false + do { + launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, + currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs) + launchedAnyTask |= launchedTaskAtCurrentMaxLocality + } while (launchedTaskAtCurrentMaxLocality) + } + if (!launchedAnyTask) { + taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + } + if (launchedAnyTask && taskSet.isBarrier) { + // Check whether the barrier tasks are partially launched. + // TODO SPARK-24818 handle the assert failure case (that can happen when some locality + // requirements are not fulfilled, and we should revert the launched tasks). + require(addressesWithDescs.size == taskSet.numTasks, + s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because only ${addressesWithDescs.size} out of a total number of " + + s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " + + "been blacklisted or cannot fulfill task locality requirements.") + + // materialize the barrier coordinator. + maybeInitBarrierCoordinator() + + // Update the taskInfos into all the barrier task properties. 
+ val addressesStr = addressesWithDescs + // Addresses ordered by partitionId + .sortBy(_._2.partitionId) + .map(_._1) + .mkString(",") + addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr)) + + logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for " + + s"barrier stage ${taskSet.stageId}.") + } + } + } + tasks + } + /** * Shuffle offers around to avoid always placing tasks on the same workers. * If shuffle-biased task scheduling is enabled, this function will bias tasks From 36511a33557f0bf645ac2ebd30c69bd51b75e16a Mon Sep 17 00:00:00 2001 From: Will Manning Date: Tue, 27 Nov 2018 14:56:47 +0000 Subject: [PATCH 3/6] clean up the diff a bit more --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 54aba77e78f29..ed422302fd7ec 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -380,8 +380,7 @@ private[spark] class TaskSchedulerImpl( } }.getOrElse(offers) - val partitionedAndShuffledOffers = partitionShuffleOffers(filteredOffers) - var allTasks: Seq[Seq[TaskDescription]] = Nil + var tasks: Seq[Seq[TaskDescription]] = Nil val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( @@ -391,16 +390,17 @@ private[spark] class TaskSchedulerImpl( } } + val partitionedAndShuffledOffers = partitionShuffleOffers(filteredOffers) for (shuffledOffers <- partitionedAndShuffledOffers.map(_._2)) { - allTasks ++= doResourceOffers(shuffledOffers, sortedTaskSets) + tasks ++= doResourceOffers(shuffledOffers, sortedTaskSets) } // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get // launched within a configured time. 
- if (allTasks.size > 0) { + if (tasks.size > 0) { hasLaunchedTask = true } - return allTasks + return tasks } private def doResourceOffers( From 3713dea17a5886e626d3703bdd7c828e4d89dc5e Mon Sep 17 00:00:00 2001 From: Will Manning Date: Tue, 27 Nov 2018 15:04:31 +0000 Subject: [PATCH 4/6] more minor cleanup --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 4 ++-- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 5 +++-- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ---- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index f95bcd9986f35..29fa6988dc0e9 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -656,8 +656,8 @@ private[spark] class MapOutputTrackerMaster( def getExecutorShuffleStatus: scala.collection.Map[String, ExecutorShuffleStatus] = { shuffleStatuses.values .flatMap(status => status.executorsWithOutputs().map(_ -> status.isActive)) - .groupBy(_._1) - .mapValues(_.exists(_._2)) + .groupBy(_._1) // group by executor ID + .mapValues(_.exists(_._2)) // true if any are Active .mapValues(if (_) ExecutorShuffleStatus.Active else ExecutorShuffleStatus.Inactive) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ed422302fd7ec..d2930a0f24771 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -82,7 +82,8 @@ private[spark] class TaskSchedulerImpl( ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation") // whether to prefer assigning tasks to executors that contain shuffle files - val SHUFFLE_BIASED_SCHEDULING_ENABLED = Utils.isShuffleBiasedTaskSchedulingEnabled(conf) + val shuffledBiasedTaskSchedulingEnabled = + conf.getBoolean("spark.scheduler.shuffleBiasedTaskScheduling.enabled", false) // Threshold above which we warn user initial TaskSet may be starved val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s") @@ -474,7 +475,7 @@ private[spark] class TaskSchedulerImpl( */ def partitionShuffleOffers(offers: IndexedSeq[WorkerOffer]) : IndexedSeq[(ExecutorShuffleStatus.Value, IndexedSeq[WorkerOffer])] = { - if (SHUFFLE_BIASED_SCHEDULING_ENABLED && offers.length > 1) { + if (shuffledBiasedTaskSchedulingEnabled && offers.length > 1) { // bias towards executors that have active shuffle outputs val execShuffles = mapOutputTracker.getExecutorShuffleStatus offers diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 53e550860c116..93b5826f8a74b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2511,10 +2511,6 @@ private[spark] object Utils extends Logging { (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false)) } - def isShuffleBiasedTaskSchedulingEnabled(conf: SparkConf): Boolean = { - conf.getBoolean("spark.scheduler.shuffleBiasedTaskScheduling.enabled", false) - } - /** * Return the initial number of executors for dynamic allocation. 
*/ From 183d9a9d2b3c08906d170cab7aaaa9fc1f2f364a Mon Sep 17 00:00:00 2001 From: Will Manning Date: Tue, 27 Nov 2018 15:55:52 +0000 Subject: [PATCH 5/6] comment, plus fix typo --- .../spark/scheduler/TaskSchedulerImpl.scala | 18 +++++++++++------- .../scheduler/TaskSchedulerImplSuite.scala | 4 ++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d2930a0f24771..3ebeb6512d1cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -82,7 +82,7 @@ private[spark] class TaskSchedulerImpl( ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation") // whether to prefer assigning tasks to executors that contain shuffle files - val shuffledBiasedTaskSchedulingEnabled = + val shuffleBiasedTaskSchedulingEnabled = conf.getBoolean("spark.scheduler.shuffleBiasedTaskScheduling.enabled", false) // Threshold above which we warn user initial TaskSet may be starved @@ -391,7 +391,11 @@ private[spark] class TaskSchedulerImpl( } } - val partitionedAndShuffledOffers = partitionShuffleOffers(filteredOffers) + // If shuffle-biased task scheduling is enabled, then first assign as many tasks as possible to + // executors containing active shuffle files, followed by assigning to executors with inactive + // shuffle files, and then finally to those without shuffle files. This bin packing allows for + // more efficient dynamic allocation in the absence of an external shuffle service. + val partitionedAndShuffledOffers = partitionAndShuffleOffers(filteredOffers) for (shuffledOffers <- partitionedAndShuffledOffers.map(_._2)) { tasks ++= doResourceOffers(shuffledOffers, sortedTaskSets) } @@ -470,12 +474,12 @@ private[spark] class TaskSchedulerImpl( /** * Shuffle offers around to avoid always placing tasks on the same workers. - * If shuffle-biased task scheduling is enabled, this function will bias tasks - * towards executors with active shuffles. + * If shuffle-biased task scheduling is enabled, this function partitions the offers based on + * whether they have active/inactive/no shuffle files present. */ - def partitionShuffleOffers(offers: IndexedSeq[WorkerOffer]) + def partitionAndShuffleOffers(offers: IndexedSeq[WorkerOffer]) : IndexedSeq[(ExecutorShuffleStatus.Value, IndexedSeq[WorkerOffer])] = { - if (shuffledBiasedTaskSchedulingEnabled && offers.length > 1) { + if (shuffleBiasedTaskSchedulingEnabled && offers.length > 1) { // bias towards executors that have active shuffle outputs val execShuffles = mapOutputTracker.getExecutorShuffleStatus offers @@ -490,7 +494,7 @@ private[spark] class TaskSchedulerImpl( } /** - * Does the shuffling for [[partitionShuffleOffers()]]. Exposed to allow overriding in tests, + * Does the shuffling for [[partitionAndShuffleOffers()]]. Exposed to allow overriding in tests, * so that it can be deterministic. 
*/ protected def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index fb66527e7f645..7079500b80488 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -897,9 +897,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val execStatus = mapOutputTracker.getExecutorShuffleStatus assert(execStatus.equals(Map("exec1" -> Inactive, "exec2" -> Active))) - assert(taskScheduler.partitionShuffleOffers(offers).map(_._1) + assert(taskScheduler.partitionAndShuffleOffers(offers).map(_._1) .equals(IndexedSeq(Active, Inactive, Unknown))) - assert(taskScheduler.partitionShuffleOffers(offers).flatMap(_._2).map(offers.indexOf(_)) + assert(taskScheduler.partitionAndShuffleOffers(offers).flatMap(_._2).map(offers.indexOf(_)) .equals(IndexedSeq(1, 0, 2))) taskScheduler.submitTasks(FakeTask.createTaskSet(3, stageId = 1, stageAttemptId = 0)) From 93249116b3914fa05b271989a3cfd9b3297893e2 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Wed, 28 Nov 2018 16:19:26 +0000 Subject: [PATCH 6/6] fix CR comment --- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 7079500b80488..34169c1de2778 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterEach import org.scalatest.mockito.MockitoSugar import org.apache.spark._ +import org.apache.spark.ExecutorShuffleStatus._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.storage.BlockManagerId @@ -893,7 +894,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B mapOutputTracker.registerMapOutput(1, 0, makeMapStatus(offers(1))) mapOutputTracker.markShuffleInactive(0) - import ExecutorShuffleStatus._ val execStatus = mapOutputTracker.getExecutorShuffleStatus assert(execStatus.equals(Map("exec1" -> Inactive, "exec2" -> Active)))
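
Not part of the patches above: a minimal, self-contained sketch of the offer-partitioning idea that PATCH 1/6 introduces and PATCH 5/6 renames to partitionAndShuffleOffers. The WorkerOffer and ExecutorShuffleStatus definitions below are simplified stand-ins (assumptions, not Spark's actual classes) so the sketch can run outside Spark; in the patches the real logic lives in TaskSchedulerImpl.partitionAndShuffleOffers and reads executor statuses from MapOutputTrackerMaster.getExecutorShuffleStatus.

import scala.util.Random

// Simplified stand-ins for the Spark types used in the patches (illustrative only).
object ExecutorShuffleStatus extends Enumeration {
  // Declaration order gives the desired sort order: Active < Inactive < Unknown.
  val Active, Inactive, Unknown = Value
}

case class WorkerOffer(executorId: String, host: String, cores: Int)

object ShuffleBiasSketch {

  // Group offers by each executor's shuffle status, randomize within each group, and
  // return the groups ordered Active, Inactive, Unknown so executors holding live
  // shuffle files are offered tasks first.
  def partitionAndShuffleOffers(
      offers: IndexedSeq[WorkerOffer],
      execShuffles: Map[String, ExecutorShuffleStatus.Value])
    : IndexedSeq[(ExecutorShuffleStatus.Value, IndexedSeq[WorkerOffer])] = {
    offers
      .groupBy(o => execShuffles.getOrElse(o.executorId, ExecutorShuffleStatus.Unknown))
      .toIndexedSeq
      .map { case (status, group) => status -> Random.shuffle(group) }
      .sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the scenario in the TaskSchedulerImplSuite test above:
    // exec2 has active shuffle output, exec1 only inactive output, exec3 is unknown.
    val offers = IndexedSeq(
      WorkerOffer("exec1", "host1", 2),
      WorkerOffer("exec2", "host2", 2),
      WorkerOffer("exec3", "host3", 2))
    val statuses = Map(
      "exec1" -> ExecutorShuffleStatus.Inactive,
      "exec2" -> ExecutorShuffleStatus.Active)
    // Prints the Active group (exec2) first, then Inactive (exec1), then Unknown (exec3),
    // matching the expected ordering asserted in the test.
    partitionAndShuffleOffers(offers, statuses).foreach(println)
  }
}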