[SPARK-29149][YARN] Update YARN cluster manager For Stage Level Scheduling #27583

Closed
wants to merge 42 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0ad97e3
[SPARK-29148]Add stage level scheduling dynamic allocation and schedu…
tgravescs Jan 17, 2020
1d8e1cf
Fix tests and modify Stage info to have resource profile id
tgravescs Jan 17, 2020
6c56fbf
revert pom
tgravescs Jan 17, 2020
66745a1
cleanup
tgravescs Jan 17, 2020
1bf5faf
Fix empty map
tgravescs Jan 20, 2020
24ddabd
minor comments and error on cores being limiting resource
tgravescs Jan 20, 2020
92c0fd2
cleanup
tgravescs Jan 20, 2020
a0c3ade
fix typo
tgravescs Jan 20, 2020
54e5b43
clean up warning on shutdown
tgravescs Jan 20, 2020
0408c02
Add checks make sure cores limiting resource in local mode
tgravescs Jan 21, 2020
0a93cc9
Update comments and fix check when no resources
tgravescs Jan 21, 2020
c3358fc
Remove some tests that need scheduler changes
tgravescs Jan 21, 2020
35e0a4d
Style fix ups
tgravescs Jan 21, 2020
be4e542
Add resourceProfileManager to kubernetes test that is mocking
tgravescs Jan 21, 2020
1bfd706
Make temporary directory for test of standalone resources
tgravescs Jan 22, 2020
cd3e000
Address review comments
tgravescs Jan 22, 2020
8540b33
Update to have sparkcontext clear the default profile so that in between
tgravescs Jan 23, 2020
c5954b8
put clearnresource profile back in for tests
tgravescs Jan 23, 2020
7b7c513
Fix spacing
tgravescs Jan 23, 2020
ae4db1e
Minor comments from late review of PR 26682
tgravescs Jan 27, 2020
d270a73
Attempt to clarify commment of calculateAmountAndPartsForFraction
tgravescs Jan 27, 2020
56e34d7
Add () to calls to clearDefaultProfile
tgravescs Jan 27, 2020
e0a9d0e
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
tgravescs Feb 3, 2020
2193e91
Fix from merge to master
tgravescs Feb 3, 2020
9cbce12
Fix merge issue
tgravescs Feb 3, 2020
89dfb19
Change to val's for review comments
tgravescs Feb 3, 2020
5435640
Update test added in master
tgravescs Feb 3, 2020
5449cda
Change to use Optional for ExecutorResourceRequest instead of ""
tgravescs Feb 3, 2020
fa3f5a4
Revert "Change to use Optional for ExecutorResourceRequest instead of…
tgravescs Feb 3, 2020
87aab30
Fix speculative test
tgravescs Feb 4, 2020
bef3a67
Change allocation manager remove Executors to take resource profile id
tgravescs Feb 6, 2020
15f4c96
Change the allocation manager gauges to be sum of all resource profiles
tgravescs Feb 10, 2020
bde49f0
[SPARK-29149] Update YARN cluster manager For Stage Level Scheduling
tgravescs Feb 11, 2020
d6d4c8d
cleanup
tgravescs Feb 11, 2020
25f59d3
minor fixes
tgravescs Feb 12, 2020
6f2ace0
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
tgravescs Feb 13, 2020
48db848
Add GuardedBy to the class datastructures
tgravescs Feb 25, 2020
f9c1a05
Update locking to remove unneeded concurrent structures and use
tgravescs Feb 25, 2020
9e79f1a
Add in more explicit synchronized calls to go along with GuardedBy to
tgravescs Feb 27, 2020
e89a8b5
Update to fix locking in matchContainerToRequest
tgravescs Feb 27, 2020
14b6251
Update to use concurrentHashMap
tgravescs Feb 27, 2020
bd3509c
Update comment to kick jenkins
tgravescs Feb 28, 2020
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessages.scala
@@ -115,9 +115,9 @@ private[spark] object CoarseGrainedClusterMessages {
// Request executors by specifying the new total number of executors desired
// This includes executors already pending or running
case class RequestExecutors(
- requestedTotal: Int,
- localityAwareTasks: Int,
- hostToLocalTaskCount: Map[String, Int],
+ resourceProfileToTotalExecs: Map[ResourceProfile, Int],
+ numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
+ hostToLocalTaskCount: Map[Int, Map[String, Int]],
nodeBlacklist: Set[String])
extends CoarseGrainedClusterMessage
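
For context, the request now carries one entry per ResourceProfile instead of a single total. A minimal sketch of a message built from two profiles (the profile objects and the counts are hypothetical, purely for illustration):

```scala
// Hypothetical illustration only: two profiles, each with its own executor target,
// locality-aware task count, and per-host task counts keyed by profile id.
val defaultId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
val msg = RequestExecutors(
  resourceProfileToTotalExecs = Map(defaultProfile -> 4, gpuProfile -> 2),
  numLocalityAwareTasksPerResourceProfileId = Map(defaultId -> 8, gpuProfile.id -> 2),
  hostToLocalTaskCount = Map(
    defaultId -> Map("host1" -> 4, "host2" -> 4),
    gpuProfile.id -> Map("host3" -> 2)),
  nodeBlacklist = Set.empty)
```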

30 changes: 15 additions & 15 deletions core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually._

import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
- import org.apache.spark.resource.ResourceProfile
+ import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -61,22 +61,22 @@ class HeartbeatReceiverSuite
PrivateMethod[collection.Map[String, Long]](Symbol("executorLastSeen"))
private val _executorTimeoutMs = PrivateMethod[Long](Symbol("executorTimeoutMs"))
private val _killExecutorThread = PrivateMethod[ExecutorService](Symbol("killExecutorThread"))
+ var conf: SparkConf = _

/**
* Before each test, set up the SparkContext and a custom [[HeartbeatReceiver]]
* that uses a manual clock.
*/
override def beforeEach(): Unit = {
super.beforeEach()
- val conf = new SparkConf()
+ conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
.set(DYN_ALLOCATION_TESTING, true)
sc = spy(new SparkContext(conf))
scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)
when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
- when(scheduler.resourcesReqsPerTask).thenReturn(Seq.empty)
when(scheduler.sc).thenReturn(sc)
heartbeatReceiverClock = new ManualClock
heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
@@ -164,9 +164,10 @@ class HeartbeatReceiverSuite
test("expire dead hosts should kill executors with replacement (SPARK-8119)") {
// Set up a fake backend and cluster manager to simulate killing executors
val rpcEnv = sc.env.rpcEnv
- val fakeClusterManager = new FakeClusterManager(rpcEnv)
+ val fakeClusterManager = new FakeClusterManager(rpcEnv, conf)
val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
- val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
+ val fakeSchedulerBackend =
+   new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef, sc.resourceProfileManager)
when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)

// Register fake executors with our fake scheduler backend
@@ -282,18 +283,16 @@ private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint
private class FakeSchedulerBackend(
scheduler: TaskSchedulerImpl,
rpcEnv: RpcEnv,
- clusterManagerEndpoint: RpcEndpointRef)
+ clusterManagerEndpoint: RpcEndpointRef,
+ resourceProfileManager: ResourceProfileManager)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

protected override def doRequestTotalExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
- RequestExecutors(
-   resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf)),
-   numLocalityAwareTasksPerResourceProfileId(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
-   rpHostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
-   Set.empty))
- }
+ RequestExecutors(resourceProfileToTotalExecs, numLocalityAwareTasksPerResourceProfileId,
+   rpHostToLocalTaskCount, Set.empty))
+ }

protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
@@ -303,16 +302,17 @@ private class FakeSchedulerBackend(
/**
* Dummy cluster manager to simulate responses to executor allocation requests.
*/
- private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+ private class FakeClusterManager(override val rpcEnv: RpcEnv, conf: SparkConf) extends RpcEndpoint {
private var targetNumExecutors = 0
private val executorIdsToKill = new mutable.HashSet[String]

def getTargetNumExecutors: Int = targetNumExecutors
def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RequestExecutors(requestedTotal, _, _, _) =>
-   targetNumExecutors = requestedTotal
+ case RequestExecutors(resourceProfileToTotalExecs, _, _, _) =>
+   targetNumExecutors =
+     resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf))
context.reply(true)
case KillExecutors(executorIds) =>
executorIdsToKill ++= executorIds
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -593,7 +593,7 @@ private[spark] class ApplicationMaster(
}
}
try {
- val numPendingAllocate = allocator.getPendingAllocate.size
+ val numPendingAllocate = allocator.getNumContainersPendingAllocate
var sleepStartNs = 0L
var sleepInterval = 200L // ms
allocatorLock.synchronized {
@@ -778,8 +778,11 @@ private[spark] class ApplicationMaster(
case r: RequestExecutors =>
Option(allocator) match {
case Some(a) =>
- if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
-   r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
+ if (a.requestTotalExecutorsWithPreferredLocalities(
+   r.resourceProfileToTotalExecs,
+   r.numLocalityAwareTasksPerResourceProfileId,
+   r.hostToLocalTaskCount,
+   r.nodeBlacklist)) {
resetAllocatorInterval()
}
context.reply(true)
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
@@ -40,11 +40,11 @@ private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: YarnAllocator)
})

metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
- override def getValue: Int = yarnAllocator.numLocalityAwareTasks
+ override def getValue: Int = yarnAllocator.getNumLocalityAwareTasks
})

metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
- override def getValue: Int = yarnAllocator.numContainersPendingAllocate
+ override def getValue: Int = yarnAllocator.getNumContainersPendingAllocate
})

}
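
The switch to getter methods goes with the allocator now tracking these values per ResourceProfile: the gauges report a sum across all profiles rather than reading a single field. A hedged sketch of the shape such a getter can take (the class, map name, and locking below are assumptions for illustration, not the exact YarnAllocator code):

```scala
import scala.collection.mutable

// Illustrative only: per-profile bookkeeping summed when a metric gauge is read.
// The real YarnAllocator keys its internal state by resource profile id.
class PerProfileCounts {
  private val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap[Int, Int]()

  def setNumLocalityAwareTasks(rpId: Int, count: Int): Unit = synchronized {
    numLocalityAwareTasksPerResourceProfileId(rpId) = count
  }

  def getNumLocalityAwareTasks: Int = synchronized {
    numLocalityAwareTasksPerResourceProfileId.values.sum
  }
}
```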
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

import org.apache.spark.SparkConf
- import org.apache.spark.internal.config._
+ import org.apache.spark.resource.ResourceProfile

private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])

@@ -82,7 +82,6 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
private[yarn] class LocalityPreferredContainerPlacementStrategy(
val sparkConf: SparkConf,
val yarnConf: Configuration,
- val resource: Resource,
resolver: SparkRackResolver) {

/**
@@ -96,6 +95,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
* containers
* @param localityMatchedPendingAllocations A sequence of pending container request which
* matches the localities of current required tasks.
* @param rp The ResourceProfile associated with this container.
* @return node localities and rack localities, each locality is an array of string,
* the length of localities is the same as number of containers
*/
@@ -104,11 +104,12 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
numLocalityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
- localityMatchedPendingAllocations: Seq[ContainerRequest]
+ localityMatchedPendingAllocations: Seq[ContainerRequest],
+ rp: ResourceProfile
): Array[ContainerLocalityPreferences] = {
val updatedHostToContainerCount = expectedHostToContainerCount(
numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
- localityMatchedPendingAllocations)
+ localityMatchedPendingAllocations, rp)
val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum

// The number of containers to allocate, divided into two groups, one with preferred locality,
@@ -152,11 +153,14 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
}

/**
- * Calculate the number of executors need to satisfy the given number of pending tasks.
+ * Calculate the number of executors needed to satisfy the given number of pending tasks for
+ * the ResourceProfile.
*/
- private def numExecutorsPending(numTasksPending: Int): Int = {
- val coresPerExecutor = resource.getVirtualCores
- (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
+ private def numExecutorsPending(
+   numTasksPending: Int,
+   rp: ResourceProfile): Int = {
+ val tasksPerExec = rp.maxTasksPerExecutor(sparkConf)
+ math.ceil(numTasksPending / tasksPerExec.toDouble).toInt
}
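
Put differently, the per-profile version sizes executors by how many tasks a single executor of that profile can run, rather than dividing task CPUs into executor cores. A small self-contained illustration of the arithmetic (the helper name is a placeholder, not part of this class):

```scala
// tasksPerExec corresponds to rp.maxTasksPerExecutor(sparkConf) above.
def executorsNeeded(numTasksPending: Int, tasksPerExec: Int): Int =
  math.ceil(numTasksPending / tasksPerExec.toDouble).toInt

assert(executorsNeeded(10, 4) == 3) // 10 pending tasks, 4 tasks per executor
assert(executorsNeeded(8, 4) == 2)
```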

/**
@@ -175,14 +179,15 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
- localityMatchedPendingAllocations: Seq[ContainerRequest]
+ localityMatchedPendingAllocations: Seq[ContainerRequest],
+ rp: ResourceProfile
): Map[String, Int] = {
val totalLocalTaskNum = hostToLocalTaskCount.values.sum
val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations)

hostToLocalTaskCount.map { case (host, count) =>
val expectedCount =
- count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
+ count.toDouble * numExecutorsPending(localityAwareTasks, rp) / totalLocalTaskNum
// Take the locality of pending containers into consideration
val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
pendingHostToContainersMap.getOrElse(host, 0.0)
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -227,6 +227,17 @@ private object ResourceRequestHelper extends Logging {
resourceInformation
}

def isYarnCustomResourcesNonEmpty(resource: Resource): Boolean = {
try {
// Use reflection as this uses APIs only available in Hadoop 3
Contributor:

For the full functionality we are targeting, what is the minimum Hadoop version? Does Hadoop 3 have all the wiring required for supporting GPUs, accelerator cards, FPGAs, etc.? Or is it a subset of resources?

(This is not directly related to this PR, but was for my own understanding, given you should know this well :) )

Contributor Author:

Hadoop 3.1.1 has full GPU support, and some of it was backported to Hadoop 2.10 as well. I've tested the normal GPU scheduling feature with both of these as well as the older Hadoop 2.7 release. With older versions you can still ask Spark for GPUs; if YARN doesn't support them, Spark leaves them out of the YARN request but still handles them internally. If you are running on nodes with GPUs, Spark will still use your discovery script to find them and assign them out. If the discovery script doesn't find a GPU and you asked for one, then it fails.

This was actually a more recent change that I put in for GPU scheduling, as more and more people were asking for support on older versions of Hadoop because they don't plan on upgrading to Hadoop 3 for a while.

I do need to test all of that again with stage level scheduling.

Contributor:

Thanks for clarifying the behavior when YARN does support GPUs, etc., as a resource.

I am probably missing something here; it would be great to understand this better for when YARN does not. Suppose I have a Spark application that depends on some library which requires GPUs (for example) and sets corresponding resource profile expectations on the RDDs it creates (I am trying to make a case where the app developer did not explicitly configure the resource profiles, but is implicitly leveraging them via some library).

Now, if this application gets run on Hadoop 2.7 (or anything before 2.10, as you mentioned), what will the behavior be? If I understood it right:

  1. We will make requests to YARN without GPUs in the allocation request, since YARN does not support them.
  2. On the nodes received, we will try to use the discovery script on the assumption that GPUs are available; YARN is just oblivious to them. We will probably be using a node-label constraint to ensure GPU availability?
  3. If there are GPUs detected, we use them; else the executor fails?

Is this right? If yes, how do we handle multi-tenancy on the executor host, or choose which GPU(s) to use? Is the assumption that in workloads like this, the entire node is reserved to prevent contention? I am not sure if you have documented/detailed this somewhere and I missed it!

Contributor Author:

You are correct on the behavior. Many companies requested that this work with their existing Hadoop installs (2.x where it's < 2.10, or 3.1.1) and with the methods they already use on Hadoop 2. I'm not trying to create a solution for everyone, just to allow their existing solutions to work.

In most cases I've heard of, they have something like a GPU queue or node labels, so they know they run on nodes with GPUs. After that, different companies have different ways of handling the multi-tenancy. I've heard of some using file locking, for instance. Or you could also put the GPUs in process-exclusive mode and then just iterate over them to acquire a free one. The idea here is that they can use whatever solution they already have. They can write a custom discovery script, and I also added the ability to plug in a class if it's easier to write Java code to do this: https://issues.apache.org/jira/browse/SPARK-30689?filter=-2

val getResourcesMethod = resource.getClass().getMethod("getResources")
val resources = getResourcesMethod.invoke(resource).asInstanceOf[Array[Any]]
if (resources.nonEmpty) true else false
} catch {
case _: NoSuchMethodException => false
}
}

/**
* Checks whether Hadoop 2.x or 3 is used as a dependency.
* In case of Hadoop 3 and later, the ResourceInformation class
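
To ground the discussion above: a hedged sketch of how an application (or a library on its behalf) could express a GPU requirement through the stage-level scheduling API that this PR wires into YARN. The builder API comes from the companion SPARK-29148 work; the script path and amounts are placeholders, and the discovery script simply prints ResourceInformation JSON such as {"name":"gpu","addresses":["0"]}.

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Executor-side needs: cores, memory, and one GPU located via a discovery script.
val execReqs = new ExecutorResourceRequests()
  .cores(4)
  .memory("6g")
  .resource("gpu", 1, discoveryScript = "/opt/spark/getGpusResources.sh")

// Task-side needs: one CPU and one GPU per task.
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)

val gpuProfile = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build

// A later stage can then be tied to this profile (e.g. rdd.withResources(gpuProfile)),
// and the YARN allocator changes in this PR request containers sized for it. On
// Hadoop versions without YARN GPU support, the GPU is left out of the container
// request, but executors still run the discovery script to find and assign devices.
```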