[SPARK-27754][K8S] Introduce additional config (spark.kubernetes.driver.request.cores) for driver request cores for spark on k8s

Spark on k8s supports config for specifying the executor cpu requests
(spark.kubernetes.executor.request.cores) but a similar config is missing
for the driver. Instead, the `spark.driver.cores` value is currently used, and it accepts only integer values.

Because the `pod spec` allows fine-grained `cpu` values like the following, this PR proposes an additional configuration, `spark.kubernetes.driver.request.cores`, for specifying the driver request cores.
```
resources:
  requests:
    memory: "64Mi"
    cpu: "250m"
```
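
For illustration, here is a minimal sketch of how an application could opt into the new property through `SparkConf` (the property key comes from this PR; the app name and the `250m` value are placeholder examples, and the same key can equally be passed with `--conf` on spark-submit):

```scala
import org.apache.spark.SparkConf

// Request a fractional CPU for the driver pod; spark.driver.cores alone
// only accepts whole numbers such as 1 or 2.
val conf = new SparkConf()
  .setAppName("driver-request-cores-example")            // placeholder app name
  .set("spark.kubernetes.driver.request.cores", "250m")  // new config from this PR
  .set("spark.kubernetes.driver.limit.cores", "1")       // optional upper bound (pre-existing config)
```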

Unit tests

Closes apache#24630 from arunmahadevan/SPARK-27754.

Authored-by: Arun Mahadevan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
arunmahadevan authored and rshkv committed Nov 17, 2020
1 parent 9461e71 commit c4821da
Showing 4 changed files with 52 additions and 2 deletions.
9 changes: 9 additions & 0 deletions docs/running-on-kubernetes.md
@@ -749,6 +749,15 @@ See the [configuration page](configuration.html) for information on Spark config
Interval between reports of the current Spark job status in cluster mode.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.request.cores</code></td>
<td>(none)</td>
<td>
Specify the cpu request for the driver pod. Values conform to the Kubernetes <a href="https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu">convention</a>.
Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in <a href="https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units">CPU units</a>.
This takes precedence over <code>spark.driver.cores</code> for specifying the driver pod cpu request if set.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.limit.cores</code></td>
<td>(none)</td>
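The new docs entry above refers to the Kubernetes CPU-unit convention, where plain numbers mean whole cores and an `m` suffix means millicores. A rough illustrative sketch (not Kubernetes' actual parser) of how the example values map to cores:

```scala
// Illustration only: plain numbers are whole cores, "m" means millicores (1/1000 of a core).
def approxCores(value: String): Double =
  if (value.endsWith("m")) value.dropRight(1).toDouble / 1000.0
  else value.toDouble

assert(approxCores("500m") == 0.5)  // half a core
assert(approxCores("0.1") == 0.1)   // a tenth of a core
assert(approxCores("5") == 5.0)     // five full cores
```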
6 changes: 6 additions & 0 deletions resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -125,6 +125,12 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_DRIVER_REQUEST_CORES =
ConfigBuilder("spark.kubernetes.driver.request.cores")
.doc("Specify the cpu request for the driver pod")
.stringConf
.createOptional

val KUBERNETES_DRIVER_SUBMIT_CHECK =
ConfigBuilder("spark.kubernetes.submitInDriver")
.internal()
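Because the entry is declared with `.stringConf.createOptional`, reading it yields an `Option[String]`. The following is a simplified, self-contained sketch of the fallback behaviour this PR wires into the driver builder (plain `Option` and `Int` stand in for the real `ConfigEntry` machinery):

```scala
// Simplified model of the precedence added in this PR: the optional
// request-cores string wins; otherwise fall back to spark.driver.cores.
def resolveDriverCpuRequest(
    requestCores: Option[String],  // spark.kubernetes.driver.request.cores
    driverCores: Int               // spark.driver.cores (defaults to 1)
): String = requestCores.getOrElse(driverCores.toString)

resolveDriverCpuRequest(None, 1)           // "1"    -- nothing set, default applies
resolveDriverCpuRequest(None, 10)          // "10"   -- spark.driver.cores is used
resolveDriverCpuRequest(Some("100m"), 10)  // "100m" -- request.cores takes precedence
```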
7 changes: 5 additions & 2 deletions resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -42,7 +42,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.getOrElse(throw new SparkException("Must specify the driver container image"))

// CPU settings
- private val driverCpuCores = conf.get(DRIVER_CORES.key, "1")
+ private val driverCpuCores = conf.get(DRIVER_CORES)
+ private val driverCoresRequest = conf
+   .get(KUBERNETES_DRIVER_REQUEST_CORES)
+   .getOrElse(driverCpuCores.toString)
private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)

// Memory settings
@@ -75,7 +78,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.build()
}

- val driverCpuQuantity = new Quantity(driverCpuCores)
+ val driverCpuQuantity = new Quantity(driverCoresRequest)
val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi")
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
("cpu", new Quantity(limitCores))
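For context, a rough sketch of how a resolved request string ends up on the driver container's resources, using the fabric8 Kubernetes model that `Quantity` above comes from; the builder calls and container name here illustrate the general pattern and are not copied from the actual step:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, Quantity, ResourceRequirementsBuilder}

val driverCoresRequest = "250m"             // resolved as in the step above
val driverLimitCores: Option[String] = None // spark.kubernetes.driver.limit.cores, if set

val resources = new ResourceRequirementsBuilder()
  .addToRequests("cpu", new Quantity(driverCoresRequest))
// The cpu limit is only attached when spark.kubernetes.driver.limit.cores is configured.
driverLimitCores.foreach(limit => resources.addToLimits("cpu", new Quantity(limit)))

val driverContainer = new ContainerBuilder()
  .withName("spark-kubernetes-driver")      // illustrative container name
  .withResources(resources.build())
  .build()
```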
32 changes: 32 additions & 0 deletions resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -117,6 +117,38 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
}

test("Check driver pod respects kubernetes driver request cores") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(CONTAINER_IMAGE, "spark-driver:latest")

val basePod = SparkPod.initialPod()
// if spark.driver.cores is not set default is 1
val requests1 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(amountAndFormat(requests1("cpu")) === "1")

// if spark.driver.cores is set it should be used
sparkConf.set(DRIVER_CORES, 10)
val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(amountAndFormat(requests2("cpu")) === "10")

// spark.kubernetes.driver.request.cores should be preferred over spark.driver.cores
Seq("0.1", "100m").foreach { value =>
sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, value)
val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(amountAndFormat(requests3("cpu")) === value)
}
}

test("Check appropriate entrypoint rerouting for various bindings") {
val javaSparkConf = new SparkConf()
.set(DRIVER_MEMORY.key, "4g")
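The new test relies on an `amountAndFormat` helper that is not part of this diff. A hypothetical reconstruction of what such a helper in the suite likely looks like, assuming fabric8's `Quantity` exposes `getAmount` and `getFormat` (newer client versions split a value like "100m" into amount "100" and format "m"):

```scala
import io.fabric8.kubernetes.api.model.Quantity

// Hypothetical sketch: re-join the split amount and suffix so the assertions
// above can compare against strings such as "1", "10", "0.1", or "100m".
private def amountAndFormat(quantity: Quantity): String =
  quantity.getAmount + quantity.getFormat
```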
