[SPARK-24432] Support dynamic allocation without external shuffle service #24083

Closed
wants to merge 8 commits into from
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

@@ -94,6 +94,7 @@ private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
listenerBus: LiveListenerBus,
conf: SparkConf,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster)
extends Logging {

@@ -118,6 +119,11 @@ private[spark] class ExecutorAllocationManager(

private val cachedExecutorIdleTimeoutS = conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)

private val inactiveShuffleExecutorIdleTimeoutS =
conf.get(DYN_ALLOCATION_INACTIVE_SHUFFLE_EXECUTOR_IDLE_TIMEOUT)

private val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.get(DYN_ALLOCATION_TESTING)

@@ -216,12 +222,6 @@ private[spark] class ExecutorAllocationManager(
if (cachedExecutorIdleTimeoutS < 0) {
throw new SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.")
}
@@ -552,7 +552,7 @@ private[spark] class ExecutorAllocationManager(
// has been reached, it will no longer be marked as idle. When new executors join,
// however, we are no longer at the lower bound, and so we must mark executor X
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
checkForIdleExecutors()
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
} else {
logWarning(s"Duplicate executor $executorId has registered")
@@ -607,30 +607,45 @@
*/
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (executorIds.contains(executorId)) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
val hasActiveShuffleBlocks = !externalShuffleServiceEnabled &&
mapOutputTracker.hasOutputsOnExecutor(executorId, activeOnly = true)
if (!removeTimes.contains(executorId)
&& !executorsPendingToRemove.contains(executorId)
&& !hasActiveShuffleBlocks) {
// Note that it is not necessary to query the executors since all the cached
// blocks we are concerned with are reported to the driver. Note that this
// does not include broadcast blocks.
val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
val hasAnyShuffleBlocks =
!externalShuffleServiceEnabled && mapOutputTracker.hasOutputsOnExecutor(executorId)
val now = clock.getTimeMillis()
val timeout = {
if (hasCachedBlocks) {
// Use a different timeout if the executor has cached blocks.
now + cachedExecutorIdleTimeoutS * 1000
} else {
now + executorIdleTimeoutS * 1000
}
}
val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow
removeTimes(executorId) = realTimeout

// Use the maximum of all the timeouts that apply.
val timeoutS = List(
executorIdleTimeoutS,
if (hasCachedBlocks) cachedExecutorIdleTimeoutS else 0,
if (hasAnyShuffleBlocks) inactiveShuffleExecutorIdleTimeoutS else 0)
.max

val expiryTime = now + timeoutS * 1000
val realExpiryTime = if (expiryTime <= 0) Long.MaxValue else expiryTime

removeTimes(executorId) = realExpiryTime
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
s"scheduled to run on the executor (to expire in ${(realExpiryTime - now)/1000} seconds)")
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
}
}

/**
* Check if any executors are now idle, and call the idle callback for them.
*/
private def checkForIdleExecutors(): Unit = synchronized {
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
}

/**
* Callback invoked when the specified executor is now running a task.
* This resets all variables used for removing this executor.
@@ -665,6 +680,13 @@
// place the executors.
private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
// At the end of a job, trigger the callbacks for idle executors again to clean up executors
// which we were keeping around only because they held active shuffle blocks.
logDebug("Checking for idle executors at end of job")
allocationManager.checkForIdleExecutors()
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
initializing = false
val stageId = stageSubmitted.stageInfo.stageId
@@ -712,6 +734,11 @@
if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
}

// Trigger the callbacks for idle executors again to clean up executors
// which we were keeping around only because they held active shuffle blocks.
logDebug("Checking for idle executors at end of stage")
allocationManager.checkForIdleExecutors()
}
}

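As a concrete illustration of the new expiry rule in onExecutorIdle, the following standalone sketch (not part of the patch; the object and method names are invented for the example) reproduces the "maximum of all applicable timeouts" selection, including the guard against overflow.

// Illustrative sketch only: mirrors the timeout selection added to onExecutorIdle.
object IdleExpirySketch {
  def expiryTimeMillis(
      nowMillis: Long,
      executorIdleTimeoutS: Long,
      cachedExecutorIdleTimeoutS: Long,
      inactiveShuffleExecutorIdleTimeoutS: Long,
      hasCachedBlocks: Boolean,
      hasAnyShuffleBlocks: Boolean): Long = {
    // Use the maximum of all the timeouts that apply to this executor.
    val timeoutS = List(
      executorIdleTimeoutS,
      if (hasCachedBlocks) cachedExecutorIdleTimeoutS else 0L,
      if (hasAnyShuffleBlocks) inactiveShuffleExecutorIdleTimeoutS else 0L).max
    val expiry = nowMillis + timeoutS * 1000
    if (expiry <= 0) Long.MaxValue else expiry // guard against overflow
  }

  def main(args: Array[String]): Unit = {
    // Executor holds shuffle files (no external shuffle service) but no cached RDD
    // blocks; 60s base idle timeout, 300s inactive-shuffle timeout -> expires in 300s.
    val now = 1000L
    println(expiryTimeMillis(now, 60L, Long.MaxValue, 300L,
      hasCachedBlocks = false, hasAnyShuffleBlocks = true) - now) // prints 300000
  }
}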
93 changes: 89 additions & 4 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -28,6 +28,7 @@ import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.spark.ExecutorShuffleStatus.ExecutorShuffleStatus
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -61,6 +62,13 @@ private class ShuffleStatus(numPartitions: Int) {
// Exposed for testing
val mapStatuses = new Array[MapStatus](numPartitions)

/**
* Whether an active/pending stage in the [[org.apache.spark.scheduler.DAGScheduler]] depends
* on this. If dynamic allocation is enabled, then executors that do not contain active shuffles
* may eventually be surrendered by the [[ExecutorAllocationManager]].
*/
var isActive = true

/**
* The cached result of serializing the map statuses array. This cache is lazily populated when
* [[serializedMapStatus]] is called. The cache is invalidated when map outputs are removed.
@@ -80,17 +88,25 @@ private class ShuffleStatus(numPartitions: Int) {
/**
* Counter tracking the number of partitions that have output. This is a performance optimization
* to avoid having to count the number of non-null entries in the `mapStatuses` array and should
* be equivalent to`mapStatuses.count(_ ne null)`.
* be equivalent to `mapStatuses.count(_ ne null)`.
*/
private[this] var _numAvailableOutputs: Int = 0

/**
* Cached mapping of executorIds to the number of outputs associated with those executors. This
* is a performance optimization to avoid having to repeatedly iterate over every element in the
* `mapStatuses` array and should be equivalent to
* `mapStatuses.map(_.location.executorId).groupBy(x => x).mapValues(_.length)`.
*/
private[this] val _numOutputsPerExecutorId = HashMap[String, Int]().withDefaultValue(0)

/**
* Register a map output. If there is already a registered location for the map output then it
* will be replaced by the new location.
*/
def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized {
if (mapStatuses(mapId) == null) {
_numAvailableOutputs += 1
incrementNumAvailableOutputs(status.location)
invalidateSerializedMapOutputStatusCache()
}
mapStatuses(mapId) = status
@@ -103,7 +119,7 @@
*/
def removeMapOutput(mapId: Int, bmAddress: BlockManagerId): Unit = synchronized {
if (mapStatuses(mapId) != null && mapStatuses(mapId).location == bmAddress) {
_numAvailableOutputs -= 1
decrementNumAvailableOutputs(bmAddress)
mapStatuses(mapId) = null
invalidateSerializedMapOutputStatusCache()
}
@@ -133,13 +149,21 @@
def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = synchronized {
for (mapId <- 0 until mapStatuses.length) {
if (mapStatuses(mapId) != null && f(mapStatuses(mapId).location)) {
_numAvailableOutputs -= 1
decrementNumAvailableOutputs(mapStatuses(mapId).location)
mapStatuses(mapId) = null
invalidateSerializedMapOutputStatusCache()
}
}
}

def hasOutputsOnExecutor(execId: String): Boolean = synchronized {
_numOutputsPerExecutorId(execId) > 0
}

def executorsWithOutputs(): Set[String] = synchronized {
_numOutputsPerExecutorId.keySet.toSet
}

/**
* Number of partitions that have shuffle outputs.
*/
@@ -192,6 +216,22 @@
f(mapStatuses)
}

private[this] def incrementNumAvailableOutputs(bmAddress: BlockManagerId): Unit = synchronized {
_numOutputsPerExecutorId(bmAddress.executorId) += 1
_numAvailableOutputs += 1
}

private[this] def decrementNumAvailableOutputs(bmAddress: BlockManagerId): Unit = synchronized {
assert(_numOutputsPerExecutorId(bmAddress.executorId) >= 1,
s"Tried to remove non-existent output from ${bmAddress.executorId}")
if (_numOutputsPerExecutorId(bmAddress.executorId) == 1) {
_numOutputsPerExecutorId.remove(bmAddress.executorId)
} else {
_numOutputsPerExecutorId(bmAddress.executorId) -= 1
}
_numAvailableOutputs -= 1
}

/**
* Clears the cached serialized map output statuses.
*/
@@ -306,6 +346,11 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
def stop() {}
}

private[spark] object ExecutorShuffleStatus extends Enumeration {
type ExecutorShuffleStatus = Value
val Active, Inactive, Unknown = Value
}

/**
* Driver-side class that keeps track of the location of the map output of a stage.
*
@@ -452,6 +497,26 @@ private[spark] class MapOutputTrackerMaster(
}
}

def markShuffleInactive(shuffleId: Int): Unit = {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.isActive = false
case None =>
throw new SparkException(
s"markShuffleInactive called for nonexistent shuffle ID $shuffleId.")
}
}

def markShuffleActive(shuffleId: Int): Unit = {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.isActive = true
case None =>
throw new SparkException(
s"markShuffleActive called for nonexistent shuffle ID $shuffleId.")
}
}

/**
* Removes all shuffle outputs associated with this host. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists).
@@ -471,6 +536,12 @@
incrementEpoch()
}

def hasOutputsOnExecutor(execId: String, activeOnly: Boolean = false): Boolean = {
shuffleStatuses.valuesIterator.exists { status =>
status.hasOutputsOnExecutor(execId) && (!activeOnly || status.isActive)
}
}

/** Check if the given shuffle is being tracked */
def containsShuffle(shuffleId: Int): Boolean = shuffleStatuses.contains(shuffleId)

@@ -576,6 +647,20 @@ private[spark] class MapOutputTrackerMaster(
}
}

/**
* Return the set of executors that contain tracked shuffle files, with a status of
* [[ExecutorShuffleStatus.Inactive]] iff all shuffle files on that executor are marked inactive.
*
* @return a map of executor IDs to their corresponding [[ExecutorShuffleStatus]]
*/
def getExecutorShuffleStatus: scala.collection.Map[String, ExecutorShuffleStatus] = {
shuffleStatuses.values
.flatMap(status => status.executorsWithOutputs().map(_ -> status.isActive))
.groupBy(_._1) // group by executor ID
.mapValues(_.exists(_._2)) // true if any are Active
.mapValues(isActive => if (isActive) ExecutorShuffleStatus.Active else ExecutorShuffleStatus.Inactive)
}

/**
* Return a list of locations that each have fraction of map output greater than the specified
* threshold.
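To make the aggregation in getExecutorShuffleStatus easier to follow, here is a self-contained sketch of the same idea (the Shuffle case class and object name are invented for the example): an executor is reported Active if any shuffle that still has map outputs on it is active, and Inactive only when every such shuffle has been marked inactive.

// Illustrative sketch only: not the Spark implementation.
object ExecutorShuffleStatusSketch {
  sealed trait Status
  case object Active extends Status
  case object Inactive extends Status

  // Hypothetical input: for each shuffle, whether it is still referenced by an
  // active/pending stage and which executors hold its map outputs.
  final case class Shuffle(isActive: Boolean, executorsWithOutputs: Set[String])

  def executorStatuses(shuffles: Seq[Shuffle]): Map[String, Status] =
    shuffles
      .flatMap(s => s.executorsWithOutputs.map(_ -> s.isActive))
      .groupBy(_._1) // group by executor ID
      .map { case (execId, pairs) =>
        execId -> (if (pairs.exists(_._2)) Active else Inactive)
      }

  def main(args: Array[String]): Unit = {
    val shuffles = Seq(
      Shuffle(isActive = true, Set("exec-1", "exec-2")),
      Shuffle(isActive = false, Set("exec-2", "exec-3")))
    // exec-1 -> Active, exec-2 -> Active (one active shuffle), exec-3 -> Inactive
    println(executorStatuses(shuffles))
  }
}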
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -544,6 +544,7 @@ class SparkContext(config: SparkConf) extends Logging {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
_env.blockManager.master))
case _ =>
None
core/src/main/scala/org/apache/spark/internal/config/package.scala

@@ -339,6 +339,10 @@ package object config {
ConfigBuilder("spark.dynamicAllocation.cachedExecutorIdleTimeout")
.timeConf(TimeUnit.SECONDS).createWithDefault(Integer.MAX_VALUE)

private[spark] val DYN_ALLOCATION_INACTIVE_SHUFFLE_EXECUTOR_IDLE_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout")
.timeConf(TimeUnit.SECONDS).createWithDefault(Integer.MAX_VALUE)

private[spark] val DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.executorIdleTimeout")
.timeConf(TimeUnit.SECONDS).createWithDefault(60)
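The new key sits alongside the existing dynamic-allocation timeouts. Below is a minimal, illustrative driver configuration for exercising this patch, assuming a build that includes it: dynamic allocation is enabled with the external shuffle service turned off, and executors holding only inactive shuffle files are released after 30 minutes of idleness (the timeout values and job are arbitrary examples).

import org.apache.spark.{SparkConf, SparkContext}

object DynamicAllocationWithoutShuffleServiceExample {
  def main(args: Array[String]): Unit = {
    // Sketch of a configuration that exercises this patch; the master URL is
    // expected to be supplied via spark-submit.
    val conf = new SparkConf()
      .setAppName("dyn-alloc-no-shuffle-service")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
      .set("spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout", "30m")
    val sc = new SparkContext(conf)
    try {
      // Any shuffle-producing job; its map outputs stay "active" until the job ends,
      // after which the inactive-shuffle idle timeout applies to their executors.
      sc.parallelize(1 to 1000, numSlices = 8).map(i => (i % 10, i)).reduceByKey(_ + _).count()
    } finally {
      sc.stop()
    }
  }
}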