Skip to content

Commit

Permalink
Introduce config for driver request cores
Browse files Browse the repository at this point in the history
Spark on k8s supports config for specifying the executor cpu requests
(spark.kubernetes.executor.request.cores) but a similar config is missing
for the driver. Apparently `spark.driver.cores` works but its not evident that this accepts
fractional values (its defined as an Integer config but apparently
accepts decimals). To keep in sync
with the executor config a similar driver config can be
introduced (spark.kubernetes.driver.request.cores) for explicitly specifying
the driver CPU requests. If not provided, the value will default to `spark.driver.cores` as before.
  • Loading branch information
arunmahadevan committed May 16, 2019
1 parent c6a45e6 commit 4002b74
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 1 deletion.
9 changes: 9 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,15 @@ See the [configuration page](configuration.html) for information on Spark config
Interval between reports of the current Spark job status in cluster mode.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.request.cores</code></td>
<td>(none)</td>
<td>
Specify the cpu request for the driver pod. Values conform to the Kubernetes <a href="https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu">convention</a>.
Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in <a href="https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units">CPU units</a>.
This takes precedence over <code>spark.driver.cores</code> for specifying the driver pod cpu request if set.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.limit.cores</code></td>
<td>(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_DRIVER_REQUEST_CORES =
ConfigBuilder("spark.kubernetes.driver.request.cores")
.doc("Specify the cpu request for the driver pod")
.stringConf
.createOptional

val KUBERNETES_DRIVER_SUBMIT_CHECK =
ConfigBuilder("spark.kubernetes.submitInDriver")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)

// CPU settings
private val driverCpuCores = conf.get(DRIVER_CORES.key, "1")
private val driverCoresRequest = if (conf.contains(KUBERNETES_DRIVER_REQUEST_CORES)) {
conf.get(KUBERNETES_DRIVER_REQUEST_CORES).get
} else {
driverCpuCores
}
private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)

// Memory settings
Expand Down Expand Up @@ -77,7 +82,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
}

val driverCpuQuantity = new QuantityBuilder(false)
.withAmount(driverCpuCores)
.withAmount(driverCoresRequest)
.build()
val driverMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${driverMemoryWithOverheadMiB}Mi")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,33 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
}

test("Check driver pod respects kubernetes driver request cores") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(CONTAINER_IMAGE, "spark-driver:latest")

val basePod = SparkPod.initialPod()
val requests1 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(requests1("cpu").getAmount === "1")

sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, "0.1")
val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(requests2("cpu").getAmount === "0.1")

sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, "100m")
val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(requests3("cpu").getAmount === "100m")
}

test("Check appropriate entrypoint rerouting for various bindings") {
val javaSparkConf = new SparkConf()
.set(DRIVER_MEMORY.key, "4g")
Expand Down

0 comments on commit 4002b74

Please sign in to comment.