From 4002b74a3cbcc18a3305acc1e73fca6ec89c5a92 Mon Sep 17 00:00:00 2001 From: Arun Mahadevan Date: Thu, 16 May 2019 13:20:53 -0700 Subject: [PATCH 1/3] Introduce config for driver request cores Spark on k8s supports config for specifying the executor cpu requests (spark.kubernetes.executor.request.cores) but a similar config is missing for the driver. Apparently `spark.driver.cores` works but it's not evident that this accepts fractional values (it's defined as an Integer config but apparently accepts decimals). To keep in sync with the executor config a similar driver config can be introduced (spark.kubernetes.driver.request.cores) for explicitly specifying the driver CPU requests. If not provided, the value will default to `spark.driver.cores` as before. --- docs/running-on-kubernetes.md | 9 +++++++ .../org/apache/spark/deploy/k8s/Config.scala | 6 +++++ .../k8s/features/BasicDriverFeatureStep.scala | 7 ++++- .../BasicDriverFeatureStepSuite.scala | 27 +++++++++++++++++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index f1000346cba0a..72833ccca6248 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -793,6 +793,15 @@ See the [configuration page](configuration.html) for information on Spark config Interval between reports of the current Spark job status in cluster mode. + + spark.kubernetes.driver.request.cores + (none) + + Specify the cpu request for the driver pod. Values conform to the Kubernetes convention. + Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in CPU units. + This takes precedence over spark.driver.cores for specifying the driver pod cpu request if set. 
+ + spark.kubernetes.driver.limit.cores (none) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index b33b125f5dd2e..cc1bfd9bb0d81 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -125,6 +125,12 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_DRIVER_REQUEST_CORES = + ConfigBuilder("spark.kubernetes.driver.request.cores") + .doc("Specify the cpu request for the driver pod") + .stringConf + .createOptional + val KUBERNETES_DRIVER_SUBMIT_CHECK = ConfigBuilder("spark.kubernetes.submitInDriver") .internal() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 17c00eb7c3a59..2f4921aa94382 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -44,6 +44,11 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) // CPU settings private val driverCpuCores = conf.get(DRIVER_CORES.key, "1") + private val driverCoresRequest = if (conf.contains(KUBERNETES_DRIVER_REQUEST_CORES)) { + conf.get(KUBERNETES_DRIVER_REQUEST_CORES).get + } else { + driverCpuCores + } private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES) // Memory settings @@ -77,7 +82,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) } val driverCpuQuantity = new QuantityBuilder(false) - .withAmount(driverCpuCores) + 
.withAmount(driverCoresRequest) .build() val driverMemoryQuantity = new QuantityBuilder(false) .withAmount(s"${driverMemoryWithOverheadMiB}Mi") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 7cfc4d2774cff..5956d1a2b290a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -117,6 +117,33 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } + test("Check driver pod respects kubernetes driver request cores") { + val sparkConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(CONTAINER_IMAGE, "spark-driver:latest") + + val basePod = SparkPod.initialPod() + val requests1 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(requests1("cpu").getAmount === "1") + + sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, "0.1") + val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(requests2("cpu").getAmount === "0.1") + + sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, "100m") + val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(requests3("cpu").getAmount === "100m") + } + test("Check appropriate entrypoint rerouting for various bindings") { val javaSparkConf = new SparkConf() .set(DRIVER_MEMORY.key, "4g") 
From 8ca63d3e7b9a427b089542029d297d731931fa8a Mon Sep 17 00:00:00 2001 From: Arun Mahadevan Date: Fri, 17 May 2019 09:59:11 -0700 Subject: [PATCH 2/3] review comments and test case --- .../k8s/features/BasicDriverFeatureStep.scala | 6 +---- .../BasicDriverFeatureStepSuite.scala | 23 +++++++++++-------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 2f4921aa94382..73a5280ceba54 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -44,11 +44,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) // CPU settings private val driverCpuCores = conf.get(DRIVER_CORES.key, "1") - private val driverCoresRequest = if (conf.contains(KUBERNETES_DRIVER_REQUEST_CORES)) { - conf.get(KUBERNETES_DRIVER_REQUEST_CORES).get - } else { - driverCpuCores - } + private val driverCoresRequest = conf.get(KUBERNETES_DRIVER_REQUEST_CORES.key, driverCpuCores) private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES) // Memory settings diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 5956d1a2b290a..92f46c67471b8 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -123,25 +123,30 @@ class 
BasicDriverFeatureStepSuite extends SparkFunSuite { .set(CONTAINER_IMAGE, "spark-driver:latest") val basePod = SparkPod.initialPod() + // if spark.driver.cores is not set default is 1 val requests1 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) .configurePod(basePod) .container.getResources .getRequests.asScala assert(requests1("cpu").getAmount === "1") - sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, "0.1") + // if spark.driver.cores is set it should be used + sparkConf.set(DRIVER_CORES, 10) val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) .configurePod(basePod) .container.getResources .getRequests.asScala - assert(requests2("cpu").getAmount === "0.1") - - sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, "100m") - val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) - .configurePod(basePod) - .container.getResources - .getRequests.asScala - assert(requests3("cpu").getAmount === "100m") + assert(requests2("cpu").getAmount === "10") + + // spark.kubernetes.driver.request.cores should be preferred over spark.driver.cores + Seq("0.1", "100m").foreach { value => + sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, value) + val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf)) + .configurePod(basePod) + .container.getResources + .getRequests.asScala + assert(requests3("cpu").getAmount === value) + } } test("Check appropriate entrypoint rerouting for various bindings") { From ff688c23de7c716cd0d9484846cbd39cafc17a21 Mon Sep 17 00:00:00 2001 From: Arun Mahadevan Date: Fri, 17 May 2019 20:12:16 -0700 Subject: [PATCH 3/3] fix default value --- .../spark/deploy/k8s/features/BasicDriverFeatureStep.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 73a5280ceba54..61fc67f4dc561 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -43,8 +43,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) .getOrElse(throw new SparkException("Must specify the driver container image")) // CPU settings - private val driverCpuCores = conf.get(DRIVER_CORES.key, "1") - private val driverCoresRequest = conf.get(KUBERNETES_DRIVER_REQUEST_CORES.key, driverCpuCores) + private val driverCpuCores = conf.get(DRIVER_CORES) + private val driverCoresRequest = conf + .get(KUBERNETES_DRIVER_REQUEST_CORES) + .getOrElse(driverCpuCores.toString) private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES) // Memory settings