From 997fa42a0d530c0c24828419be496aeb6ee16690 Mon Sep 17 00:00:00 2001 From: Julien Cuquemelle Date: Thu, 30 Nov 2017 16:28:06 +0000 Subject: [PATCH 1/2] [SPARK-22683][CORE] Allow tuning the number of dynamically allocated executors let's say an executor has spark.executor.cores / spark.task.cpus taskSlots The current dynamic allocation policy allocates enough executors to have each taskSlot execute a single task, which wastes resources when tasks are small regarding executor allocation overhead. By adding the tasksPerExecutorSlot, it is made possible to specify how many tasks a single slot should ideally execute to mitigate the overhead of executor allocation. --- .../spark/ExecutorAllocationManager.scala | 6 +++- .../ExecutorAllocationManagerSuite.scala | 33 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 632d5f260b737..f98bbc6be10ed 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -114,9 +114,13 @@ private[spark] class ExecutorAllocationManager( // TODO: The default value of 1 for spark.executor.cores works right now because dynamic // allocation is only supported for YARN and the default number of cores per executor in YARN is // 1, but it might need to be attained differently for different cluster managers - private val tasksPerExecutor = + private val taskSlotPerExecutor = conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) + private val tasksPerExecutorSlot = conf.getInt("spark.dynamicAllocation.tasksPerExecutorSlot", 1) + + private val tasksPerExecutor = tasksPerExecutorSlot * taskSlotPerExecutor + validateSettings() // Number of executors to add in the next round diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index b9ce71a0c5254..593efb112f9ae 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -137,6 +137,34 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) } + test("tasksPerExecutorSlot is correctly handled") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + + val sc0 = new SparkContext(conf) + contexts += sc0 + var manager = sc0.executorAllocationManager.get + assert(tasksPerExecutor(manager) === 1) + sc0.stop() + + val conf1 = conf.clone.set("spark.dynamicAllocation.tasksPerExecutorSlot", "2") + val sc1 = new SparkContext(conf1) + contexts += sc1 + manager = sc1.executorAllocationManager.get + assert(tasksPerExecutor(manager) === 2) + sc1.stop() + + val conf2 = conf1.clone.set("spark.executor.cores", "2") + val sc2 = new SparkContext(conf2) + contexts += sc2 + manager = sc2.executorAllocationManager.get + assert(tasksPerExecutor(manager) === 4) + sc2.stop() + } + test("add executors capped by num pending tasks") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get @@ -1061,6 +1089,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy) private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks) private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount) + private val _tasksPerExecutor = PrivateMethod[Int]('tasksPerExecutor) private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = { manager invokePrivate _numExecutorsToAdd() @@ -1143,6 +1172,10 @@ private object 
ExecutorAllocationManagerSuite extends PrivateMethodTester { private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = { manager invokePrivate _hostToLocalTaskCount() } + + private def tasksPerExecutor(manager: ExecutorAllocationManager): Int = { + manager invokePrivate _tasksPerExecutor() + } } /** From e480d69cbd1a42e492b46e6936496304ec2d5b2d Mon Sep 17 00:00:00 2001 From: Julien Cuquemelle Date: Thu, 30 Nov 2017 16:28:06 +0000 Subject: [PATCH 2/2] [SPARK-22683][DOC] document tasksPerExecutorSlot parameter --- docs/configuration.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index c39c7580fd75a..081704f0fe061 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1584,6 +1584,7 @@ Apart from these, the following properties are also available, and may be useful spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors, and spark.dynamicAllocation.initialExecutors + spark.dynamicAllocation.tasksPerExecutorSlot @@ -1628,6 +1629,19 @@ Apart from these, the following properties are also available, and may be useful Lower bound for the number of executors if dynamic allocation is enabled. + + spark.dynamicAllocation.tasksPerExecutorSlot + 1 + + Each executor can process a certain number of tasks in parallel (task slots). + The number of task slots per executor is: executor.cores / task.cpus. + The ExecutorAllocationManager will set a target number of running executors equal to: + nbCurrentTask / (taskSlots * tasksPerExecutorSlot), with nbCurrentTask being the total number + of running and backlogged tasks. With the default value of 1, each available task slot + will run a single task on average, which gives the best latency. With small tasks + however, this setting wastes a lot of resources due to executor allocation overhead. + + spark.dynamicAllocation.schedulerBacklogTimeout 1s