[SPARK-27754][K8S] Introduce additional config (spark.kubernetes.driver.request.cores) for driver request cores for spark on k8s #24630

Status: Closed · wants to merge 3 commits
9 changes: 9 additions & 0 deletions docs/running-on-kubernetes.md
@@ -793,6 +793,15 @@ See the [configuration page](configuration.html) for information on Spark config
   Interval between reports of the current Spark job status in cluster mode.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.request.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the cpu request for the driver pod. Values conform to the Kubernetes <a href="https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu">convention</a>.
+    Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in <a href="https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units">CPU units</a>.
+    If set, this takes precedence over <code>spark.driver.cores</code> for specifying the driver pod cpu request.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.limit.cores</code></td>
   <td>(none)</td>
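The new option is passed like any other Spark configuration at submit time. An illustrative invocation (the API server address, image name, and jar path below are placeholders, not values from this PR):

```sh
# Hypothetical example: request 100 millicores for the driver pod while
# capping it at 1 core. <api-server>, <spark-image>, and <app>.jar are
# placeholders to be filled in for a real cluster.
spark-submit \
  --master k8s://https://<api-server>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<spark-image> \
  --conf spark.kubernetes.driver.request.cores=100m \
  --conf spark.kubernetes.driver.limit.cores=1 \
  local:///opt/spark/examples/jars/<app>.jar
```

Since Kubernetes CPU requests accept fractional values such as `100m`, this option allows finer-grained driver sizing than the integer-valued `spark.driver.cores`.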
@@ -125,6 +125,12 @@ private[spark] object Config extends Logging {
     .stringConf
     .createOptional
 
+  val KUBERNETES_DRIVER_REQUEST_CORES =
+    ConfigBuilder("spark.kubernetes.driver.request.cores")
+      .doc("Specify the cpu request for the driver pod")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_SUBMIT_CHECK =
     ConfigBuilder("spark.kubernetes.submitInDriver")
       .internal()
@@ -43,7 +43,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
     .getOrElse(throw new SparkException("Must specify the driver container image"))
 
   // CPU settings
-  private val driverCpuCores = conf.get(DRIVER_CORES.key, "1")
+  private val driverCpuCores = conf.get(DRIVER_CORES)
+  private val driverCoresRequest = conf
+    .get(KUBERNETES_DRIVER_REQUEST_CORES)
+    .getOrElse(driverCpuCores.toString)
   private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)
 
   // Memory settings
@@ -77,7 +80,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
     }
 
     val driverCpuQuantity = new QuantityBuilder(false)
-      .withAmount(driverCpuCores)
+      .withAmount(driverCoresRequest)
       .build()
     val driverMemoryQuantity = new QuantityBuilder(false)
       .withAmount(s"${driverMemoryWithOverheadMiB}Mi")
@@ -117,6 +117,38 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
   }
 
+  test("Check driver pod respects kubernetes driver request cores") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+
+    val basePod = SparkPod.initialPod()
+    // if spark.driver.cores is not set default is 1
+    val requests1 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
+      .configurePod(basePod)
+      .container.getResources
+      .getRequests.asScala
+    assert(requests1("cpu").getAmount === "1")

[Inline review thread on the assertion above]

dongjoon-hyun (Member): Can we avoid this assumption? You had better get the default value of DRIVER_CORES and compare with that.

arunmahadevan (Contributor, Author): IIUC, as per the existing logic the default value would be "1" if spark.driver.cores is not set, and not the default value of spark.driver.cores (which also happens to be 1). I did not want to change that logic.

dongjoon-hyun (Member): @arunmahadevan, what I meant was this constant assertion, === "1". If we change the default value of that configuration, this test will fail. We had better get and use the default value from the conf.

arunmahadevan (Contributor, Author): @dongjoon-hyun, the logic in BasicDriverFeatureStep, private val driverCpuCores = conf.get(DRIVER_CORES.key, "1"), sets the value of driverCpuCores to "1" if spark.driver.cores is not set in the spark conf, i.e. we don't use DRIVER_CORES.defaultValue there. Let me know if my understanding is correct. If so, I cannot do something like assert(requests1("cpu").getAmount === DRIVER_CORES.defaultValue) here (assuming this is what you are suggesting).

dongjoon-hyun (Member): That should be the following, because we should not have a magic number in the code:

-  private val driverCpuCores = conf.get(DRIVER_CORES.key, "1")
+  private val driverCpuCores = conf.get(DRIVER_CORES)
     val driverCpuQuantity = new QuantityBuilder(false)
-      .withAmount(driverCpuCores)
+      .withAmount(driverCpuCores.toString)

Please fix like the above, and don't use magic numbers.

arunmahadevan (Contributor, Author): updated.


+    // if spark.driver.cores is set it should be used
+    sparkConf.set(DRIVER_CORES, 10)
+    val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
+      .configurePod(basePod)
+      .container.getResources
+      .getRequests.asScala
+    assert(requests2("cpu").getAmount === "10")
+
+    // spark.kubernetes.driver.request.cores should be preferred over spark.driver.cores
+    Seq("0.1", "100m").foreach { value =>
+      sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, value)
+      val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
+        .configurePod(basePod)
+        .container.getResources
+        .getRequests.asScala
+      assert(requests3("cpu").getAmount === value)
+    }
+  }
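The precedence rule this test exercises can be summarized as a small pure function. This is an illustrative sketch, not code from the PR; resolveDriverCpuRequest is a hypothetical name:

```scala
// Illustrative only: mirrors the resolution order in BasicDriverFeatureStep.
// spark.kubernetes.driver.request.cores wins when set; otherwise fall back to
// spark.driver.cores, which itself defaults to 1.
def resolveDriverCpuRequest(
    requestCores: Option[String], // spark.kubernetes.driver.request.cores
    driverCores: Option[Int]      // spark.driver.cores
): String =
  requestCores.getOrElse(driverCores.getOrElse(1).toString)

// resolveDriverCpuRequest(None, None)             == "1"
// resolveDriverCpuRequest(None, Some(10))         == "10"
// resolveDriverCpuRequest(Some("100m"), Some(10)) == "100m"
```

Note that after this PR the fallback uses the declared default of DRIVER_CORES rather than a hard-coded "1", which is exactly the magic-number concern raised in the review thread above.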

   test("Check appropriate entrypoint rerouting for various bindings") {
     val javaSparkConf = new SparkConf()
       .set(DRIVER_MEMORY.key, "4g")