Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert soft dynamic allocation for SPARK-25299. #513

Merged
merged 9 commits into from
Mar 14, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
listenerBus: LiveListenerBus,
conf: SparkConf,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster)
extends Logging {

Expand All @@ -119,11 +118,6 @@ private[spark] class ExecutorAllocationManager(

private val cachedExecutorIdleTimeoutS = conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)

private val inactiveShuffleExecutorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")

private val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.get(DYN_ALLOCATION_TESTING)

Expand Down Expand Up @@ -222,6 +216,12 @@ private[spark] class ExecutorAllocationManager(
if (cachedExecutorIdleTimeoutS < 0) {
throw new SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.")
}
Expand Down Expand Up @@ -312,8 +312,6 @@ private[spark] class ExecutorAllocationManager(
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis

updateAndSyncNumExecutorsTarget(now)

val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
Expand All @@ -323,6 +321,8 @@ private[spark] class ExecutorAllocationManager(
}
!expired
}
// Update executor target number only after initializing flag is unset
updateAndSyncNumExecutorsTarget(now)
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
Expand Down Expand Up @@ -552,7 +552,7 @@ private[spark] class ExecutorAllocationManager(
// has been reached, it will no longer be marked as idle. When new executors join,
// however, we are no longer at the lower bound, and so we must mark executor X
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
checkForIdleExecutors()
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
} else {
logWarning(s"Duplicate executor $executorId has registered")
Expand Down Expand Up @@ -607,45 +607,30 @@ private[spark] class ExecutorAllocationManager(
*/
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (executorIds.contains(executorId)) {
val hasActiveShuffleBlocks = !externalShuffleServiceEnabled &&
mapOutputTracker.hasOutputsOnExecutor(executorId, activeOnly = true)
if (!removeTimes.contains(executorId)
&& !executorsPendingToRemove.contains(executorId)
&& !hasActiveShuffleBlocks) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
// Note that it is not necessary to query the executors since all the cached
// blocks we are concerned with are reported to the driver. Note that this
// does not include broadcast blocks.
val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
val hasAnyShuffleBlocks =
!externalShuffleServiceEnabled && mapOutputTracker.hasOutputsOnExecutor(executorId)
val now = clock.getTimeMillis()

// Use the maximum of all the timeouts that apply.
val timeoutS = List(
executorIdleTimeoutS,
if (hasCachedBlocks) cachedExecutorIdleTimeoutS else 0,
if (hasAnyShuffleBlocks) inactiveShuffleExecutorIdleTimeoutS else 0)
.max

val expiryTime = now + timeoutS * 1000;
val realExpiryTime = if (expiryTime <= 0) Long.MaxValue else expiryTime

removeTimes(executorId) = realExpiryTime
val timeout = {
if (hasCachedBlocks) {
// Use a different timeout if the executor has cached blocks.
now + cachedExecutorIdleTimeoutS * 1000
} else {
now + executorIdleTimeoutS * 1000
}
}
val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow
removeTimes(executorId) = realTimeout
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in ${(realExpiryTime - now)/1000} seconds)")
s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
}
}

/**
* Check if any executors are now idle, and call the idle callback for them.
*/
private def checkForIdleExecutors(): Unit = synchronized {
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
}

/**
* Callback invoked when the specified executor is now running a task.
* This resets all variables used for removing this executor.
Expand Down Expand Up @@ -680,13 +665,6 @@ private[spark] class ExecutorAllocationManager(
// place the executors.
private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
// At the end of a job, trigger the callbacks for idle executors again to clean up executors
// which we were keeping around only because they held active shuffle blocks.
logDebug("Checking for idle executors at end of job")
allocationManager.checkForIdleExecutors()
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
initializing = false
val stageId = stageSubmitted.stageInfo.stageId
Expand Down Expand Up @@ -734,11 +712,6 @@ private[spark] class ExecutorAllocationManager(
if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
}

// Trigger the callbacks for idle executors again to clean up executors
// which we were keeping around only because they held active shuffle blocks.
logDebug("Checking for idle executors at end of stage")
allocationManager.checkForIdleExecutors()
}
}

Expand All @@ -752,10 +725,15 @@ private[spark] class ExecutorAllocationManager(
if (stageIdToNumRunningTask.contains(stageId)) {
stageIdToNumRunningTask(stageId) += 1
}
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
if (!allocationManager.executorIds.contains(executorId)) {
// This guards against the following race condition:
// 1. The `SparkListenerTaskStart` event is posted before the
// `SparkListenerExecutorAdded` event
// 2. The `SparkListenerExecutorRemoved` event is posted before the
// `SparkListenerTaskStart` event
// Above cases are possible because these events are posted in different threads.
// (see SPARK-4951 SPARK-26927)
if (!allocationManager.executorIds.contains(executorId) &&
client.getExecutorIds().contains(executorId)) {
allocationManager.onExecutorAdded(executorId)
}

Expand Down
100 changes: 8 additions & 92 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark

import java.io._
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor}
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.JavaConverters._
Expand All @@ -28,7 +28,6 @@ import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.spark.ExecutorShuffleStatus.ExecutorShuffleStatus
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
Expand Down Expand Up @@ -62,13 +61,6 @@ private class ShuffleStatus(numPartitions: Int) {
// Exposed for testing
val mapStatuses = new Array[MapStatus](numPartitions)

/**
* Whether an active job in the [[org.apache.spark.scheduler.DAGScheduler]] depends on this.
* If dynamic allocation is enabled, then executors that do not contain active shuffles may
* eventually be surrendered by the [[ExecutorAllocationManager]].
*/
var isActive = true

/**
* The cached result of serializing the map statuses array. This cache is lazily populated when
* [[serializedMapStatus]] is called. The cache is invalidated when map outputs are removed.
Expand All @@ -88,24 +80,17 @@ private class ShuffleStatus(numPartitions: Int) {
/**
* Counter tracking the number of partitions that have output. This is a performance optimization
* to avoid having to count the number of non-null entries in the `mapStatuses` array and should
* be equivalent to `mapStatuses.count(_ ne null)`.
* be equivalent to`mapStatuses.count(_ ne null)`.
*/
private[this] var _numAvailableOutputs: Int = 0

/**
* Cached set of executorIds on which outputs exist. This is a performance optimization to avoid
* having to repeatedly iterate over ever element in the `mapStatuses` array and should be
* equivalent to `mapStatuses.map(_.location.executorId).groupBy(x => x).mapValues(_.length)`.
*/
private[this] val _numOutputsPerExecutorId = HashMap[String, Int]().withDefaultValue(0)

/**
* Register a map output. If there is already a registered location for the map output then it
* will be replaced by the new location.
*/
def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized {
if (mapStatuses(mapId) == null) {
incrementNumAvailableOutputs(status.location)
_numAvailableOutputs += 1
invalidateSerializedMapOutputStatusCache()
}
mapStatuses(mapId) = status
Expand All @@ -118,7 +103,7 @@ private class ShuffleStatus(numPartitions: Int) {
*/
def removeMapOutput(mapId: Int, bmAddress: BlockManagerId): Unit = synchronized {
if (mapStatuses(mapId) != null && mapStatuses(mapId).location == bmAddress) {
decrementNumAvailableOutputs(bmAddress)
_numAvailableOutputs -= 1
mapStatuses(mapId) = null
invalidateSerializedMapOutputStatusCache()
}
Expand Down Expand Up @@ -148,21 +133,13 @@ private class ShuffleStatus(numPartitions: Int) {
def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = synchronized {
for (mapId <- 0 until mapStatuses.length) {
if (mapStatuses(mapId) != null && f(mapStatuses(mapId).location)) {
decrementNumAvailableOutputs(mapStatuses(mapId).location)
_numAvailableOutputs -= 1
mapStatuses(mapId) = null
invalidateSerializedMapOutputStatusCache()
}
}
}

def hasOutputsOnExecutor(execId: String): Boolean = synchronized {
_numOutputsPerExecutorId(execId) > 0
}

def executorsWithOutputs(): Set[String] = synchronized {
_numOutputsPerExecutorId.keySet.toSet
}

/**
* Number of partitions that have shuffle outputs.
*/
Expand Down Expand Up @@ -215,22 +192,6 @@ private class ShuffleStatus(numPartitions: Int) {
f(mapStatuses)
}

private[this] def incrementNumAvailableOutputs(bmAddress: BlockManagerId): Unit = synchronized {
_numOutputsPerExecutorId(bmAddress.executorId) += 1
_numAvailableOutputs += 1
}

private[this] def decrementNumAvailableOutputs(bmAddress: BlockManagerId): Unit = synchronized {
assert(_numOutputsPerExecutorId(bmAddress.executorId) >= 1,
s"Tried to remove non-existent output from ${bmAddress.executorId}")
if (_numOutputsPerExecutorId(bmAddress.executorId) == 1) {
_numOutputsPerExecutorId.remove(bmAddress.executorId)
} else {
_numOutputsPerExecutorId(bmAddress.executorId) -= 1
}
_numAvailableOutputs -= 1
}

/**
* Clears the cached serialized map output statuses.
*/
Expand All @@ -240,7 +201,7 @@ private class ShuffleStatus(numPartitions: Int) {
Utils.tryLogNonFatalError {
// Use `blocking = false` so that this operation doesn't hang while trying to send cleanup
// RPCs to dead executors.
cachedSerializedBroadcast.destroy(blocking = false)
cachedSerializedBroadcast.destroy()
}
cachedSerializedBroadcast = null
}
Expand Down Expand Up @@ -345,11 +306,6 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
def stop() {}
}

private[spark] object ExecutorShuffleStatus extends Enumeration {
type ExecutorShuffleStatus = Value
val Active, Inactive, Unknown = Value
}

/**
* Driver-side class that keeps track of the location of the map output of a stage.
*
Expand Down Expand Up @@ -496,26 +452,6 @@ private[spark] class MapOutputTrackerMaster(
}
}

def markShuffleInactive(shuffleId: Int): Unit = {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.isActive = false
case None =>
throw new SparkException(
s"markShuffleInactive called for nonexistent shuffle ID $shuffleId.")
}
}

def markShuffleActive(shuffleId: Int): Unit = {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.isActive = true
case None =>
throw new SparkException(
s"markShuffleActive called for nonexistent shuffle ID $shuffleId.")
}
}

/**
* Removes all shuffle outputs associated with this host. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists).
Expand All @@ -535,12 +471,6 @@ private[spark] class MapOutputTrackerMaster(
incrementEpoch()
}

def hasOutputsOnExecutor(execId: String, activeOnly: Boolean = false): Boolean = {
shuffleStatuses.valuesIterator.exists { status =>
status.hasOutputsOnExecutor(execId) && (!activeOnly || status.isActive)
}
}

/** Check if the given shuffle is being tracked */
def containsShuffle(shuffleId: Int): Boolean = shuffleStatuses.contains(shuffleId)

Expand Down Expand Up @@ -646,20 +576,6 @@ private[spark] class MapOutputTrackerMaster(
}
}

/**
* Return the set of executors that contain tracked shuffle files, with a status of
* [[ExecutorShuffleStatus.Inactive]] iff all shuffle files on that executor are marked inactive.
*
* @return a map of executor IDs to their corresponding [[ExecutorShuffleStatus]]
*/
def getExecutorShuffleStatus: scala.collection.Map[String, ExecutorShuffleStatus] = {
shuffleStatuses.values
.flatMap(status => status.executorsWithOutputs().map(_ -> status.isActive))
.groupBy(_._1) // group by executor ID
.mapValues(_.exists(_._2)) // true if any are Active
.mapValues(if (_) ExecutorShuffleStatus.Active else ExecutorShuffleStatus.Inactive)
}

/**
* Return a list of locations that each have fraction of map output greater than the specified
* threshold.
Expand Down Expand Up @@ -790,7 +706,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
val startTime = System.currentTimeMillis
val startTimeNs = System.nanoTime()
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
// Someone else is fetching it; wait for them to be done
Expand Down Expand Up @@ -828,7 +744,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
}
logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
s"${System.currentTimeMillis - startTime} ms")
s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")

if (fetchedStatuses != null) {
fetchedStatuses
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,6 @@ class SparkContext(config: SparkConf) extends SafeLogging {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
_env.blockManager.master))
case _ =>
None
Expand Down
Loading