From 8b3df2ae9a6202dd418a2548be46a2e51de751c8 Mon Sep 17 00:00:00 2001 From: Arun Mahadevan Date: Sat, 18 May 2019 21:28:46 -0700 Subject: [PATCH] [SPARK-27754][K8S] Introduce additional config (spark.kubernetes.driver.request.cores) for driver request cores for spark on k8s Spark on k8s supports config for specifying the executor cpu requests (spark.kubernetes.executor.request.cores) but a similar config is missing for the driver. Instead, currently the `spark.driver.cores` value is used, which only supports integer values. Although the `pod spec` can specify `cpu` for fine-grained control, as in the following, this PR proposes an additional configuration `spark.kubernetes.driver.request.cores` for the driver request cores. ``` resources: requests: memory: "64Mi" cpu: "250m" ``` Unit tests Closes #24630 from arunmahadevan/SPARK-27754. Authored-by: Arun Mahadevan Signed-off-by: Dongjoon Hyun --- docs/running-on-kubernetes.md | 9 ++++++ .../org/apache/spark/deploy/k8s/Config.scala | 6 ++++ .../k8s/features/BasicDriverFeatureStep.scala | 7 ++-- .../BasicDriverFeatureStepSuite.scala | 32 +++++++++++++++++++ 4 files changed, 52 insertions(+), 2 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index e86b23bd334ca..b289b898297c8 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -749,6 +749,15 @@ See the [configuration page](configuration.html) for information on Spark config Interval between reports of the current Spark job status in cluster mode. + + spark.kubernetes.driver.request.cores + (none) + + Specify the cpu request for the driver pod. Values conform to the Kubernetes convention. + Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in CPU units. + This takes precedence over spark.driver.cores for specifying the driver pod cpu request if set. 
+ + spark.kubernetes.driver.limit.cores (none) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 2edabd03b5813..45385b4809588 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -125,6 +125,12 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_DRIVER_REQUEST_CORES = + ConfigBuilder("spark.kubernetes.driver.request.cores") + .doc("Specify the cpu request for the driver pod") + .stringConf + .createOptional + val KUBERNETES_DRIVER_SUBMIT_CHECK = ConfigBuilder("spark.kubernetes.submitInDriver") .internal() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 769e96f678cac..ff43921b19781 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -42,7 +42,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) .getOrElse(throw new SparkException("Must specify the driver container image")) // CPU settings - private val driverCpuCores = conf.get(DRIVER_CORES.key, "1") + private val driverCpuCores = conf.get(DRIVER_CORES) + private val driverCoresRequest = conf + .get(KUBERNETES_DRIVER_REQUEST_CORES) + .getOrElse(driverCpuCores.toString) private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES) // Memory settings @@ -75,7 +78,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) .build() } - 
val driverCpuQuantity = new Quantity(driverCpuCores) + val driverCpuQuantity = new Quantity(driverCoresRequest) val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi") val maybeCpuLimitQuantity = driverLimitCores.map { limitCores => ("cpu", new Quantity(limitCores)) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index cb40c7ce4cad2..06f0a00f2a952 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -117,6 +117,38 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } + test("Check driver pod respects kubernetes driver request cores") { + val sparkConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(CONTAINER_IMAGE, "spark-driver:latest") + + val basePod = SparkPod.initialPod() + // if spark.driver.cores is not set default is 1 + val requests1 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(amountAndFormat(requests1("cpu")) === "1") + + // if spark.driver.cores is set it should be used + sparkConf.set(DRIVER_CORES, 10) + val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(amountAndFormat(requests2("cpu")) === "10") + + // spark.kubernetes.driver.request.cores should be preferred over spark.driver.cores + Seq("0.1", "100m").foreach { value => + sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, 
value) + val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(amountAndFormat(requests3("cpu")) === value) + } + } + test("Check appropriate entrypoint rerouting for various bindings") { val javaSparkConf = new SparkConf() .set(DRIVER_MEMORY.key, "4g")