diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index f1000346cba0a..72833ccca6248 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -793,6 +793,15 @@ See the [configuration page](configuration.html) for information on Spark config Interval between reports of the current Spark job status in cluster mode. + + spark.kubernetes.driver.request.cores + (none) + + Specify the cpu request for the driver pod. Values conform to the Kubernetes convention. + Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in CPU units. + This takes precedence over spark.driver.cores for specifying the driver pod cpu request if set. + + spark.kubernetes.driver.limit.cores (none) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index b33b125f5dd2e..cc1bfd9bb0d81 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -125,6 +125,12 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_DRIVER_REQUEST_CORES = + ConfigBuilder("spark.kubernetes.driver.request.cores") + .doc("Specify the cpu request for the driver pod") + .stringConf + .createOptional + val KUBERNETES_DRIVER_SUBMIT_CHECK = ConfigBuilder("spark.kubernetes.submitInDriver") .internal() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 17c00eb7c3a59..2f4921aa94382 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -44,6 +44,11 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) // CPU settings private val driverCpuCores = conf.get(DRIVER_CORES.key, "1") + private val driverCoresRequest = if (conf.contains(KUBERNETES_DRIVER_REQUEST_CORES)) { + conf.get(KUBERNETES_DRIVER_REQUEST_CORES).get + } else { + driverCpuCores + } private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES) // Memory settings @@ -77,7 +82,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) } val driverCpuQuantity = new QuantityBuilder(false) - .withAmount(driverCpuCores) + .withAmount(driverCoresRequest) .build() val driverMemoryQuantity = new QuantityBuilder(false) .withAmount(s"${driverMemoryWithOverheadMiB}Mi") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 7cfc4d2774cff..5956d1a2b290a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -117,6 +117,33 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } + test("Check driver pod respects kubernetes driver request cores") { + val sparkConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(CONTAINER_IMAGE, "spark-driver:latest") + + val basePod = SparkPod.initialPod() + val requests1 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(requests1("cpu").getAmount === "1") + + sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, "0.1") + val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(requests2("cpu").getAmount === "0.1") + + sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, "100m") + val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(requests3("cpu").getAmount === "100m") + } + test("Check appropriate entrypoint rerouting for various bindings") { val javaSparkConf = new SparkConf() .set(DRIVER_MEMORY.key, "4g")