diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 632d5f260b737..f98bbc6be10ed 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -114,9 +114,15 @@ private[spark] class ExecutorAllocationManager(
// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
- private val tasksPerExecutor =
+ private val taskSlotPerExecutor =
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
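 
+  // Expected number of tasks each executor slot runs over the lifetime of the job; values
+  // above 1 make dynamic allocation request proportionally fewer executors for the same backlog.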
+ private val tasksPerExecutorSlot = conf.getInt("spark.dynamicAllocation.tasksPerExecutorSlot", 1)
+
+ private val tasksPerExecutor = tasksPerExecutorSlot * taskSlotPerExecutor
+
   validateSettings()
 
   // Number of executors to add in the next round
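To make the sizing effect concrete, here is a minimal, self-contained sketch, not part of the patch (the `TasksPerSlotSketch` object and its `main` driver are illustrative), of the ceiling division that `maxNumExecutorsNeeded` in this file performs over pending plus running tasks:

```scala
// Sketch of how the new multiplier shrinks the executor target.
object TasksPerSlotSketch {
  // Mirrors the patched definition: slots per executor times tasks per slot.
  def tasksPerExecutor(executorCores: Int, taskCpus: Int, tasksPerSlot: Int): Int =
    tasksPerSlot * (executorCores / taskCpus)

  // Ceiling division over the task backlog, as in maxNumExecutorsNeeded.
  def maxNumExecutorsNeeded(numRunningOrPendingTasks: Int, tasksPerExecutor: Int): Int =
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

  def main(args: Array[String]): Unit = {
    // 1000 backlogged tasks on 4-core executors with 1 CPU per task:
    println(maxNumExecutorsNeeded(1000, tasksPerExecutor(4, 1, 1))) // 250 with the default of 1
    println(maxNumExecutorsNeeded(1000, tasksPerExecutor(4, 1, 2))) // 125 with 2 tasks per slot
  }
}
```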
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index b9ce71a0c5254..593efb112f9ae 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -137,6 +137,34 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsToAdd(manager) === 1)
   }
 
+ test("tasksPerExecutorSlot is correctly handled") {
+ val conf = new SparkConf()
+ .setMaster("myDummyLocalExternalClusterManager")
+ .setAppName("test-executor-allocation-manager")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.testing", "true")
+
+ val sc0 = new SparkContext(conf)
+ contexts += sc0
+ var manager = sc0.executorAllocationManager.get
+ assert(tasksPerExecutor(manager) === 1)
+ sc0.stop()
+
+ val conf1 = conf.clone.set("spark.dynamicAllocation.tasksPerExecutorSlot", "2")
+ val sc1 = new SparkContext(conf1)
+ contexts += sc1
+ manager = sc1.executorAllocationManager.get
+ assert(tasksPerExecutor(manager) === 2)
+ sc1.stop()
+
+ val conf2 = conf1.clone.set("spark.executor.cores", "2")
+ val sc2 = new SparkContext(conf2)
+ contexts += sc2
+ manager = sc2.executorAllocationManager.get
+ assert(tasksPerExecutor(manager) === 4)
+ sc2.stop()
+ }
+
test("add executors capped by num pending tasks") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
@@ -1061,6 +1089,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
+  private val _tasksPerExecutor = PrivateMethod[Int]('tasksPerExecutor)
 
private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _numExecutorsToAdd()
@@ -1143,6 +1172,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = {
manager invokePrivate _hostToLocalTaskCount()
}
+
+ private def tasksPerExecutor(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _tasksPerExecutor()
+ }
 }
 
/**
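For completeness, this is how the new knob would be wired into an application config; a hedged sketch only (the app name and values are illustrative, and `spark.shuffle.service.enabled` is the usual prerequisite for dynamic allocation outside of testing mode):

```scala
import org.apache.spark.SparkConf

// Illustrative configuration: 4 slots per executor (executor.cores / task.cpus),
// with each slot expected to run ~2 tasks, so executor requests are halved.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-example")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.executor.cores", "4")
  .set("spark.task.cpus", "1")
  .set("spark.dynamicAllocation.tasksPerExecutorSlot", "2")
```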
diff --git a/docs/configuration.md b/docs/configuration.md
index c39c7580fd75a..081704f0fe061 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1584,7 +1584,8 @@ Apart from these, the following properties are also available, and may be useful
     <code>spark.dynamicAllocation.minExecutors</code>,
     <code>spark.dynamicAllocation.maxExecutors</code>, and
     <code>spark.dynamicAllocation.initialExecutors</code>
+    <code>spark.dynamicAllocation.tasksPerExecutorSlot</code>
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
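Worked example for the documentation entry (numbers match the suite's final assertion): with `spark.executor.cores=2` and `spark.task.cpus=1` there are 2 slots per executor, so `spark.dynamicAllocation.tasksPerExecutorSlot=2` yields 2 * 2 = 4 tasks per executor, and a backlog of, say, 100 tasks targets ceil(100 / 4) = 25 executors instead of the 50 requested under the default setting.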