Skip to content

Commit

Permalink
[SPARK-29149][YARN] Update YARN cluster manager For Stage Level Sched…
Browse files Browse the repository at this point in the history
…uling

### What changes were proposed in this pull request?

Yarn side changes for Stage level scheduling.  The previous PR for dynamic allocation changes was apache#27313

Modified the data structures to store things on a per ResourceProfile basis.
 I tried to keep the code changes to a minimum, the main loop that requests just goes through each Resourceprofile and the logic inside for each one stayed very close to the same.
On submission we now have to give each ResourceProfile a separate yarn Priority because yarn doesn't support asking for containers with different resources at the same Priority. We just use the profile id as the priority level.
Using a different Priority actually makes things easier when the containers come back to match them again which ResourceProfile they were requested for.
The expectation is that yarn will only give you a container with resource amounts you requested or more. It should never give you a container if it doesn't satisfy your resource requests.

If you want to see the full feature changes you can look at https://github.com/apache/spark/pull/27053/files for reference

### Why are the changes needed?

For stage level scheduling YARN support.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

Tested manually on YARN cluster and then unit tests.

Closes apache#27583 from tgravescs/SPARK-29149.

Authored-by: Thomas Graves <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
  • Loading branch information
tgravescs authored and Seongjin Cho committed Apr 14, 2020
1 parent d9284d2 commit 4b84b34
Show file tree
Hide file tree
Showing 12 changed files with 663 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ private[spark] object CoarseGrainedClusterMessages {
// Request executors by specifying the new total number of executors desired
// This includes executors already pending or running
case class RequestExecutors(
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
resourceProfileToTotalExecs: Map[ResourceProfile, Int],
numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
hostToLocalTaskCount: Map[Int, Map[String, Int]],
nodeBlacklist: Set[String])
extends CoarseGrainedClusterMessage

Expand Down
30 changes: 15 additions & 15 deletions core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually._

import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
Expand Down Expand Up @@ -61,22 +61,22 @@ class HeartbeatReceiverSuite
PrivateMethod[collection.Map[String, Long]](Symbol("executorLastSeen"))
private val _executorTimeoutMs = PrivateMethod[Long](Symbol("executorTimeoutMs"))
private val _killExecutorThread = PrivateMethod[ExecutorService](Symbol("killExecutorThread"))
var conf: SparkConf = _

/**
* Before each test, set up the SparkContext and a custom [[HeartbeatReceiver]]
* that uses a manual clock.
*/
override def beforeEach(): Unit = {
super.beforeEach()
val conf = new SparkConf()
conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
.set(DYN_ALLOCATION_TESTING, true)
sc = spy(new SparkContext(conf))
scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)
when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
when(scheduler.resourcesReqsPerTask).thenReturn(Seq.empty)
when(scheduler.sc).thenReturn(sc)
heartbeatReceiverClock = new ManualClock
heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
Expand Down Expand Up @@ -164,9 +164,10 @@ class HeartbeatReceiverSuite
test("expire dead hosts should kill executors with replacement (SPARK-8119)") {
// Set up a fake backend and cluster manager to simulate killing executors
val rpcEnv = sc.env.rpcEnv
val fakeClusterManager = new FakeClusterManager(rpcEnv)
val fakeClusterManager = new FakeClusterManager(rpcEnv, conf)
val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
val fakeSchedulerBackend =
new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef, sc.resourceProfileManager)
when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)

// Register fake executors with our fake scheduler backend
Expand Down Expand Up @@ -282,18 +283,16 @@ private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpo
private class FakeSchedulerBackend(
scheduler: TaskSchedulerImpl,
rpcEnv: RpcEnv,
clusterManagerEndpoint: RpcEndpointRef)
clusterManagerEndpoint: RpcEndpointRef,
resourceProfileManager: ResourceProfileManager)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

protected override def doRequestTotalExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
RequestExecutors(
resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf)),
numLocalityAwareTasksPerResourceProfileId(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
rpHostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
Set.empty))
}
RequestExecutors(resourceProfileToTotalExecs, numLocalityAwareTasksPerResourceProfileId,
rpHostToLocalTaskCount, Set.empty))
}

protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
Expand All @@ -303,16 +302,17 @@ private class FakeSchedulerBackend(
/**
* Dummy cluster manager to simulate responses to executor allocation requests.
*/
private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoint {
private class FakeClusterManager(override val rpcEnv: RpcEnv, conf: SparkConf) extends RpcEndpoint {
private var targetNumExecutors = 0
private val executorIdsToKill = new mutable.HashSet[String]

def getTargetNumExecutors: Int = targetNumExecutors
def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestExecutors(requestedTotal, _, _, _) =>
targetNumExecutors = requestedTotal
case RequestExecutors(resourceProfileToTotalExecs, _, _, _) =>
targetNumExecutors =
resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf))
context.reply(true)
case KillExecutors(executorIds) =>
executorIdsToKill ++= executorIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ private[spark] class ApplicationMaster(
}
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
val numPendingAllocate = allocator.getNumContainersPendingAllocate
var sleepStartNs = 0L
var sleepInterval = 200L // ms
allocatorLock.synchronized {
Expand Down Expand Up @@ -778,8 +778,11 @@ private[spark] class ApplicationMaster(
case r: RequestExecutors =>
Option(allocator) match {
case Some(a) =>
if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
if (a.requestTotalExecutorsWithPreferredLocalities(
r.resourceProfileToTotalExecs,
r.numLocalityAwareTasksPerResourceProfileId,
r.hostToLocalTaskCount,
r.nodeBlacklist)) {
resetAllocatorInterval()
}
context.reply(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: Yarn
})

metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.numLocalityAwareTasks
override def getValue: Int = yarnAllocator.getNumLocalityAwareTasks
})

metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.numContainersPendingAllocate
override def getValue: Int = yarnAllocator.getNumContainersPendingAllocate
})

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile

private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])

Expand Down Expand Up @@ -82,7 +82,6 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack
private[yarn] class LocalityPreferredContainerPlacementStrategy(
val sparkConf: SparkConf,
val yarnConf: Configuration,
val resource: Resource,
resolver: SparkRackResolver) {

/**
Expand All @@ -96,6 +95,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
* containers
* @param localityMatchedPendingAllocations A sequence of pending container request which
* matches the localities of current required tasks.
* @param rp The ResourceProfile associated with this container.
* @return node localities and rack localities, each locality is an array of string,
* the length of localities is the same as number of containers
*/
Expand All @@ -104,11 +104,12 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
numLocalityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
localityMatchedPendingAllocations: Seq[ContainerRequest]
localityMatchedPendingAllocations: Seq[ContainerRequest],
rp: ResourceProfile
): Array[ContainerLocalityPreferences] = {
val updatedHostToContainerCount = expectedHostToContainerCount(
numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
localityMatchedPendingAllocations)
localityMatchedPendingAllocations, rp)
val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum

// The number of containers to allocate, divided into two groups, one with preferred locality,
Expand Down Expand Up @@ -152,11 +153,14 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
}

/**
* Calculate the number of executors need to satisfy the given number of pending tasks.
* Calculate the number of executors needed to satisfy the given number of pending tasks for
* the ResourceProfile.
*/
private def numExecutorsPending(numTasksPending: Int): Int = {
val coresPerExecutor = resource.getVirtualCores
(numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
private def numExecutorsPending(
numTasksPending: Int,
rp: ResourceProfile): Int = {
val tasksPerExec = rp.maxTasksPerExecutor(sparkConf)
math.ceil(numTasksPending / tasksPerExec.toDouble).toInt
}

/**
Expand All @@ -175,14 +179,15 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
localityMatchedPendingAllocations: Seq[ContainerRequest]
localityMatchedPendingAllocations: Seq[ContainerRequest],
rp: ResourceProfile
): Map[String, Int] = {
val totalLocalTaskNum = hostToLocalTaskCount.values.sum
val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations)

hostToLocalTaskCount.map { case (host, count) =>
val expectedCount =
count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
count.toDouble * numExecutorsPending(localityAwareTasks, rp) / totalLocalTaskNum
// Take the locality of pending containers into consideration
val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
pendingHostToContainersMap.getOrElse(host, 0.0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,17 @@ private object ResourceRequestHelper extends Logging {
resourceInformation
}

def isYarnCustomResourcesNonEmpty(resource: Resource): Boolean = {
try {
// Use reflection as this uses APIs only available in Hadoop 3
val getResourcesMethod = resource.getClass().getMethod("getResources")
val resources = getResourcesMethod.invoke(resource).asInstanceOf[Array[Any]]
if (resources.nonEmpty) true else false
} catch {
case _: NoSuchMethodException => false
}
}

/**
* Checks whether Hadoop 2.x or 3 is used as a dependency.
* In case of Hadoop 3 and later, the ResourceInformation class
Expand Down
Loading

0 comments on commit 4b84b34

Please sign in to comment.