From 79556f3a169e628eb75e7f0f01d0d7cc361a2c58 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Mon, 26 Nov 2018 21:45:54 +0000 Subject: [PATCH 1/8] Support dynamic allocation without external shuffle service (#427) Allows dynamically scaling executors up and down without external shuffle service. Tracks shuffle locations to know if executors can be safely scaled down. --- .../spark/ExecutorAllocationManager.scala | 58 +++++---- .../org/apache/spark/MapOutputTracker.scala | 92 +++++++++++++- .../scala/org/apache/spark/SparkContext.scala | 1 + .../apache/spark/scheduler/DAGScheduler.scala | 41 +++++-- .../ExecutorAllocationManagerSuite.scala | 70 ++++++++++- .../apache/spark/MapOutputTrackerSuite.scala | 27 +++++ .../cluster/k8s/ExecutorPodsAllocator.scala | 35 +++--- .../k8s/ExecutorPodsAllocatorSuite.scala | 21 ++++ .../DynamicAllocationTestsSuite.scala | 113 ++++++++++++++++++ .../k8s/integrationtest/KubernetesSuite.scala | 7 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 21 +++- 11 files changed, 429 insertions(+), 57 deletions(-) create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 60d04046c7b2a..149318ea77999 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -25,7 +25,7 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.codahale.metrics.{Gauge, MetricRegistry} -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL import org.apache.spark.metrics.source.Source @@ -94,6 +94,7 @@ private[spark] class ExecutorAllocationManager( client: ExecutorAllocationClient, listenerBus: LiveListenerBus, conf: SparkConf, + mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster) extends Logging { @@ -118,6 +119,9 @@ private[spark] class ExecutorAllocationManager( private val cachedExecutorIdleTimeoutS = conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT) + private val inactiveShuffleExecutorIdleTimeoutS = conf.getTimeAsSeconds( + "spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout", s"${Integer.MAX_VALUE}s") + // During testing, the methods to actually kill and add executors are mocked out private val testing = conf.get(DYN_ALLOCATION_TESTING) @@ -216,12 +220,6 @@ private[spark] class ExecutorAllocationManager( if (cachedExecutorIdleTimeoutS < 0) { throw new SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!") } - // Require external shuffle service for dynamic allocation - // Otherwise, we may lose shuffle files when killing executors - if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) { - throw new SparkException("Dynamic allocation of executors requires the external " + - "shuffle service. You may enable this through spark.shuffle.service.enabled.") - } if (tasksPerExecutorForFullParallelism == 0) { throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.") } @@ -552,7 +550,7 @@ private[spark] class ExecutorAllocationManager( // has been reached, it will no longer be marked as idle. 
When new executors join, // however, we are no longer at the lower bound, and so we must mark executor X // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951) - executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle) + checkForIdleExecutors() logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})") } else { logWarning(s"Duplicate executor $executorId has registered") @@ -607,30 +605,44 @@ private[spark] class ExecutorAllocationManager( */ private def onExecutorIdle(executorId: String): Unit = synchronized { if (executorIds.contains(executorId)) { - if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) { + val hasActiveShuffleBlocks = + mapOutputTracker.hasOutputsOnExecutor(executorId, activeOnly = true) + if (!removeTimes.contains(executorId) + && !executorsPendingToRemove.contains(executorId) + && !hasActiveShuffleBlocks) { // Note that it is not necessary to query the executors since all the cached // blocks we are concerned with are reported to the driver. Note that this // does not include broadcast blocks. val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId) + val hasAnyShuffleBlocks = mapOutputTracker.hasOutputsOnExecutor(executorId) val now = clock.getTimeMillis() - val timeout = { - if (hasCachedBlocks) { - // Use a different timeout if the executor has cached blocks. - now + cachedExecutorIdleTimeoutS * 1000 - } else { - now + executorIdleTimeoutS * 1000 - } - } - val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow - removeTimes(executorId) = realTimeout + + // Use the maximum of all the timeouts that apply. + val timeoutS = List( + executorIdleTimeoutS, + if (hasCachedBlocks) cachedExecutorIdleTimeoutS else 0, + if (hasAnyShuffleBlocks) inactiveShuffleExecutorIdleTimeoutS else 0) + .max + + val expiryTime = now + timeoutS * 1000; + val realExpiryTime = if (expiryTime <= 0) Long.MaxValue else expiryTime + + removeTimes(executorId) = realExpiryTime logDebug(s"Starting idle timer for $executorId because there are no more tasks " + - s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)") + s"scheduled to run on the executor (to expire in ${(realExpiryTime - now)/1000} seconds)") } } else { logWarning(s"Attempted to mark unknown executor $executorId idle") } } + /** + * Check if any executors are now idle, and call the idle callback for them. + */ + private def checkForIdleExecutors(): Unit = synchronized { + executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle) + } + /** * Callback invoked when the specified executor is now running a task. * This resets all variables used for removing this executor. @@ -665,6 +677,12 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + // At the end of a job, trigger the callbacks for idle executors again to clean up executors + // which we were keeping around only because they held active shuffle blocks. 
+ allocationManager.checkForIdleExecutors() + } + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { initializing = false val stageId = stageSubmitted.stageInfo.stageId diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 1d4b1ef9c9a1c..f7466cf2862dc 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -28,6 +28,7 @@ import scala.concurrent.duration.Duration import scala.reflect.ClassTag import scala.util.control.NonFatal +import org.apache.spark.ExecutorShuffleStatus.ExecutorShuffleStatus import org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -61,6 +62,13 @@ private class ShuffleStatus(numPartitions: Int) { // Exposed for testing val mapStatuses = new Array[MapStatus](numPartitions) + /** + * Whether an active job in the [[org.apache.spark.scheduler.DAGScheduler]] depends on this. + * If dynamic allocation is enabled, then executors that do not contain active shuffles may + * eventually be surrendered by the [[ExecutorAllocationManager]]. + */ + var isActive = true + /** * The cached result of serializing the map statuses array. This cache is lazily populated when * [[serializedMapStatus]] is called. The cache is invalidated when map outputs are removed. @@ -80,17 +88,24 @@ private class ShuffleStatus(numPartitions: Int) { /** * Counter tracking the number of partitions that have output. This is a performance optimization * to avoid having to count the number of non-null entries in the `mapStatuses` array and should - * be equivalent to`mapStatuses.count(_ ne null)`. + * be equivalent to `mapStatuses.count(_ ne null)`. */ private[this] var _numAvailableOutputs: Int = 0 + /** + * Cached set of executorIds on which outputs exist. This is a performance optimization to avoid + * having to repeatedly iterate over ever element in the `mapStatuses` array and should be + * equivalent to `mapStatuses.map(_.location.executorId).groupBy(x => x).mapValues(_.length)`. + */ + private[this] val _numOutputsPerExecutorId = HashMap[String, Int]().withDefaultValue(0) + /** * Register a map output. If there is already a registered location for the map output then it * will be replaced by the new location. 
*/ def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized { if (mapStatuses(mapId) == null) { - _numAvailableOutputs += 1 + incrementNumAvailableOutputs(status.location) invalidateSerializedMapOutputStatusCache() } mapStatuses(mapId) = status @@ -103,7 +118,7 @@ private class ShuffleStatus(numPartitions: Int) { */ def removeMapOutput(mapId: Int, bmAddress: BlockManagerId): Unit = synchronized { if (mapStatuses(mapId) != null && mapStatuses(mapId).location == bmAddress) { - _numAvailableOutputs -= 1 + decrementNumAvailableOutputs(bmAddress) mapStatuses(mapId) = null invalidateSerializedMapOutputStatusCache() } @@ -133,13 +148,21 @@ private class ShuffleStatus(numPartitions: Int) { def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = synchronized { for (mapId <- 0 until mapStatuses.length) { if (mapStatuses(mapId) != null && f(mapStatuses(mapId).location)) { - _numAvailableOutputs -= 1 + decrementNumAvailableOutputs(mapStatuses(mapId).location) mapStatuses(mapId) = null invalidateSerializedMapOutputStatusCache() } } } + def hasOutputsOnExecutor(execId: String): Boolean = synchronized { + _numOutputsPerExecutorId(execId) > 0 + } + + def executorsWithOutputs(): Set[String] = synchronized { + _numOutputsPerExecutorId.keySet.toSet + } + /** * Number of partitions that have shuffle outputs. */ @@ -192,6 +215,22 @@ private class ShuffleStatus(numPartitions: Int) { f(mapStatuses) } + private[this] def incrementNumAvailableOutputs(bmAddress: BlockManagerId): Unit = synchronized { + _numOutputsPerExecutorId(bmAddress.executorId) += 1 + _numAvailableOutputs += 1 + } + + private[this] def decrementNumAvailableOutputs(bmAddress: BlockManagerId): Unit = synchronized { + assert(_numOutputsPerExecutorId(bmAddress.executorId) >= 1, + s"Tried to remove non-existent output from ${bmAddress.executorId}") + if (_numOutputsPerExecutorId(bmAddress.executorId) == 1) { + _numOutputsPerExecutorId.remove(bmAddress.executorId) + } else { + _numOutputsPerExecutorId(bmAddress.executorId) -= 1 + } + _numAvailableOutputs -= 1 + } + /** * Clears the cached serialized map output statuses. */ @@ -306,6 +345,11 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging def stop() {} } +private[spark] object ExecutorShuffleStatus extends Enumeration { + type ExecutorShuffleStatus = Value + val Active, Inactive, Unknown = Value +} + /** * Driver-side class that keeps track of the location of the map output of a stage. * @@ -452,6 +496,26 @@ private[spark] class MapOutputTrackerMaster( } } + def markShuffleInactive(shuffleId: Int): Unit = { + shuffleStatuses.get(shuffleId) match { + case Some(shuffleStatus) => + shuffleStatus.isActive = false + case None => + throw new SparkException( + s"markShuffleInactive called for nonexistent shuffle ID $shuffleId.") + } + } + + def markShuffleActive(shuffleId: Int): Unit = { + shuffleStatuses.get(shuffleId) match { + case Some(shuffleStatus) => + shuffleStatus.isActive = true + case None => + throw new SparkException( + s"markShuffleActive called for nonexistent shuffle ID $shuffleId.") + } + } + /** * Removes all shuffle outputs associated with this host. Note that this will also remove * outputs which are served by an external shuffle server (if one exists). 
@@ -471,6 +535,12 @@ private[spark] class MapOutputTrackerMaster( incrementEpoch() } + def hasOutputsOnExecutor(execId: String, activeOnly: Boolean = false): Boolean = { + shuffleStatuses.valuesIterator.exists { status => + status.hasOutputsOnExecutor(execId) && (!activeOnly || status.isActive) + } + } + /** Check if the given shuffle is being tracked */ def containsShuffle(shuffleId: Int): Boolean = shuffleStatuses.contains(shuffleId) @@ -576,6 +646,20 @@ private[spark] class MapOutputTrackerMaster( } } + /** + * Return the set of executors that contain tracked shuffle files, with a status of + * [[ExecutorShuffleStatus.Inactive]] iff all shuffle files on that executor are marked inactive. + * + * @return a map of executor IDs to their corresponding [[ExecutorShuffleStatus]] + */ + def getExecutorShuffleStatus: scala.collection.Map[String, ExecutorShuffleStatus] = { + shuffleStatuses.values + .flatMap(status => status.executorsWithOutputs().map(_ -> status.isActive)) + .groupBy(_._1) + .mapValues(_.exists(_._2)) + .mapValues(if (_) ExecutorShuffleStatus.Active else ExecutorShuffleStatus.Inactive) + } + /** * Return a list of locations that each have fraction of map output greater than the specified * threshold. diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5cd6c2b9c380a..fa233a2304447 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -544,6 +544,7 @@ class SparkContext(config: SparkConf) extends Logging { case b: ExecutorAllocationClient => Some(new ExecutorAllocationManager( schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf, + _env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], _env.blockManager.master)) case _ => None diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dd1b2595461fc..8a668bc1f6b64 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -398,7 +398,9 @@ private[spark] class DAGScheduler( shuffleIdToMapStage(shuffleDep.shuffleId) = stage updateJobIdStageIdMaps(jobId, stage) - if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { + if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { + mapOutputTracker.markShuffleActive(shuffleDep.shuffleId) + } else { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") @@ -628,6 +630,7 @@ private[spark] class DAGScheduler( } for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) { shuffleIdToMapStage.remove(k) + mapOutputTracker.markShuffleInactive(k) } if (waitingStages.contains(stage)) { logDebug("Removing stage %d from waiting set.".format(stageId)) @@ -1379,6 +1382,31 @@ private[spark] class DAGScheduler( case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event) case _ => } + + // Make sure shuffle outputs are registered before we post the event so that + // handlers can act on up-to-date shuffle information. 
+ event.reason match { + case Success => + task match { + case smt: ShuffleMapTask => + val shuffleStage = stage.asInstanceOf[ShuffleMapStage] + val status = event.result.asInstanceOf[MapStatus] + val execId = status.location.executorId + logDebug("Registering shuffle output on executor " + execId) + if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { + logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") + } else { + // The epoch of the task is acceptable (i.e., the task was launched after the most + // recent failure we're aware of for the executor), so mark the task's output as + // available. + mapOutputTracker.registerMapOutput( + shuffleStage.shuffleDep.shuffleId, smt.partitionId, status) + } + case _ => + } + case _ => + } + postTaskEnd(event) event.reason match { @@ -1430,21 +1458,12 @@ private[spark] class DAGScheduler( logInfo("Ignoring result from " + rt + " because its job has finished") } - case smt: ShuffleMapTask => + case _: ShuffleMapTask => val shuffleStage = stage.asInstanceOf[ShuffleMapStage] shuffleStage.pendingPartitions -= task.partitionId val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { - logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") - } else { - // The epoch of the task is acceptable (i.e., the task was launched after the most - // recent failure we're aware of for the executor), so mark the task's output as - // available. - mapOutputTracker.registerMapOutput( - shuffleStage.shuffleDep.shuffleId, smt.partitionId, status) - } if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { markStageAsFinished(shuffleStage) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 12c8a9d6c9c45..ea67bd1e4bd3e 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -604,6 +604,69 @@ class ExecutorAllocationManagerSuite assert(firstRemoveTime !== secondRemoveTime) } + test("starting remove timers with cached/shuffle data") { + val minExecutors = 1 + val initialExecutors = 1 + val maxExecutors = 2 + val conf = new SparkConf() + .set(config.DYN_ALLOCATION_ENABLED, true) + .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors) + .set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors) + .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutors) + .set(config.DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms") + .set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms") + .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, s"${executorIdleTimeout}s") + .set(config.DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key, + s"${cachedExecutorIdleTimeout}s") + .set("spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout", + s"${inactiveShuffleExecutorIdleTimeout}s") + val mockAllocationClient = mock(classOf[ExecutorAllocationClient]) + val mockMOTM = mock(classOf[MapOutputTrackerMaster]) + val mockBMM = mock(classOf[BlockManagerMaster]) + val manager = new ExecutorAllocationManager( + mockAllocationClient, mock(classOf[LiveListenerBus]), conf, mockMOTM, mockBMM) + val clock = new ManualClock() + manager.setClock(clock) + + // 1. No cache or shuffle data => gets normal timeout. 
+ when(mockBMM.hasCachedBlocks("executor-1")).thenReturn(false) + when(mockMOTM.hasOutputsOnExecutor("executor-1")).thenReturn(false) + when(mockMOTM.hasOutputsOnExecutor("executor-1", activeOnly = true)).thenReturn(false) + onExecutorAdded(manager, "executor-1") + onExecutorIdle(manager, "executor-1") + assert(removeTimes(manager).contains("executor-1")) + assert(removeTimes(manager)("executor-1") === + clock.getTimeMillis() + executorIdleTimeout * 1000) + + // 2. Cached data => gets cached executor timeout. + when(mockBMM.hasCachedBlocks("executor-2")).thenReturn(true) + when(mockMOTM.hasOutputsOnExecutor("executor-2")).thenReturn(false) + when(mockMOTM.hasOutputsOnExecutor("executor-2", activeOnly = true)).thenReturn(false) + onExecutorAdded(manager, "executor-2") + onExecutorIdle(manager, "executor-2") + assert(removeTimes(manager).contains("executor-2")) + assert(removeTimes(manager)("executor-2") === + clock.getTimeMillis() + cachedExecutorIdleTimeout * 1000) + + // 3. Inactive shuffle data => gets inactive shuffle executor timeout. + when(mockBMM.hasCachedBlocks("executor-3")).thenReturn(false) + when(mockMOTM.hasOutputsOnExecutor("executor-3")).thenReturn(true) + when(mockMOTM.hasOutputsOnExecutor("executor-3", activeOnly = true)).thenReturn(false) + onExecutorAdded(manager, "executor-3") + onExecutorIdle(manager, "executor-3") + assert(removeTimes(manager).contains("executor-3")) + assert(removeTimes(manager)("executor-3") === + clock.getTimeMillis() + inactiveShuffleExecutorIdleTimeout * 1000) + + // 4. Active shuffle data => not scheduled for removal. + when(mockBMM.hasCachedBlocks("executor-4")).thenReturn(false) + when(mockMOTM.hasOutputsOnExecutor("executor-4")).thenReturn(true) + when(mockMOTM.hasOutputsOnExecutor("executor-4", activeOnly = true)).thenReturn(true) + onExecutorAdded(manager, "executor-4") + onExecutorIdle(manager, "executor-4") + assert(!removeTimes(manager).contains("executor-4")) + } + test("mock polling loop with no events") { sc = createSparkContext(0, 20, 0) val manager = sc.executorAllocationManager.get @@ -1098,9 +1161,10 @@ class ExecutorAllocationManagerSuite .set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms") .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "3000ms") val mockAllocationClient = mock(classOf[ExecutorAllocationClient]) + val mockMOTM = mock(classOf[MapOutputTrackerMaster]) val mockBMM = mock(classOf[BlockManagerMaster]) val manager = new ExecutorAllocationManager( - mockAllocationClient, mock(classOf[LiveListenerBus]), conf, mockBMM) + mockAllocationClient, mock(classOf[LiveListenerBus]), conf, mockMOTM, mockBMM) val clock = new ManualClock() manager.setClock(clock) @@ -1203,6 +1267,8 @@ class ExecutorAllocationManagerSuite s"${sustainedSchedulerBacklogTimeout.toString}s") .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, s"${executorIdleTimeout.toString}s") .set(config.DYN_ALLOCATION_TESTING, true) + .set(config.DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key, + s"${cachedExecutorIdleTimeout.toString}s") // SPARK-22864: effectively disable the allocation schedule by setting the period to a // really long value. 
.set(TEST_SCHEDULE_INTERVAL, 10000L) @@ -1221,6 +1287,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val schedulerBacklogTimeout = 1L private val sustainedSchedulerBacklogTimeout = 2L private val executorIdleTimeout = 3L + private val cachedExecutorIdleTimeout = 4L + private val inactiveShuffleExecutorIdleTimeout = 5L private def createStageInfo( stageId: Int, diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index d86975964b558..26a0fb0657af2 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -329,4 +329,31 @@ class MapOutputTrackerSuite extends SparkFunSuite { rpcEnv.shutdown() } + test("correctly track executors and ExecutorShuffleStatus") { + val tracker = newTrackerMaster() + val bmId1 = BlockManagerId("exec1", "host1", 1000) + val bmId2 = BlockManagerId("exec2", "host2", 1000) + tracker.registerShuffle(11, 3) + tracker.registerMapOutput(11, 0, MapStatus(bmId1, Array(10))) + tracker.registerMapOutput(11, 1, MapStatus(bmId1, Array(100))) + tracker.registerMapOutput(11, 2, MapStatus(bmId2, Array(1000))) + + assert(tracker.hasOutputsOnExecutor("exec1")) + assert(tracker.getExecutorShuffleStatus.keySet.equals(Set("exec1", "exec2"))) + assert(!tracker.hasOutputsOnExecutor("exec3")) + + tracker.unregisterMapOutput(11, 0, bmId1) + assert(tracker.hasOutputsOnExecutor("exec1")) + tracker.unregisterMapOutput(11, 1, bmId1) + assert(!tracker.hasOutputsOnExecutor("exec1")) + + tracker.markShuffleInactive(11) + assert(tracker.hasOutputsOnExecutor("exec2")) + assert(!tracker.hasOutputsOnExecutor("exec2", activeOnly = true)) + + assert(tracker.getExecutorShuffleStatus == Map("exec2" -> ExecutorShuffleStatus.Inactive)) + tracker.markShuffleActive(11) + assert(tracker.getExecutorShuffleStatus == Map("exec2" -> ExecutorShuffleStatus.Active)) + } + } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index da3edfeca9b1f..10084da1348da 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -18,17 +18,6 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import io.fabric8.kubernetes.api.model.PodBuilder -import io.fabric8.kubernetes.client.KubernetesClient -import scala.collection.mutable - -import org.apache.spark.{SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.KubernetesConf -import org.apache.spark.internal.Logging -import org.apache.spark.util.{Clock, Utils} - private[spark] class ExecutorPodsAllocator( conf: SparkConf, secMgr: SecurityManager, @@ -66,15 +55,24 @@ private[spark] class ExecutorPodsAllocator( // snapshot yet. Mapped to the timestamp when they were created. 
private val newlyCreatedExecutors = mutable.Map.empty[Long, Long] + private var latestSnapshot: Option[ExecutorPodsSnapshot] = None + + private var appId: Option[String] = None + def start(applicationId: String): Unit = { snapshotsStore.addSubscriber(podAllocationDelay) { onNewSnapshots(applicationId, _) } } - def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total) + def setTotalExpectedExecutors(total: Int): Unit = { + logDebug("Setting total expected executors", SafeArg.of("totalExpectedExecutors", total)) + totalExpectedExecutors.set(total) + appId.foreach { id => latestSnapshot.foreach { requestExecutorsIfNecessary(id, _) } } + } private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = { + this.appId = Some(applicationId) newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys) // For all executors we've created against the API but have not seen in a snapshot // yet - check the current time. If the current time has exceeded some threshold, @@ -110,12 +108,18 @@ private[spark] class ExecutorPodsAllocator( if (snapshots.nonEmpty) { // Only need to examine the cluster as of the latest snapshot, the "current" state, to see if // we need to allocate more executors or not. - val latestSnapshot = snapshots.last - val currentRunningExecutors = latestSnapshot.executorPods.values.count { + latestSnapshot = Some(snapshots.last) + requestExecutorsIfNecessary(applicationId, snapshots.last) + } + } + + private def requestExecutorsIfNecessary(applicationId: String, + snapshot: ExecutorPodsSnapshot): Unit = { + val currentRunningExecutors = snapshot.executorPods.values.count { case PodRunning(_) => true case _ => false } - val currentPendingExecutors = latestSnapshot.executorPods.values.count { + val currentPendingExecutors = snapshot.executorPods.values.count { case PodPending(_) => true case _ => false } @@ -145,7 +149,6 @@ private[spark] class ExecutorPodsAllocator( .build() kubernetesClient.pods().create(podWithAttachedContainer) newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() - logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") } } else if (currentRunningExecutors >= currentTotalExpectedExecutors) { // TODO handle edge cases if we end up with more running executors than expected. diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 55d9adc212f92..2666db9fbc722 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -118,6 +118,27 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod])) } + test("Immediately request more executors if possible when total expected executors changes") { + // Start with no executors. + podsAllocatorUnderTest.setTotalExpectedExecutors(0) + snapshotsStore.replaceSnapshot(Seq.empty[Pod]) + snapshotsStore.notifySubscribers() + verify(podOperations, never()).create(podWithAttachedContainerForId(1)) + + // Setting total to 1 immediately requests more. 
+ podsAllocatorUnderTest.setTotalExpectedExecutors(1) + verify(podOperations).create(podWithAttachedContainerForId(1)) + + // Raising the total while executors are pending does not immediately request more. + podsAllocatorUnderTest.setTotalExpectedExecutors(2) + verify(podOperations, never()).create(podWithAttachedContainerForId(2)) + + // It will be requested like usual when the pending containers are running. + snapshotsStore.updatePod(runningExecutor(1)) + snapshotsStore.notifySubscribers() + verify(podOperations).create(podWithAttachedContainerForId(2)) + } + test("When a current batch reaches error states immediately, re-request" + " them on the next batch.") { podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala new file mode 100644 index 0000000000000..9bcec1fe9ec21 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import org.scalatest.concurrent.Eventually +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT} + +private[spark] trait DynamicAllocationTestsSuite { k8sSuite: KubernetesSuite => + + test("Run with dynamic allocation.", k8sTestTag) { + val labels = Map("spark-app-selector" -> driverPodName) + val driverPort = 7077 + val blockManagerPort = 10000 + val driverService = testBackend + .getKubernetesClient + .services() + .inNamespace(kubernetesTestComponents.namespace) + .createNew() + .withNewMetadata() + .withName(s"$driverPodName-svc") + .endMetadata() + .withNewSpec() + .withClusterIP("None") + .withSelector(labels.asJava) + .addNewPort() + .withName("driver-port") + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() + .addNewPort() + .withName("block-manager") + .withPort(blockManagerPort) + .withNewTargetPort(blockManagerPort) + .endPort() + .endSpec() + .done() + try { + val driverPod = testBackend + .getKubernetesClient + .pods() + .inNamespace(kubernetesTestComponents.namespace) + .createNew() + .withNewMetadata() + .withName(driverPodName) + .withLabels(labels.asJava) + .endMetadata() + .withNewSpec() + .withServiceAccountName(kubernetesTestComponents.serviceAccountName) + .addNewContainer() + .withName("spark-example") + .withImage(image) + .withImagePullPolicy("IfNotPresent") + .withCommand("/opt/spark/bin/run-example") + .addToArgs("--master", s"k8s://https://kubernetes.default.svc") + .addToArgs("--deploy-mode", "client") + .addToArgs("--conf", s"spark.kubernetes.container.image=$image") + .addToArgs( + "--conf", + s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}") + .addToArgs("--conf", "spark.kubernetes.authenticate.oauthTokenFile=" + + "/var/run/secrets/kubernetes.io/serviceaccount/token") + .addToArgs("--conf", "spark.kubernetes.authenticate.caCertFile=" + + "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") + .addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName") + .addToArgs("--conf", "spark.executor.memory=500m") + .addToArgs("--conf", "spark.executor.cores=1") + .addToArgs("--conf", "spark.dynamicAllocation.enabled=true") + .addToArgs("--conf", "spark.dynamicAllocation.minExecutors=0") + .addToArgs("--conf", "spark.dynamicAllocation.maxExecutors=1") + .addToArgs("--conf", + s"spark.driver.host=" + + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") + .addToArgs("--conf", s"spark.driver.port=$driverPort") + .addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort") + .addToArgs("SparkPi") + .addToArgs("10") + .endContainer() + .endSpec() + .done() + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPodName) + .getLog + .contains("Pi is roughly 3"), "The application did not complete.") + } + } finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents + .kubernetesClient + .services() + .inNamespace(kubernetesTestComponents.namespace) + .delete(driverService) + } + } + +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index f8f4b4177f3bd..e0e10d3d8af54 100644 --- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -40,8 +40,8 @@ import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite - with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite - with Logging with Eventually with Matchers { + with PythonTestsSuite with ClientModeTestsSuite with DynamicAllocationTestsSuite + with PodTemplateSuite with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -286,6 +286,7 @@ class KubernetesSuite extends SparkFunSuite } } } + protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { assert(driverPod.getMetadata.getName === driverPodName) assert(driverPod.getSpec.getContainers.get(0).getImage === image) @@ -294,7 +295,6 @@ class KubernetesSuite extends SparkFunSuite === baseMemory) } - protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = { assert(driverPod.getMetadata.getName === driverPodName) assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage) @@ -311,7 +311,6 @@ class KubernetesSuite extends SparkFunSuite === standardNonJVMMemory) } - protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === image) assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor") diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 56b7dfc13699d..ce27f5fb16943 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.yarn import java.io.File -import java.net.URL import java.nio.charset.StandardCharsets import java.util.{HashMap => JHashMap} @@ -122,6 +121,26 @@ class YarnClusterSuite extends BaseYarnClusterSuite { )) } + test("run Spark in yarn-client mode with dynamic allocation") { + testBasicYarnApp(true, + Map( + "spark.dynamicAllocation.enabled" -> "true", + // Start with 0 executors to at least test that we will request more to run the job. + "spark.dynamicAllocation.initialExecutors" -> "0", + "spark.dynamicAllocation.maxExecutors" -> "1" + )) + } + + test("run Spark in yarn-cluster mode with dynamic allocation") { + testBasicYarnApp(false, + Map( + "spark.dynamicAllocation.enabled" -> "true", + // Start with 0 executors to at least test that we will request more to run the job. + "spark.dynamicAllocation.initialExecutors" -> "0", + "spark.dynamicAllocation.maxExecutors" -> "1" + )) + } + test("yarn-cluster should respect conf overrides in SparkHadoopUtil (SPARK-16414, SPARK-23630)") { // Create a custom hadoop config file, to make sure it's contents are propagated to the driver. 
val customConf = Utils.createTempDir() From 8473d16c306c6346d554953df483ebc07eba2836 Mon Sep 17 00:00:00 2001 From: Ryan Norris Date: Tue, 27 Nov 2018 21:30:50 +0800 Subject: [PATCH 2/8] Fix dynamic allocation with external shuffle service (#445) --- .../org/apache/spark/ExecutorAllocationManager.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 149318ea77999..bf0a6feba8554 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -25,7 +25,7 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.codahale.metrics.{Gauge, MetricRegistry} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL import org.apache.spark.metrics.source.Source @@ -122,6 +122,8 @@ private[spark] class ExecutorAllocationManager( private val inactiveShuffleExecutorIdleTimeoutS = conf.getTimeAsSeconds( "spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout", s"${Integer.MAX_VALUE}s") + private val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) + // During testing, the methods to actually kill and add executors are mocked out private val testing = conf.get(DYN_ALLOCATION_TESTING) @@ -605,7 +607,7 @@ private[spark] class ExecutorAllocationManager( */ private def onExecutorIdle(executorId: String): Unit = synchronized { if (executorIds.contains(executorId)) { - val hasActiveShuffleBlocks = + val hasActiveShuffleBlocks = !externalShuffleServiceEnabled && mapOutputTracker.hasOutputsOnExecutor(executorId, activeOnly = true) if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId) @@ -614,7 +616,8 @@ private[spark] class ExecutorAllocationManager( // blocks we are concerned with are reported to the driver. Note that this // does not include broadcast blocks. val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId) - val hasAnyShuffleBlocks = mapOutputTracker.hasOutputsOnExecutor(executorId) + val hasAnyShuffleBlocks = + !externalShuffleServiceEnabled && mapOutputTracker.hasOutputsOnExecutor(executorId) val now = clock.getTimeMillis() // Use the maximum of all the timeouts that apply. 
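Taken together, the first two patches make the idle-timeout choice depend on what an executor is holding: the plain idle timeout applies when it holds nothing, the cached-block timeout when it holds RDD blocks, and the new inactive-shuffle timeout when it holds only shuffle files that no active stage still needs. Executors with active shuffle files are not timed out at all unless an external shuffle service is serving those files. The sketch below is illustrative only and shows one way these settings could be wired up; the inactiveShuffleExecutorIdleTimeout key is introduced by this patch series rather than upstream Spark, and the concrete timeout values are arbitrary examples.

    import org.apache.spark.SparkConf

    // Minimal sketch: dynamic allocation without an external shuffle service.
    // Timeout values are arbitrary examples, not recommendations.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "false") // rely on shuffle tracking instead
      .set("spark.dynamicAllocation.minExecutors", "0")
      .set("spark.dynamicAllocation.maxExecutors", "10")
      .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
      .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s")
      // Executors whose only payload is inactive shuffle files are released after this timeout.
      .set("spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout", "300s")

With settings like these, onExecutorIdle picks the largest of the timeouts that apply, so an executor holding both cached blocks and inactive shuffle files is kept for the longer of the two windows.
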
From 3e8b1f48e654ff1428cc581fba63e9aba64c0e3a Mon Sep 17 00:00:00 2001 From: Ryan Norris Date: Tue, 27 Nov 2018 22:01:25 +0800 Subject: [PATCH 3/8] Track active shuffles by stage (#446) --- .../spark/ExecutorAllocationManager.scala | 6 ++ .../apache/spark/scheduler/DAGScheduler.scala | 40 +++++++- .../spark/scheduler/DAGSchedulerSuite.scala | 98 ++++++++++++++++++- 3 files changed, 142 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index bf0a6feba8554..9f93ae0306991 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -683,6 +683,7 @@ private[spark] class ExecutorAllocationManager( override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { // At the end of a job, trigger the callbacks for idle executors again to clean up executors // which we were keeping around only because they held active shuffle blocks. + logDebug("Checking for idle executors at end of job") allocationManager.checkForIdleExecutors() } @@ -733,6 +734,11 @@ private[spark] class ExecutorAllocationManager( if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) { allocationManager.onSchedulerQueueEmpty() } + + // Trigger the callbacks for idle executors again to clean up executors + // which we were keeping around only because they held active shuffle blocks. + logDebug("Checking for idle executors at end of stage") + allocationManager.checkForIdleExecutors() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8a668bc1f6b64..7c56c44010c1e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -25,7 +25,7 @@ import java.util.function.BiFunction import scala.annotation.tailrec import scala.collection.Map -import scala.collection.mutable.{ArrayStack, HashMap, HashSet} +import scala.collection.mutable.{ArrayStack, HashMap, HashSet, Set} import scala.concurrent.duration._ import scala.language.existentials import scala.language.postfixOps @@ -150,6 +150,13 @@ private[spark] class DAGScheduler( * the shuffle data will be in the MapOutputTracker). */ private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage] + + /** + * Mapping from shuffle dependency ID to the IDs of the stages which depend on the shuffle data. + * Used to track when shuffle data becomes no longer active. 
+ */ + private[scheduler] val shuffleIdToDependentStages = new HashMap[Int, Set[Int]] + private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] // Stages we need to run whose parents aren't done @@ -397,6 +404,7 @@ private[spark] class DAGScheduler( stageIdToStage(id) = stage shuffleIdToMapStage(shuffleDep.shuffleId) = stage updateJobIdStageIdMaps(jobId, stage) + updateShuffleDependenciesMap(stage) if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { mapOutputTracker.markShuffleActive(shuffleDep.shuffleId) @@ -456,6 +464,7 @@ private[spark] class DAGScheduler( val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) + updateShuffleDependenciesMap(stage) stage } @@ -602,6 +611,17 @@ private[spark] class DAGScheduler( updateJobIdStageIdMapsList(List(stage)) } + /** + * Registers the shuffle dependencies of the given stage. + */ + private def updateShuffleDependenciesMap(stage: Stage): Unit = { + getShuffleDependencies(stage.rdd).foreach { shuffleDep => + val shuffleId = shuffleDep.shuffleId + logDebug("Tracking that stage " + stage.id + " depends on shuffle " + shuffleId) + shuffleIdToDependentStages.getOrElseUpdate(shuffleId, Set.empty) += stage.id + } + } + /** * Removes state for job and any stages that are not needed by any other job. Does not * handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks. @@ -1905,6 +1925,24 @@ private[spark] class DAGScheduler( case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0) case _ => "Unknown" } + + getShuffleDependencies(stage.rdd).foreach { shuffleDep => + val shuffleId = shuffleDep.shuffleId + if (!shuffleIdToDependentStages.contains(shuffleId)) { + logDebug("Stage finished with untracked shuffle dependency " + shuffleId) + } else { + var dependentStages = shuffleIdToDependentStages(shuffleId) + dependentStages -= stage.id; + logDebug("Stage " + stage.id + " finished. " + + "Shuffle " + shuffleId + " now has dependencies " + dependentStages) + if (dependentStages.isEmpty) { + logDebug("Shuffle " + shuffleId + " is no longer needed. 
Marking it inactive.") + shuffleIdToDependentStages.remove(shuffleId) + mapOutputTracker.markShuffleInactive(shuffleId) + } + } + } + if (errorMessage.isEmpty) { logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) stage.latestInfo.completionTime = Some(clock.getTimeMillis()) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index e17d264cced9f..98e20514700c3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.rdd.{DeterministicLevel, RDD} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException} import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils} +import org.apache.spark.util._ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) extends DAGSchedulerEventProcessLoop(dagScheduler) { @@ -2195,6 +2195,102 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assertDataStructuresEmpty() } + test("stage level active shuffle tracking") { + // We will have 3 stages depending on each other. + // The second stage is composed of 2 RDDs to check we're tracking shuffle up the chain. + val shuffleMapRdd1 = new MyRDD(sc, 2, Nil) + val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(1)) + val shuffleId1 = shuffleDep1.shuffleId + val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker) + val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(1)) + val shuffleId2 = shuffleDep2.shuffleId + val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep2), tracker = mapOutputTracker) + val intermediateDep = new OneToOneDependency(intermediateRdd) + val reduceRdd = new MyRDD(sc, 1, List(intermediateDep), tracker = mapOutputTracker) + + // Submit the job. + // Both shuffles should become active. + submit(reduceRdd, Array(0)) + assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === true) + assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === true) + + // Complete the first stage. + // Both shuffles remain active. + completeShuffleMapStageSuccessfully(0, 0, 2) + assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === true) + assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === true) + + // Complete the second stage. + // Shuffle 1 is no longer needed and should become inactive. + completeShuffleMapStageSuccessfully(1, 0, 1) + assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === false) + assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === true) + + // Complete the results stage. + // Both shuffles are no longer needed and should become inactive. + completeNextResultStageWithSuccess(2, 0) + assert(mapOutputTracker.shuffleStatuses(shuffleId1).isActive === false) + assert(mapOutputTracker.shuffleStatuses(shuffleId2).isActive === false) + + // Double check results. + assert(results === Map(0 -> 42)) + results.clear() + assertDataStructuresEmpty() + } + + test("stage level active shuffle tracking with multiple dependents") { + // We will have a diamond shape dependency. 
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) + val shuffleId = shuffleDep.shuffleId + val intermediateRdd1 = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) + val intermediateRdd2 = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) + val intermediateDep1 = new ShuffleDependency(intermediateRdd1, new HashPartitioner(1)) + val intermediateDep2 = new ShuffleDependency(intermediateRdd2, new HashPartitioner(1)) + val reduceRdd = + new MyRDD(sc, 1, List(intermediateDep1, intermediateDep2), tracker = mapOutputTracker) + + // Submit the job. + // Shuffle becomes active. + submit(reduceRdd, Array(0)) + assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === true) + + // Complete the shuffle stage. + // Shuffle remains active. + completeShuffleMapStageSuccessfully(0, 0, 2) + assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === true) + + // Complete first intermediate stage. + // Shuffle is still active. + val stageAttempt = taskSets(1) + checkStageId(1, 0, stageAttempt) + complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { + case (task, idx) => + (Success, makeMapStatus("host" + ('A' + idx).toChar, 1)) + }.toSeq) + assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === true) + + // Complete second intermediate stage. + // Shuffle is no longer active. + val stageAttempt2 = taskSets(2) + checkStageId(2, 0, stageAttempt2) + complete(stageAttempt2, stageAttempt2.tasks.zipWithIndex.map { + case (task, idx) => + (Success, makeMapStatus("host" + ('A' + idx).toChar, 1)) + }.toSeq) + assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === false) + + // Complete the results stage. + // Shuffle is still inactive. + completeNextResultStageWithSuccess(3, 0) + assert(mapOutputTracker.shuffleStatuses(shuffleId).isActive === false) + + // Double check results. 
+ assert(results === Map(0 -> 42)) + results.clear() + assertDataStructuresEmpty() + } + test("map stage submission with fetch failure") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) From 921d72f6cd91fa603624c2b12b9f83ba365bf80e Mon Sep 17 00:00:00 2001 From: Ryan Norris Date: Thu, 29 Nov 2018 04:45:14 +0800 Subject: [PATCH 4/8] Shuffle biased task scheduling (#447) --- .../org/apache/spark/MapOutputTracker.scala | 4 +- .../spark/scheduler/TaskSchedulerImpl.scala | 72 ++++++++++++++----- .../scheduler/TaskSchedulerImplSuite.scala | 38 +++++++++- .../DynamicAllocationTestsSuite.scala | 1 + 4 files changed, 96 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index f7466cf2862dc..83150960c70ab 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -655,8 +655,8 @@ private[spark] class MapOutputTrackerMaster( def getExecutorShuffleStatus: scala.collection.Map[String, ExecutorShuffleStatus] = { shuffleStatuses.values .flatMap(status => status.executorsWithOutputs().map(_ -> status.isActive)) - .groupBy(_._1) - .mapValues(_.exists(_._2)) + .groupBy(_._1) // group by executor ID + .mapValues(_.exists(_._2)) // true if any are Active .mapValues(if (_) ExecutorShuffleStatus.Active else ExecutorShuffleStatus.Inactive) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index db0fbfe29f65b..f4226b13d3946 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -87,6 +87,10 @@ private[spark] class TaskSchedulerImpl( private val speculationScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation") + // whether to prefer assigning tasks to executors that contain shuffle files + val shuffleBiasedTaskSchedulingEnabled = + conf.getBoolean("spark.scheduler.shuffleBiasedTaskScheduling.enabled", false) + // Threshold above which we warn user initial TaskSet may be starved val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s") @@ -414,11 +418,7 @@ private[spark] class TaskSchedulerImpl( } }.getOrElse(offers) - val shuffledOffers = shuffleOffers(filteredOffers) - // Build a list of tasks to assign to each worker. - val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) - val availableCpus = shuffledOffers.map(o => o.cores).toArray - val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum + var tasks: Seq[Seq[TaskDescription]] = Nil val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( @@ -428,11 +428,36 @@ private[spark] class TaskSchedulerImpl( } } + // If shuffle-biased task scheduling is enabled, then first assign as many tasks as possible to + // executors containing active shuffle files, followed by assigning to executors with inactive + // shuffle files, and then finally to those without shuffle files. This bin packing allows for + // more efficient dynamic allocation in the absence of an external shuffle service. 
+ val partitionedAndShuffledOffers = partitionAndShuffleOffers(filteredOffers) + for (shuffledOffers <- partitionedAndShuffledOffers.map(_._2)) { + tasks ++= doResourceOffers(shuffledOffers, sortedTaskSets) + } + + // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get + // launched within a configured time. + if (tasks.size > 0) { + hasLaunchedTask = true + } + return tasks + } + + private def doResourceOffers( + shuffledOffers: IndexedSeq[WorkerOffer], + sortedTaskSets: IndexedSeq[TaskSetManager]): Seq[Seq[TaskDescription]] = { + // Build a list of tasks to assign to each worker. + val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) + val availableCpus = shuffledOffers.map(o => o.cores).toArray + val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum + // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + // Skip the barrier taskSet if the available slots are less than the number of pending tasks if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { // Skip the launch process. // TODO SPARK-24819 If the job requires more slots than available (both busy and free @@ -520,18 +545,33 @@ private[spark] class TaskSchedulerImpl( .mkString(",") addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr)) - logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " + - s"stage ${taskSet.stageId}.") + logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for " + + s"barrier stage ${taskSet.stageId}.") } } } + tasks + } - // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get - // launched within a configured time. - if (tasks.size > 0) { - hasLaunchedTask = true + /** + * Shuffle offers around to avoid always placing tasks on the same workers. + * If shuffle-biased task scheduling is enabled, this function partitions the offers based on + * whether they have active/inactive/no shuffle files present. + */ + def partitionAndShuffleOffers(offers: IndexedSeq[WorkerOffer]) + : IndexedSeq[(ExecutorShuffleStatus.Value, IndexedSeq[WorkerOffer])] = { + if (shuffleBiasedTaskSchedulingEnabled && offers.length > 1) { + // bias towards executors that have active shuffle outputs + val execShuffles = mapOutputTracker.getExecutorShuffleStatus + offers + .groupBy(offer => execShuffles.getOrElse(offer.executorId, ExecutorShuffleStatus.Unknown)) + .mapValues(doShuffleOffers) + .toStream + .sortBy(_._1) // order: Active, Inactive, Unknown + .toIndexedSeq + } else { + IndexedSeq((ExecutorShuffleStatus.Unknown, doShuffleOffers(offers))) } - return tasks } private def createUnschedulableTaskSetAbortTimer( @@ -552,10 +592,10 @@ private[spark] class TaskSchedulerImpl( } /** - * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow - * overriding in tests, so it can be deterministic. + * Does the shuffling for [[partitionAndShuffleOffers()]]. Exposed to allow overriding in tests, + * so that it can be deterministic. 
*/ - protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { + protected def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { Random.shuffle(offers) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 1a81f556e0612..fb5ae5af79b18 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -29,8 +29,10 @@ import org.scalatest.concurrent.Eventually import org.scalatest.mockito.MockitoSugar import org.apache.spark._ +import org.apache.spark.ExecutorShuffleStatus._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config +import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.ManualClock class FakeSchedulerBackend extends SchedulerBackend { @@ -1030,7 +1032,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // We customize the task scheduler just to let us control the way offers are shuffled, so we // can be sure we try both permutations, and to control the clock on the tasksetmanager. val taskScheduler = new TaskSchedulerImpl(sc) { - override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { + override def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { // Don't shuffle the offers around for this test. Instead, we'll just pass in all // the permutations we care about directly. offers @@ -1067,6 +1069,40 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } } + test("Shuffle-biased task scheduling enabled should lead to non-random offer shuffling") { + setupScheduler("spark.scheduler.shuffleBiasedTaskScheduling.enabled" -> "true") + + // Make offers in different executors, so they can be a mix of active, inactive, unknown + val offers = IndexedSeq( + WorkerOffer("exec1", "host1", 2), // inactive + WorkerOffer("exec2", "host2", 2), // active + WorkerOffer("exec3", "host3", 2) // unknown + ) + val makeMapStatus = (offer: WorkerOffer) => + MapStatus(BlockManagerId(offer.executorId, offer.host, 1), Array(10)) + val mapOutputTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + mapOutputTracker.registerShuffle(0, 2) + mapOutputTracker.registerShuffle(1, 1) + mapOutputTracker.registerMapOutput(0, 0, makeMapStatus(offers(0))) + mapOutputTracker.registerMapOutput(0, 1, makeMapStatus(offers(1))) + mapOutputTracker.registerMapOutput(1, 0, makeMapStatus(offers(1))) + mapOutputTracker.markShuffleInactive(0) + + val execStatus = mapOutputTracker.getExecutorShuffleStatus + assert(execStatus.equals(Map("exec1" -> Inactive, "exec2" -> Active))) + + assert(taskScheduler.partitionAndShuffleOffers(offers).map(_._1) + .equals(IndexedSeq(Active, Inactive, Unknown))) + assert(taskScheduler.partitionAndShuffleOffers(offers).flatMap(_._2).map(offers.indexOf(_)) + .equals(IndexedSeq(1, 0, 2))) + + taskScheduler.submitTasks(FakeTask.createTaskSet(3, stageId = 1, stageAttemptId = 0)) + // should go to active first, then inactive + val taskDescs = taskScheduler.resourceOffers(offers).flatten + assert(taskDescs.size === 3) + assert(taskDescs.map(_.executorId).equals(Seq("exec2", "exec2", "exec1"))) + } + test("With delay scheduling off, tasks can be run at any locality level immediately") { val conf = new SparkConf() 
.set(config.LOCALITY_WAIT.key, "0") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala index 9bcec1fe9ec21..c74b9a803b43b 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestsSuite.scala @@ -83,6 +83,7 @@ private[spark] trait DynamicAllocationTestsSuite { k8sSuite: KubernetesSuite => .addToArgs("--conf", "spark.dynamicAllocation.enabled=true") .addToArgs("--conf", "spark.dynamicAllocation.minExecutors=0") .addToArgs("--conf", "spark.dynamicAllocation.maxExecutors=1") + .addToArgs("--conf", "spark.scheduler.shuffleBiasedTaskScheduling.enabled=true") .addToArgs("--conf", s"spark.driver.host=" + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") From 42de6cfaa99924c44b080ad4e49d8a496b63b81c Mon Sep 17 00:00:00 2001 From: Will Manning Date: Wed, 13 Mar 2019 18:02:15 +0000 Subject: [PATCH 5/8] improve comments and move config to standard location --- .../apache/spark/ExecutorAllocationManager.scala | 4 ++-- .../scala/org/apache/spark/MapOutputTracker.scala | 13 +++++++------ .../org/apache/spark/internal/config/package.scala | 4 ++++ .../spark/ExecutorAllocationManagerSuite.scala | 2 +- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 9f93ae0306991..861db5a0a36b1 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -119,8 +119,8 @@ private[spark] class ExecutorAllocationManager( private val cachedExecutorIdleTimeoutS = conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT) - private val inactiveShuffleExecutorIdleTimeoutS = conf.getTimeAsSeconds( - "spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout", s"${Integer.MAX_VALUE}s") + private val inactiveShuffleExecutorIdleTimeoutS = + conf.get(DYN_ALLOCATION_INACTIVE_SHUFFLE_EXECUTOR_IDLE_TIMEOUT) private val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 83150960c70ab..67bf83632f56b 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -63,9 +63,9 @@ private class ShuffleStatus(numPartitions: Int) { val mapStatuses = new Array[MapStatus](numPartitions) /** - * Whether an active job in the [[org.apache.spark.scheduler.DAGScheduler]] depends on this. - * If dynamic allocation is enabled, then executors that do not contain active shuffles may - * eventually be surrendered by the [[ExecutorAllocationManager]]. + * Whether an active/pending stage in the [[org.apache.spark.scheduler.DAGScheduler]] depends + * on this. If dynamic allocation is enabled, then executors that do not contain active shuffles + * may eventually be surrendered by the [[ExecutorAllocationManager]]. 
*/ var isActive = true @@ -93,9 +93,10 @@ private class ShuffleStatus(numPartitions: Int) { private[this] var _numAvailableOutputs: Int = 0 /** - * Cached set of executorIds on which outputs exist. This is a performance optimization to avoid - * having to repeatedly iterate over ever element in the `mapStatuses` array and should be - * equivalent to `mapStatuses.map(_.location.executorId).groupBy(x => x).mapValues(_.length)`. + * Cached mapping of executorIds to the number of outputs associated with those executors. This + * is a performance optimization to avoid having to repeatedly iterate over ever element in the + * `mapStatuses` array and should be equivalent to + * `mapStatuses.map(_.location.executorId).groupBy(x => x).mapValues(_.length)`. */ private[this] val _numOutputsPerExecutorId = HashMap[String, Int]().withDefaultValue(0) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 850d6845684ba..d5659abc06ece 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -339,6 +339,10 @@ package object config { ConfigBuilder("spark.dynamicAllocation.cachedExecutorIdleTimeout") .timeConf(TimeUnit.SECONDS).createWithDefault(Integer.MAX_VALUE) + private[spark] val DYN_ALLOCATION_INACTIVE_SHUFFLE_EXECUTOR_IDLE_TIMEOUT = + ConfigBuilder("spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout") + .timeConf(TimeUnit.SECONDS).createWithDefault(Integer.MAX_VALUE) + private[spark] val DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT = ConfigBuilder("spark.dynamicAllocation.executorIdleTimeout") .timeConf(TimeUnit.SECONDS).createWithDefault(60) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ea67bd1e4bd3e..e5f442c16d289 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -618,7 +618,7 @@ class ExecutorAllocationManagerSuite .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, s"${executorIdleTimeout}s") .set(config.DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key, s"${cachedExecutorIdleTimeout}s") - .set("spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout", + .set(config.DYN_ALLOCATION_INACTIVE_SHUFFLE_EXECUTOR_IDLE_TIMEOUT.key, s"${inactiveShuffleExecutorIdleTimeout}s") val mockAllocationClient = mock(classOf[ExecutorAllocationClient]) val mockMOTM = mock(classOf[MapOutputTrackerMaster]) From 740632d1aef7afaeac4e174b9c5f4a07bd4775d7 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Wed, 13 Mar 2019 18:33:51 +0000 Subject: [PATCH 6/8] fix error from cherry pick --- .../cluster/k8s/ExecutorPodsAllocator.scala | 89 +++++++++---------- 1 file changed, 44 insertions(+), 45 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 10084da1348da..527487f8f8577 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -115,52 +115,51 @@ private[spark] class 
ExecutorPodsAllocator( private def requestExecutorsIfNecessary(applicationId: String, snapshot: ExecutorPodsSnapshot): Unit = { - val currentRunningExecutors = snapshot.executorPods.values.count { - case PodRunning(_) => true - case _ => false - } - val currentPendingExecutors = snapshot.executorPods.values.count { - case PodPending(_) => true - case _ => false - } - val currentTotalExpectedExecutors = totalExpectedExecutors.get - logDebug(s"Currently have $currentRunningExecutors running executors and" + - s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" + - s" have been requested but are pending appearance in the cluster.") - if (newlyCreatedExecutors.isEmpty - && currentPendingExecutors == 0 - && currentRunningExecutors < currentTotalExpectedExecutors) { - val numExecutorsToAllocate = math.min( - currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize) - logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.") - for ( _ <- 0 until numExecutorsToAllocate) { - val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet() - val executorConf = KubernetesConf.createExecutorConf( - conf, - newExecutorId.toString, - applicationId, - driverPod) - val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr, - kubernetesClient) - val podWithAttachedContainer = new PodBuilder(executorPod.pod) - .editOrNewSpec() - .addToContainers(executorPod.container) - .endSpec() - .build() - kubernetesClient.pods().create(podWithAttachedContainer) - newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() - } - } else if (currentRunningExecutors >= currentTotalExpectedExecutors) { - // TODO handle edge cases if we end up with more running executors than expected. - logDebug("Current number of running executors is equal to the number of requested" + - " executors. Not scaling up further.") - } else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) { - logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" + - s" executors to begin running before requesting for more executors. # of executors in" + - s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" + - s" created but we have not observed as being present in the cluster yet:" + - s" ${newlyCreatedExecutors.size}.") + val currentRunningExecutors = snapshot.executorPods.values.count { + case PodRunning(_) => true + case _ => false + } + val currentPendingExecutors = snapshot.executorPods.values.count { + case PodPending(_) => true + case _ => false + } + val currentTotalExpectedExecutors = totalExpectedExecutors.get + logDebug(s"Currently have $currentRunningExecutors running executors and" + + s" $currentPendingExecutors pending executors. 
$newlyCreatedExecutors executors" + + s" have been requested but are pending appearance in the cluster.") + if (newlyCreatedExecutors.isEmpty + && currentPendingExecutors == 0 + && currentRunningExecutors < currentTotalExpectedExecutors) { + val numExecutorsToAllocate = math.min( + currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize) + logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.") + for (_ <- 0 until numExecutorsToAllocate) { + val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet() + val executorConf = KubernetesConf.createExecutorConf( + conf, + newExecutorId.toString, + applicationId, + driverPod) + val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr, + kubernetesClient) + val podWithAttachedContainer = new PodBuilder(executorPod.pod) + .editOrNewSpec() + .addToContainers(executorPod.container) + .endSpec() + .build() + kubernetesClient.pods().create(podWithAttachedContainer) + newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() } + } else if (currentRunningExecutors >= currentTotalExpectedExecutors) { + // TODO handle edge cases if we end up with more running executors than expected. + logDebug("Current number of running executors is equal to the number of requested" + + " executors. Not scaling up further.") + } else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) { + logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" + + s" executors to begin running before requesting for more executors. # of executors in" + + s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" + + s" created but we have not observed as being present in the cluster yet:" + + s" ${newlyCreatedExecutors.size}.") } } } From 0fe8cdb4e332562b0741d1fff72f8bbb838770ed Mon Sep 17 00:00:00 2001 From: Will Manning Date: Wed, 13 Mar 2019 18:42:53 +0000 Subject: [PATCH 7/8] resolve more conflicts --- .../cluster/k8s/ExecutorPodsAllocator.scala | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 527487f8f8577..b9d3e5e7aca96 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -18,6 +18,17 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import io.fabric8.kubernetes.api.model.PodBuilder +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.mutable + +import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.{Clock, Utils} + private[spark] class ExecutorPodsAllocator( conf: SparkConf, secMgr: SecurityManager, @@ -68,10 +79,16 @@ private[spark] class ExecutorPodsAllocator( def setTotalExpectedExecutors(total: Int): Unit = { logDebug("Setting total expected executors", SafeArg.of("totalExpectedExecutors", total)) totalExpectedExecutors.set(total) - appId.foreach { id => 
latestSnapshot.foreach { requestExecutorsIfNecessary(id, _) } } + appId.foreach { id => latestSnapshot.foreach { + requestExecutorsIfNecessary(id, _) + } + } } - private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = { + private def onNewSnapshots( + applicationId: String, + snapshots: Seq[ExecutorPodsSnapshot] + ): Unit = { this.appId = Some(applicationId) newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys) // For all executors we've created against the API but have not seen in a snapshot @@ -113,8 +130,10 @@ private[spark] class ExecutorPodsAllocator( } } - private def requestExecutorsIfNecessary(applicationId: String, - snapshot: ExecutorPodsSnapshot): Unit = { + private def requestExecutorsIfNecessary( + applicationId: String, + snapshot: ExecutorPodsSnapshot + ): Unit = { val currentRunningExecutors = snapshot.executorPods.values.count { case PodRunning(_) => true case _ => false @@ -149,6 +168,7 @@ private[spark] class ExecutorPodsAllocator( .build() kubernetesClient.pods().create(podWithAttachedContainer) newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() + logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") } } else if (currentRunningExecutors >= currentTotalExpectedExecutors) { // TODO handle edge cases if we end up with more running executors than expected. @@ -162,4 +182,4 @@ private[spark] class ExecutorPodsAllocator( s" ${newlyCreatedExecutors.size}.") } } -} +} \ No newline at end of file From 9318f99c660308ddcd9277f9b2f9db437a6ed377 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Wed, 13 Mar 2019 18:48:47 +0000 Subject: [PATCH 8/8] fix weird formatting --- .../cluster/k8s/ExecutorPodsAllocator.scala | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index b9d3e5e7aca96..d2b97f5c22037 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -79,16 +79,15 @@ private[spark] class ExecutorPodsAllocator( def setTotalExpectedExecutors(total: Int): Unit = { logDebug("Setting total expected executors", SafeArg.of("totalExpectedExecutors", total)) totalExpectedExecutors.set(total) - appId.foreach { id => latestSnapshot.foreach { - requestExecutorsIfNecessary(id, _) - } + appId.foreach { id => + latestSnapshot.foreach { + requestExecutorsIfNecessary(id, _) + } } } - private def onNewSnapshots( - applicationId: String, - snapshots: Seq[ExecutorPodsSnapshot] - ): Unit = { + private def onNewSnapshots(applicationId: String, + snapshots: Seq[ExecutorPodsSnapshot]): Unit = { this.appId = Some(applicationId) newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys) // For all executors we've created against the API but have not seen in a snapshot @@ -130,10 +129,8 @@ private[spark] class ExecutorPodsAllocator( } } - private def requestExecutorsIfNecessary( - applicationId: String, - snapshot: ExecutorPodsSnapshot - ): Unit = { + private def requestExecutorsIfNecessary(applicationId: String, + snapshot: ExecutorPodsSnapshot): Unit = { val currentRunningExecutors = snapshot.executorPods.values.count { case PodRunning(_) => true case _ => false 
@@ -182,4 +179,4 @@ private[spark] class ExecutorPodsAllocator( s" ${newlyCreatedExecutors.size}.") } } -} \ No newline at end of file +}
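
The batching rule that requestExecutorsIfNecessary applies on each pod snapshot can be summarised independently of the Kubernetes types. The sketch below is illustrative only and uses plain integer counts in place of the real ExecutorPodsSnapshot inputs (a hypothetical helper, not part of the patch): nothing is requested while previously requested pods are still pending or unseen, otherwise at most podAllocationSize executors are requested to close the gap to the expected total.

    // Illustrative only: the sizing rule from requestExecutorsIfNecessary, with the pod
    // snapshot replaced by plain counts (an assumed simplification, not the real signature).
    def executorsToRequest(
        running: Int,           // pods currently in PodRunning state
        pending: Int,           // pods currently in PodPending state
        newlyCreated: Int,      // pods requested but not yet observed in any snapshot
        expectedTotal: Int,     // totalExpectedExecutors
        podAllocationSize: Int): Int = {
      if (newlyCreated == 0 && pending == 0 && running < expectedTotal) {
        // Request only enough to reach the target, capped by the per-round batch size.
        math.min(expectedTotal - running, podAllocationSize)
      } else {
        // Either already at or above the target, or still waiting on outstanding requests.
        0
      }
    }

    // Example: 2 running, nothing outstanding, target of 10, batch size 5 -> request 5 now.
    // executorsToRequest(running = 2, pending = 0, newlyCreated = 0,
    //   expectedTotal = 10, podAllocationSize = 5) == 5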