Skip to content

Commit

Permalink
[SPARK-27754][K8S] Introduce additional config (spark.kubernetes.driv…
Browse files Browse the repository at this point in the history
…er.request.cores) for driver request cores for spark on k8s

## What changes were proposed in this pull request?

Spark on k8s supports config for specifying the executor cpu requests
(spark.kubernetes.executor.request.cores) but a similar config is missing
for the driver. Instead, currently `spark.driver.cores` value is used for integer value.

Although `pod spec` can have `cpu` for the fine-grained control like the following, this PR proposes additional configuration `spark.kubernetes.driver.request.cores` for driver request cores.
```
resources:
  requests:
    memory: "64Mi"
    cpu: "250m"
```

## How was this patch tested?

Unit tests

Closes #24630 from arunmahadevan/SPARK-27754.

Authored-by: Arun Mahadevan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
arunmahadevan authored and dongjoon-hyun committed May 19, 2019
1 parent 9bca99b commit 1a8c093
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 2 deletions.
9 changes: 9 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,15 @@ See the [configuration page](configuration.html) for information on Spark config
Interval between reports of the current Spark job status in cluster mode.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.request.cores</code></td>
<td>(none)</td>
<td>
Specify the cpu request for the driver pod. Values conform to the Kubernetes <a href="https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu">convention</a>.
Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in <a href="https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units">CPU units</a>.
This takes precedence over <code>spark.driver.cores</code> for specifying the driver pod cpu request if set.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.limit.cores</code></td>
<td>(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_DRIVER_REQUEST_CORES =
ConfigBuilder("spark.kubernetes.driver.request.cores")
.doc("Specify the cpu request for the driver pod")
.stringConf
.createOptional

val KUBERNETES_DRIVER_SUBMIT_CHECK =
ConfigBuilder("spark.kubernetes.submitInDriver")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.getOrElse(throw new SparkException("Must specify the driver container image"))

// CPU settings
private val driverCpuCores = conf.get(DRIVER_CORES.key, "1")
private val driverCpuCores = conf.get(DRIVER_CORES)
private val driverCoresRequest = conf
.get(KUBERNETES_DRIVER_REQUEST_CORES)
.getOrElse(driverCpuCores.toString)
private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)

// Memory settings
Expand Down Expand Up @@ -77,7 +80,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
}

val driverCpuQuantity = new QuantityBuilder(false)
.withAmount(driverCpuCores)
.withAmount(driverCoresRequest)
.build()
val driverMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${driverMemoryWithOverheadMiB}Mi")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,38 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
}

test("Check driver pod respects kubernetes driver request cores") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(CONTAINER_IMAGE, "spark-driver:latest")

val basePod = SparkPod.initialPod()
// if spark.driver.cores is not set default is 1
val requests1 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(requests1("cpu").getAmount === "1")

// if spark.driver.cores is set it should be used
sparkConf.set(DRIVER_CORES, 10)
val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(requests2("cpu").getAmount === "10")

// spark.kubernetes.driver.request.cores should be preferred over spark.driver.cores
Seq("0.1", "100m").foreach { value =>
sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, value)
val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(requests3("cpu").getAmount === value)
}
}

test("Check appropriate entrypoint rerouting for various bindings") {
val javaSparkConf = new SparkConf()
.set(DRIVER_MEMORY.key, "4g")
Expand Down

0 comments on commit 1a8c093

Please sign in to comment.