
[SPARK-29148][CORE] Add stage level scheduling dynamic allocation and scheduler backend changes #27313

Closed
wants to merge 32 commits into from
Changes from 15 commits
32 commits
0ad97e3
[SPARK-29148]Add stage level scheduling dynamic allocation and schedu…
tgravescs Jan 17, 2020
1d8e1cf
Fix tests and modify Stage info to have resource profile id
tgravescs Jan 17, 2020
6c56fbf
revert pom
tgravescs Jan 17, 2020
66745a1
cleanup
tgravescs Jan 17, 2020
1bf5faf
Fix empty map
tgravescs Jan 20, 2020
24ddabd
minor comments and error on cores being limiting resource
tgravescs Jan 20, 2020
92c0fd2
cleanup
tgravescs Jan 20, 2020
a0c3ade
fix typo
tgravescs Jan 20, 2020
54e5b43
clean up warning on shutdown
tgravescs Jan 20, 2020
0408c02
Add checks make sure cores limiting resource in local mode
tgravescs Jan 21, 2020
0a93cc9
Update comments and fix check when no resources
tgravescs Jan 21, 2020
c3358fc
Remove some tests that need scheduler changes
tgravescs Jan 21, 2020
35e0a4d
Style fix ups
tgravescs Jan 21, 2020
be4e542
Add resourceProfileManager to kubernetes test that is mocking
tgravescs Jan 21, 2020
1bfd706
Make temporary directory for test of standalone resources
tgravescs Jan 22, 2020
cd3e000
Address review comments
tgravescs Jan 22, 2020
8540b33
Update to have sparkcontext clear the default profile so that in between
tgravescs Jan 23, 2020
c5954b8
put clearnresource profile back in for tests
tgravescs Jan 23, 2020
7b7c513
Fix spacing
tgravescs Jan 23, 2020
ae4db1e
Minor comments from late review of PR 26682
tgravescs Jan 27, 2020
d270a73
Attempt to clarify commment of calculateAmountAndPartsForFraction
tgravescs Jan 27, 2020
56e34d7
Add () to calls to clearDefaultProfile
tgravescs Jan 27, 2020
e0a9d0e
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
tgravescs Feb 3, 2020
2193e91
Fix from merge to master
tgravescs Feb 3, 2020
9cbce12
Fix merge issue
tgravescs Feb 3, 2020
89dfb19
Change to val's for review comments
tgravescs Feb 3, 2020
5435640
Update test added in master
tgravescs Feb 3, 2020
5449cda
Change to use Optional for ExecutorResourceRequest instead of ""
tgravescs Feb 3, 2020
fa3f5a4
Revert "Change to use Optional for ExecutorResourceRequest instead of…
tgravescs Feb 3, 2020
87aab30
Fix speculative test
tgravescs Feb 4, 2020
bef3a67
Change allocation manager remove Executors to take resource profile id
tgravescs Feb 6, 2020
15f4c96
Change the allocation manager gauges to be sum of all resource profiles
tgravescs Feb 10, 2020
31 changes: 18 additions & 13 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -37,24 +37,29 @@ private[spark] trait ExecutorAllocationClient {
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* @param numExecutors The total number of executors we'd like to have. The cluster manager
* shouldn't kill any running executor to reach this number, but,
* if all existing executors were to die, this is the number of executors
* we'd want to be allocated.
* @param localityAwareTasks The number of tasks in all active stages that have a locality
* preferences. This includes running, pending, and completed tasks.
* @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
* that would like to like to run on that host.
* This includes running, pending, and completed tasks.
*
* @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
* ResourceProfile id. The cluster manager shouldn't kill
* any running executor to reach this number, but, if all
* existing executors were to die, this is the number
* of executors we'd want to be allocated.
* @param numLocalityAwareTasksPerResourceProfileId The number of tasks in all active stages that
* have locality preferences per
* ResourceProfile id. This includes running,
* pending, and completed tasks.
* @param hostToLocalTaskCount A map of ResourceProfile id to a map of hosts to the number of
* tasks from all active stages that would like to run on
* that host. This includes running, pending, and completed tasks.
* @return whether the request is acknowledged by the cluster manager.
*/
private[spark] def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean
resourceProfileIdToNumExecutors: Map[Int, Int],
numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
hostToLocalTaskCount: Map[Int, Map[String, Int]]): Boolean

/**
* Request an additional number of executors from the cluster manager.
* Request an additional number of executors from the cluster manager for the default
* ResourceProfile.
* @return whether the request is acknowledged by the cluster manager.
*/
def requestExecutors(numAdditionalExecutors: Int): Boolean
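For orientation, the sketch below shows how a caller inside the org.apache.spark package might drive the reworked per-ResourceProfile signature; the profile ids, counts, and host names are made up for illustration and are not taken from this PR.

package org.apache.spark

// Illustrative caller of the new per-ResourceProfile-id signature; all ids and
// counts below are invented for the example.
object RequestTotalExecutorsSketch {
  def request(client: ExecutorAllocationClient): Boolean = {
    // ResourceProfile id -> total number of executors we'd like for that profile
    val numExecutorsPerProfile = Map(0 -> 4, 1 -> 2)
    // ResourceProfile id -> number of locality-aware tasks in active stages
    val localityAwareTasksPerProfile = Map(0 -> 10, 1 -> 0)
    // ResourceProfile id -> (host -> number of tasks preferring that host)
    val hostToLocalTaskCountPerProfile =
      Map(0 -> Map("host1" -> 6, "host2" -> 4), 1 -> Map.empty[String, Int])
    client.requestTotalExecutors(
      numExecutorsPerProfile,
      localityAwareTasksPerProfile,
      hostToLocalTaskCountPerProfile)
  }
}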
481 changes: 332 additions & 149 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Large diffs are not rendered by default.
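The ExecutorAllocationManager diff itself is not rendered here. Based on the commit messages ("Change allocation manager remove Executors to take resource profile id", "Change the allocation manager gauges to be sum of all resource profiles"), the manager now tracks executor targets per ResourceProfile id while the existing single-valued gauges report a sum across profiles. The sketch below only illustrates that bookkeeping idea with invented names; it is not the PR's actual code.

import scala.collection.mutable

// Invented-name sketch of per-ResourceProfile target bookkeeping.
class PerProfileTargetSketch {
  // ResourceProfile id -> target number of executors for that profile
  private val targetPerProfile = new mutable.HashMap[Int, Int]

  def setTarget(resourceProfileId: Int, target: Int): Unit = synchronized {
    targetPerProfile(resourceProfileId) = target
  }

  // Pre-existing single-valued gauges can be preserved by summing across profiles.
  def totalTargetExecutors: Int = synchronized {
    targetPerProfile.values.sum
  }
}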

149 changes: 41 additions & 108 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReferenc

import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.immutable
import scala.collection.mutable.HashMap
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
@@ -53,7 +54,7 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.resource.{ResourceID, ResourceInformation}
import org.apache.spark.resource._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
@@ -219,9 +220,10 @@ class SparkContext(config: SparkConf) extends Logging {
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
private var _resources: immutable.Map[String, ResourceInformation] = _
private var _shuffleDriverComponents: ShuffleDriverComponents = _
private var _plugins: Option[PluginContainer] = None
private var _resourceProfileManager: ResourceProfileManager = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -343,6 +345,8 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
_executorAllocationManager

private[spark] def resourceProfileManager: ResourceProfileManager = _resourceProfileManager

private[spark] def cleaner: Option[ContextCleaner] = _cleaner

private[spark] var checkpointDir: Option[String] = None
@@ -451,6 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
}

_listenerBus = new LiveListenerBus(_conf)
_resourceProfileManager = new ResourceProfileManager(_conf)

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
@@ -611,7 +616,7 @@ class SparkContext(config: SparkConf) extends Logging {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
cleaner = cleaner))
cleaner = cleaner, resourceProfileManager = resourceProfileManager))
case _ =>
None
}
@@ -1622,7 +1627,7 @@ class SparkContext(config: SparkConf) extends Logging {

/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* to help it make decisions. This applies to the default ResourceProfile.
* @param numExecutors The total number of executors we'd like to have. The cluster manager
* shouldn't kill any running executor to reach this number, but,
* if all existing executors were to die, this is the number of executors
@@ -1638,11 +1643,16 @@
def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
hostToLocalTaskCount: immutable.Map[String, Int]
): Boolean = {
schedulerBackend match {
case b: ExecutorAllocationClient =>
b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
// This applies only to the default resource profile; a new API would be needed to
// support other ResourceProfiles.
val defaultProfId = resourceProfileManager.defaultResourceProfile.id
b.requestTotalExecutors(immutable.Map(defaultProfId -> numExecutors),
immutable.Map(defaultProfId -> localityAwareTasks),
immutable.Map(defaultProfId -> hostToLocalTaskCount))
case _ =>
logWarning("Requesting executors is not supported by current scheduler.")
false
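To make the wrapping above concrete: the legacy single-profile arguments all become maps keyed by the default profile's id. The helper below is hypothetical and only restates that mapping.

// Hypothetical helper restating the wrapping done above; every map is keyed by
// the ResourceProfile id (here, the default profile's id).
def toPerProfileArgs(
    defaultProfId: Int,
    numExecutors: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int])
    : (Map[Int, Int], Map[Int, Int], Map[Int, Map[String, Int]]) = {
  (Map(defaultProfId -> numExecutors),
    Map(defaultProfId -> localityAwareTasks),
    Map(defaultProfId -> hostToLocalTaskCount))
}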
@@ -2771,109 +2781,34 @@ object SparkContext extends Logging {
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1

// Ensure that executor's resources satisfies one or more tasks requirement.
def checkResourcesPerTask(clusterMode: Boolean, executorCores: Option[Int]): Unit = {
// Ensure that the default executor's resources satisfy the requirements of at least one task.
// This function is for cluster managers that don't set the executor cores config; for
// others the check is done in ResourceProfile.
def checkResourcesPerTask(executorCores: Int): Unit = {
val taskCores = sc.conf.get(CPUS_PER_TASK)
val execCores = if (clusterMode) {
executorCores.getOrElse(sc.conf.get(EXECUTOR_CORES))
} else {
executorCores.get
}
// some cluster managers don't set the EXECUTOR_CORES config by default (standalone
// and mesos coarse grained), so we can't rely on that config for those.
val shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
(master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))

// Number of cores per executor must meet at least one task requirement.
if (shouldCheckExecCores && execCores < taskCores) {
throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
s"the task config: ${CPUS_PER_TASK.key} = $taskCores when run on $master.")
}

// Calculate the max slots each executor can provide based on resources available on each
// executor and resources required by each task.
val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX)
val executorResourcesAndAmounts = parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
.map(request => (request.id.resourceName, request.amount)).toMap

var (numSlots, limitingResourceName) = if (shouldCheckExecCores) {
(execCores / taskCores, "CPU")
} else {
(-1, "")
}

taskResourceRequirements.foreach { taskReq =>
// Make sure the executor resources were specified through config.
val execAmount = executorResourcesAndAmounts.getOrElse(taskReq.resourceName,
throw new SparkException("The executor resource config: " +
ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
" needs to be specified since a task requirement config: " +
ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
" was specified")
)
// Make sure the executor resources are large enough to launch at least one task.
if (execAmount < taskReq.amount) {
throw new SparkException("The executor resource config: " +
ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
s" = $execAmount has to be >= the requested amount in task resource config: " +
ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
s" = ${taskReq.amount}")
}
// Compare and update the max slots each executor can provide.
// If the configured amount per task was < 1.0, a task is subdividing
// executor resources. If the amount per task was > 1.0, the task wants
// multiple executor resources.
val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
if (resourceNumSlots < numSlots) {
if (shouldCheckExecCores) {
throw new IllegalArgumentException("The number of slots on an executor has to be " +
"limited by the number of cores, otherwise you waste resources and " +
"dynamic allocation doesn't work properly. Your configuration has " +
s"core/task cpu slots = ${numSlots} and " +
s"${taskReq.resourceName} = ${resourceNumSlots}. " +
"Please adjust your configuration so that all resources require same number " +
"of executor slots.")
}
numSlots = resourceNumSlots
limitingResourceName = taskReq.resourceName
}
}
if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
// if we can't rely on the executor cores config throw a warning for user
logWarning("Please ensure that the number of slots available on your " +
"executors is limited by the number of cores to task cpus and not another " +
"custom resource. If cores is not the limiting resource then dynamic " +
"allocation will not work properly!")
}
// warn if we would waste any resources due to another resource limiting the number of
// slots on an executor
taskResourceRequirements.foreach { taskReq =>
val execAmount = executorResourcesAndAmounts(taskReq.resourceName)
if ((numSlots * taskReq.amount / taskReq.numParts) < execAmount) {
val taskReqStr = if (taskReq.numParts > 1) {
s"${taskReq.amount}/${taskReq.numParts}"
} else {
s"${taskReq.amount}"
}
val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
val message = s"The configuration of resource: ${taskReq.resourceName} " +
s"(exec = ${execAmount}, task = ${taskReqStr}, " +
s"runnable tasks = ${resourceNumSlots}) will " +
s"result in wasted resources due to resource ${limitingResourceName} limiting the " +
s"number of runnable tasks per executor to: ${numSlots}. Please adjust " +
s"your configuration."
if (Utils.isTesting) {
throw new SparkException(message)
} else {
logWarning(message)
}
}
validateTaskCpusLargeEnough(executorCores, taskCores)
val defaultProf = sc.resourceProfileManager.defaultResourceProfile
// TODO - this is temporary until the full stage level scheduling feature is integrated;
// fail if any resource other than CPU is the limiting one, because dynamic allocation and
// the scheduler currently compute slots based on cores
val cpuSlots = executorCores / taskCores
val limitingResource = defaultProf.limitingResource(sc.conf)
if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS) &&
defaultProf.maxTasksPerExecutor(sc.conf) < cpuSlots) {
throw new IllegalArgumentException("The number of slots on an executor has to be " +
"limited by the number of cores, otherwise you waste resources and " +
"dynamic allocation doesn't work properly. Your configuration has " +
s"core/task cpu slots = ${cpuSlots} and " +
s"${limitingResource} = " +
s"${defaultProf.maxTasksPerExecutor(sc.conf)}. Please adjust your configuration " +
"so that all resources require same number of executor slots.")
}
ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
}

master match {
case "local" =>
checkResourcesPerTask(clusterMode = false, Some(1))
checkResourcesPerTask(1)
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
@@ -2886,7 +2821,7 @@
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
checkResourcesPerTask(clusterMode = false, Some(threadCount))
checkResourcesPerTask(threadCount)
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
@@ -2897,22 +2832,21 @@
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
checkResourcesPerTask(clusterMode = false, Some(threadCount))
checkResourcesPerTask(threadCount)
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)

case SPARK_REGEX(sparkUrl) =>
checkResourcesPerTask(clusterMode = true, None)
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)

case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
checkResourcesPerTask(coresPerSlave.toInt)
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
@@ -2941,7 +2875,6 @@
(backend, scheduler)

case masterUrl =>
checkResourcesPerTask(clusterMode = true, None)
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
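The reworked checkResourcesPerTask above delegates the resource math to the default ResourceProfile (limitingResource, maxTasksPerExecutor) and to ResourceUtils.warnOnWastedResources. Below is a standalone restatement of the core comparison, with the profile and conf lookups replaced by plain arguments for illustration only.

// Standalone restatement of the check; in the real code, limitingResource and
// maxTasksPerExecutor come from the default ResourceProfile and SparkConf.
def checkCoresAreLimiting(
    executorCores: Int,
    taskCpus: Int,
    limitingResource: String,
    maxTasksPerExecutor: Int): Unit = {
  val cpuSlots = executorCores / taskCpus
  if (limitingResource.nonEmpty && limitingResource != "cpus" &&
      maxTasksPerExecutor < cpuSlots) {
    throw new IllegalArgumentException(s"The number of slots on an executor has to be " +
      s"limited by the number of cores, but $limitingResource allows only " +
      s"$maxTasksPerExecutor tasks per executor while cores allow $cpuSlots.")
  }
}

// Example: 8 cores with 1 CPU per task gives 8 slots, but if 2 GPUs with 1 GPU per
// task allow only 2 tasks, the configuration is rejected:
// checkCoresAreLimiting(executorCores = 8, taskCpus = 1, limitingResource = "gpu",
//   maxTasksPerExecutor = 2)  // throws IllegalArgumentException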
@@ -53,4 +53,13 @@ private[spark] object Tests {
val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
.intConf
.createWithDefault(2)

val RESOURCES_WARNING_TESTING =
ConfigBuilder("spark.resources.warnings.testing").booleanConf.createWithDefault(false)

val RESOURCE_PROFILE_MANAGER_TESTING =
ConfigBuilder("spark.testing.resourceProfileManager")
.booleanConf
.createWithDefault(false)

}
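For completeness, a test could enable the two new flags via their string keys; the keys are taken from the ConfigBuilder definitions above, while Spark-internal tests would normally use the typed entries instead.

import org.apache.spark.SparkConf

object TestFlagsSketch {
  // Sketch: enabling the two new test-only flags by their string keys.
  val conf: SparkConf = new SparkConf()
    .set("spark.resources.warnings.testing", "true")
    .set("spark.testing.resourceProfileManager", "true")
}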