From 3404a73f4cf7be37e574026d08ad5cf82cfac871 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 2 Nov 2018 13:58:08 -0700
Subject: [PATCH] [SPARK-25875][K8S] Merge code to set up driver command into
 a single step.

Right now there are 3 different classes dealing with building the driver
command to run inside the pod, one for each "binding" supported by Spark.
This has two main shortcomings:

- the code in the 3 classes is very similar; changing things in one
  place would probably mean making a similar change in the others.

- it gives the false impression that the step implementation is the
  only place where binding-specific logic is needed. That is not true;
  there was code in KubernetesConf that was binding-specific, and
  there's also code in the executor-specific config step. So the 3
  classes weren't really working as a language-specific abstraction.

On top of that, the current code was propagating command line parameters
in a different way depending on the binding. That doesn't seem
necessary, and in fact using environment variables for command line
parameters is in general a really bad idea, since you can't handle
special characters (e.g. spaces) that way.

This change merges the 3 different code paths for Java, Python and R
into a single step, and also merges the 3 code paths to start the Spark
driver in the k8s entry point script. This increases the amount of
shared code, and also moves more feature logic into the step itself, so
it doesn't live in KubernetesConf.

Note that not all logic related to setting up the driver lives in that
step. For example, the memory overhead calculation still lives
separately, except it now happens in the driver config step instead of
outside the step hierarchy altogether.

Some of the noise in the diff is because of changes to KubernetesConf,
which will be addressed in a separate change.

Tested with new and updated unit tests + integration tests.

Author: Marcelo Vanzin

Closes #22897 from vanzin/SPARK-25875.
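To make the unified command concrete, here is a condensed, self-contained
sketch of what the new step produces. It is illustrative rather than the
patch's code verbatim: the DriverCommandSketch object, the com.example.Main
class name and the inlined properties-file path are stand-ins, while
"spark-internal" matches the value of SparkLauncher.NO_RESOURCE as exercised
by DriverCommandFeatureStepSuite. The real implementation is
DriverCommandFeatureStep in the diff below.

    // Simplified stand-ins for the types in MainAppResource.scala.
    sealed trait MainAppResource
    case class JavaMainAppResource(primaryResource: Option[String]) extends MainAppResource
    case class PythonMainAppResource(primaryResource: String) extends MainAppResource
    case class RMainAppResource(primaryResource: String) extends MainAppResource

    object DriverCommandSketch {
      // Mirrors SparkLauncher.NO_RESOURCE: the app jar travels via spark.jars,
      // so the Java binding needs no primary resource on the command line.
      val NoResource = "spark-internal"

      // One code path for all three bindings; only the primary resource
      // differs. User arguments are appended as container args, so special
      // characters (e.g. spaces) survive, which the old per-binding
      // environment variables could not guarantee.
      def driverArgs(
          resource: MainAppResource,
          mainClass: String,
          appArgs: Seq[String]): Seq[String] = {
        val primary = resource match {
          case JavaMainAppResource(_) => NoResource
          case PythonMainAppResource(res) => res
          case RMainAppResource(res) => res
        }
        // Hard-coded here for the sketch; the real step uses SPARK_CONF_PATH.
        Seq("driver", "--properties-file", "/opt/spark/conf/spark.properties",
          "--class", mainClass, primary) ++ appArgs
      }

      def main(args: Array[String]): Unit = {
        println(driverArgs(JavaMainAppResource(None), "com.example.Main",
          Seq("arg with spaces")))
      }
    }

Running the sketch prints the argv list the entrypoint receives; "arg with
spaces" stays a single element instead of being re-split, which is exactly
the property the env-var approach could not provide.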
---
 .../org/apache/spark/deploy/k8s/Config.scala  |  30 +---
 .../apache/spark/deploy/k8s/Constants.scala   |  10 +-
 .../spark/deploy/k8s/KubernetesConf.scala     |  70 ++-------
 .../k8s/features/BasicDriverFeatureStep.scala |  42 +++--
 .../features/BasicExecutorFeatureStep.scala   |  21 +--
 .../features/DriverCommandFeatureStep.scala   | 134 ++++++++++++++++
 .../KubernetesFeatureConfigStep.scala         |   4 +-
 .../features/PodTemplateConfigMapStep.scala   |   4 +-
 .../bindings/JavaDriverFeatureStep.scala      |  46 ------
 .../bindings/PythonDriverFeatureStep.scala    |  76 ---------
 .../bindings/RDriverFeatureStep.scala         |  62 --------
 .../submit/KubernetesClientApplication.scala  |  10 +-
 .../k8s/submit/KubernetesDriverBuilder.scala  |  26 +---
 .../deploy/k8s/submit/MainAppResource.scala   |   3 +-
 .../deploy/k8s/KubernetesConfSuite.scala      | 103 +------------
 .../BasicDriverFeatureStepSuite.scala         |  69 +++++++--
 .../BasicExecutorFeatureStepSuite.scala       |   4 -
 .../DriverCommandFeatureStepSuite.scala       | 144 ++++++++++++++++++
 ...ubernetesCredentialsFeatureStepSuite.scala |   3 -
 .../DriverServiceFeatureStepSuite.scala       |  19 +--
 .../features/EnvSecretsFeatureStepSuite.scala |   1 -
 .../features/LocalDirsFeatureStepSuite.scala  |   4 +-
 .../MountSecretsFeatureStepSuite.scala        |   1 -
 .../MountVolumesFeatureStepSuite.scala        |   4 +-
 .../PodTemplateConfigMapStepSuite.scala       |   4 +-
 .../bindings/JavaDriverFeatureStepSuite.scala |  61 --------
 .../PythonDriverFeatureStepSuite.scala        | 112 --------------
 .../bindings/RDriverFeatureStepSuite.scala    |  64 --------
 .../spark/deploy/k8s/submit/ClientSuite.scala |   4 +-
 .../submit/KubernetesDriverBuilderSuite.scala | 136 +++--------------
 .../k8s/KubernetesExecutorBuilderSuite.scala  |   6 -
 .../src/main/dockerfiles/spark/entrypoint.sh  |  16 --
 32 files changed, 438 insertions(+), 855 deletions(-)
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
 delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
 delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
 delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
 create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
 delete mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
 delete mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
 delete mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 862f1d63ed39f..a32bd93bb65bc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.k8s
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigBuilder
@@ -125,34 +126,6 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
-  val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
-    ConfigBuilder("spark.kubernetes.python.mainAppResource")
-      .doc("The main app resource for pyspark jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val KUBERNETES_PYSPARK_APP_ARGS =
-    ConfigBuilder("spark.kubernetes.python.appArgs")
-      .doc("The app arguments for PySpark Jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val KUBERNETES_R_MAIN_APP_RESOURCE =
-    ConfigBuilder("spark.kubernetes.r.mainAppResource")
-      .doc("The main app resource for SparkR jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val KUBERNETES_R_APP_ARGS =
-    ConfigBuilder("spark.kubernetes.r.appArgs")
-      .doc("The app arguments for SparkR Jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
@@ -267,6 +240,7 @@ private[spark] object Config extends Logging {
       .doc("This sets the resource type internally")
       .internal()
       .stringConf
+      .checkValues(Set(APP_RESOURCE_TYPE_JAVA, APP_RESOURCE_TYPE_PYTHON, APP_RESOURCE_TYPE_R))
       .createOptional
 
   val KUBERNETES_LOCAL_DIRS_TMPFS =
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 1c6d53c16871e..85917b88e912a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -69,12 +69,8 @@ private[spark] object Constants {
   val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
 
   // BINDINGS
-  val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
   val ENV_PYSPARK_FILES = "PYSPARK_FILES"
-  val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
   val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
-  val ENV_R_PRIMARY = "R_PRIMARY"
-  val ENV_R_ARGS = "R_APP_ARGS"
 
   // Pod spec templates
   val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
@@ -88,6 +84,7 @@ private[spark] object Constants {
   val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
   val MEMORY_OVERHEAD_MIN_MIB = 384L
+  val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
 
   // Hadoop Configuration
   val HADOOP_FILE_VOLUME = "hadoop-properties"
@@ -113,4 +110,9 @@ private[spark] object Constants {
   // Hadoop credentials secrets for the Spark app.
   val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
   val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"
+
+  // Application resource types.
+ val APP_RESOURCE_TYPE_JAVA = "java" + val APP_RESOURCE_TYPE_PYTHON = "python" + val APP_RESOURCE_TYPE_R = "r" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 066547dcbb408..ebb81540bbbbe 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManag import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ import org.apache.spark.internal.config.ConfigEntry +import org.apache.spark.util.Utils private[spark] sealed trait KubernetesRoleSpecificConf @@ -36,10 +37,15 @@ private[spark] sealed trait KubernetesRoleSpecificConf * Structure containing metadata for Kubernetes logic that builds a Spark driver. */ private[spark] case class KubernetesDriverSpecificConf( - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, appName: String, - appArgs: Seq[String]) extends KubernetesRoleSpecificConf + appArgs: Seq[String], + pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf { + + require(mainAppResource != null, "Main resource must be provided.") + +} /* * Structure containing metadata for Kubernetes logic that builds a Spark executor. @@ -70,7 +76,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( roleSecretEnvNamesToKeyRefs: Map[String, String], roleEnvs: Map[String, String], roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]], - sparkFiles: Seq[String], hadoopConfSpec: Option[HadoopConfSpec]) { def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config" @@ -82,23 +87,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) - def sparkJars(): Seq[String] = sparkConf - .getOption("spark.jars") - .map(str => str.split(",").toSeq) - .getOrElse(Seq.empty[String]) - - def pyFiles(): Option[String] = sparkConf - .get(KUBERNETES_PYSPARK_PY_FILES) - - def pySparkMainResource(): Option[String] = sparkConf - .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE) - - def pySparkPythonVersion(): String = sparkConf - .get(PYSPARK_MAJOR_PYTHON_VERSION) - - def sparkRMainResource(): Option[String] = sparkConf - .get(KUBERNETES_R_MAIN_APP_RESOURCE) - def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) def imagePullSecrets(): Seq[LocalObjectReference] = { @@ -130,38 +118,11 @@ private[spark] object KubernetesConf { appName: String, appResourceNamePrefix: String, appId: String, - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, appArgs: Array[String], maybePyFiles: Option[String], hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { - val sparkConfWithMainAppJar = sparkConf.clone() - val additionalFiles = mutable.ArrayBuffer.empty[String] - mainAppResource.foreach { - case JavaMainAppResource(res) => - val previousJars = sparkConf - .getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty) - if (!previousJars.contains(res)) { - sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) - } - // The function of this outer match is to account 
for multiple nonJVM - // bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4 - case nonJVM: NonJVMResource => - nonJVM match { - case PythonMainAppResource(res) => - additionalFiles += res - maybePyFiles.foreach{maybePyFiles => - additionalFiles.appendAll(maybePyFiles.split(","))} - sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res) - case RMainAppResource(res) => - additionalFiles += res - sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res) - } - sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4) - } - val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + @@ -188,11 +149,6 @@ private[spark] object KubernetesConf { KubernetesVolumeUtils.parseVolumesWithPrefix( sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) - val sparkFiles = sparkConf - .getOption("spark.files") - .map(str => str.split(",").toSeq) - .getOrElse(Seq.empty[String]) ++ additionalFiles - val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) KubernetesUtils.requireNandDefined( hadoopConfDir, @@ -205,10 +161,12 @@ private[spark] object KubernetesConf { } else { None } + val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil) + KubernetesConf( - sparkConfWithMainAppJar, - KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs), + sparkConf.clone(), + KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles), appResourceNamePrefix, appId, driverLabels, @@ -217,7 +175,6 @@ private[spark] object KubernetesConf { driverSecretEnvNamesToKeyRefs, driverEnvs, driverVolumes, - sparkFiles, hadoopConfSpec) } @@ -274,7 +231,6 @@ private[spark] object KubernetesConf { executorEnvSecrets, executorEnv, executorVolumes, - Seq.empty[String], None) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 96b14a0d82b4c..5ddf73cb16a6f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils private[spark] class BasicDriverFeatureStep( conf: KubernetesConf[KubernetesDriverSpecificConf]) @@ -47,10 +48,23 @@ private[spark] class BasicDriverFeatureStep( // Memory settings private val driverMemoryMiB = conf.get(DRIVER_MEMORY) + + // The memory overhead factor to use. If the user has not set it, then use a different + // value for non-JVM apps. This value is propagated to executors. 
+ private val overheadFactor = + if (conf.roleSpecificConf.mainAppResource.isInstanceOf[NonJVMResource]) { + if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) { + conf.get(MEMORY_OVERHEAD_FACTOR) + } else { + NON_JVM_MEMORY_OVERHEAD_FACTOR + } + } else { + conf.get(MEMORY_OVERHEAD_FACTOR) + } + private val memoryOverheadMiB = conf .get(DRIVER_MEMORY_OVERHEAD) - .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt, - MEMORY_OVERHEAD_MIN_MIB)) + .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB override def configurePod(pod: SparkPod): SparkPod = { @@ -134,20 +148,18 @@ private[spark] class BasicDriverFeatureStep( KUBERNETES_DRIVER_POD_NAME.key -> driverPodName, "spark.app.id" -> conf.appId, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix, - KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true") - - val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath( - conf.sparkJars()) - val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath( - conf.sparkFiles) - if (resolvedSparkJars.nonEmpty) { - additionalProps.put("spark.jars", resolvedSparkJars.mkString(",")) - } - if (resolvedSparkFiles.nonEmpty) { - additionalProps.put("spark.files", resolvedSparkFiles.mkString(",")) + KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true", + MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) + + Seq("spark.jars", "spark.files").foreach { key => + conf.getOption(key).foreach { value => + val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value)) + if (resolved.nonEmpty) { + additionalProps.put(key, resolved.mkString(",")) + } + } } + additionalProps.toMap } - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 1dab2a834f3e7..7f397e6e84fa5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -58,16 +58,13 @@ private[spark] class BasicExecutorFeatureStep( (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB - private val executorMemoryTotal = kubernetesConf.sparkConf - .getOption(APP_RESOURCE_TYPE.key).map{ res => - val additionalPySparkMemory = res match { - case "python" => - kubernetesConf.sparkConf - .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) - case _ => 0 - } - executorMemoryWithOverhead + additionalPySparkMemory - }.getOrElse(executorMemoryWithOverhead) + private val executorMemoryTotal = + if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)) { + executorMemoryWithOverhead + + kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) + } else { + executorMemoryWithOverhead + } private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1) private val executorCoresRequest = @@ -187,8 +184,4 @@ private[spark] class BasicExecutorFeatureStep( SparkPod(executorPod, containerWithLimitCores) } - - override def getAdditionalPodSystemProperties(): Map[String, 
String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala new file mode 100644 index 0000000000000..8b8f0d01d49f7 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder} + +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.util.Utils + +/** + * Creates the driver command for running the user app, and propagates needed configuration so + * executors can also find the app code. + */ +private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDriverSpecificConf]) + extends KubernetesFeatureConfigStep { + + private val driverConf = conf.roleSpecificConf + + override def configurePod(pod: SparkPod): SparkPod = { + driverConf.mainAppResource match { + case JavaMainAppResource(_) => + configureForJava(pod) + + case PythonMainAppResource(res) => + configureForPython(pod, res) + + case RMainAppResource(res) => + configureForR(pod, res) + } + } + + override def getAdditionalPodSystemProperties(): Map[String, String] = { + driverConf.mainAppResource match { + case JavaMainAppResource(res) => + res.map(additionalJavaProperties).getOrElse(Map.empty) + + case PythonMainAppResource(res) => + additionalPythonProperties(res) + + case RMainAppResource(res) => + additionalRProperties(res) + } + } + + private def configureForJava(pod: SparkPod): SparkPod = { + // The user application jar is merged into the spark.jars list and managed through that + // property, so use a "blank" resource for the Java driver. 
+ val driverContainer = baseDriverContainer(pod, SparkLauncher.NO_RESOURCE).build() + SparkPod(pod.pod, driverContainer) + } + + private def configureForPython(pod: SparkPod, res: String): SparkPod = { + val maybePythonFiles = if (driverConf.pyFiles.nonEmpty) { + // Delineation by ":" is to append the PySpark Files to the PYTHONPATH + // of the respective PySpark pod + val resolved = KubernetesUtils.resolveFileUrisAndPath(driverConf.pyFiles) + Some(new EnvVarBuilder() + .withName(ENV_PYSPARK_FILES) + .withValue(resolved.mkString(":")) + .build()) + } else { + None + } + val pythonEnvs = + Seq(new EnvVarBuilder() + .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION) + .withValue(conf.sparkConf.get(PYSPARK_MAJOR_PYTHON_VERSION)) + .build()) ++ + maybePythonFiles + + val pythonContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)) + .addAllToEnv(pythonEnvs.asJava) + .build() + + SparkPod(pod.pod, pythonContainer) + } + + private def configureForR(pod: SparkPod, res: String): SparkPod = { + val rContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)).build() + SparkPod(pod.pod, rContainer) + } + + private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = { + new ContainerBuilder(pod.container) + .addToArgs("driver") + .addToArgs("--properties-file", SPARK_CONF_PATH) + .addToArgs("--class", driverConf.mainClass) + .addToArgs(resource) + .addToArgs(driverConf.appArgs: _*) + } + + private def additionalJavaProperties(resource: String): Map[String, String] = { + resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList("spark.jars", Seq(resource)) + } + + private def additionalPythonProperties(resource: String): Map[String, String] = { + resourceType(APP_RESOURCE_TYPE_PYTHON) ++ + mergeFileList("spark.files", Seq(resource) ++ driverConf.pyFiles) + } + + private def additionalRProperties(resource: String): Map[String, String] = { + resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList("spark.files", Seq(resource)) + } + + private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = { + val existing = Utils.stringToSeq(conf.sparkConf.get(key, "")) + Map(key -> (existing ++ filesToAdd).distinct.mkString(",")) + } + + private def resourceType(resType: String): Map[String, String] = { + Map(APP_RESOURCE_TYPE.key -> resType) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala index 4c1be3bb13293..58cdaa3cadd6b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala @@ -61,11 +61,11 @@ private[spark] trait KubernetesFeatureConfigStep { /** * Return any system properties that should be set on the JVM in accordance to this feature. */ - def getAdditionalPodSystemProperties(): Map[String, String] + def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty /** * Return any additional Kubernetes resources that should be added to support this feature. Only * applicable when creating the driver in cluster mode. 
*/ - def getAdditionalKubernetesResources(): Seq[HasMetadata] + def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala index 96a8013246b74..28e2d1726ae27 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala @@ -54,11 +54,11 @@ private[spark] class PodTemplateConfigMapStep( SparkPod(podWithVolume, containerWithVolume) } - def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( + override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key -> (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) - def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala deleted file mode 100644 index 6f063b253cd73..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.deploy.k8s.features.bindings - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep -import org.apache.spark.launcher.SparkLauncher - -private[spark] class JavaDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val withDriverArgs = new ContainerBuilder(pod.container) - .addToArgs("driver") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", kubernetesConf.roleSpecificConf.mainClass) - // The user application jar is merged into the spark.jars list and managed through that - // property, so there is no need to reference it explicitly here. - .addToArgs(SparkLauncher.NO_RESOURCE) - .addToArgs(kubernetesConf.roleSpecificConf.appArgs: _*) - .build() - SparkPod(pod.pod, withDriverArgs) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "java") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala deleted file mode 100644 index cf0c03b22bd7e..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep - -private[spark] class PythonDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val roleConf = kubernetesConf.roleSpecificConf - require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined") - // Delineation is done by " " because that is input into PythonRunner - val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( - pyArgs => - new EnvVarBuilder() - .withName(ENV_PYSPARK_ARGS) - .withValue(pyArgs.mkString(" ")) - .build()) - val maybePythonFiles = kubernetesConf.pyFiles().map( - // Dilineation by ":" is to append the PySpark Files to the PYTHONPATH - // of the respective PySpark pod - pyFiles => - new EnvVarBuilder() - .withName(ENV_PYSPARK_FILES) - .withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(",")) - .mkString(":")) - .build()) - val envSeq = - Seq(new EnvVarBuilder() - .withName(ENV_PYSPARK_PRIMARY) - .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.pySparkMainResource().get)) - .build(), - new EnvVarBuilder() - .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION) - .withValue(kubernetesConf.pySparkPythonVersion()) - .build()) - val pythonEnvs = envSeq ++ - maybePythonArgs.toSeq ++ - maybePythonFiles.toSeq - - val withPythonPrimaryContainer = new ContainerBuilder(pod.container) - .addAllToEnv(pythonEnvs.asJava) - .addToArgs("driver-py") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", roleConf.mainClass) - .build() - - SparkPod(pod.pod, withPythonPrimaryContainer) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "python") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala deleted file mode 100644 index 1a7ef52fefe70..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep - -private[spark] class RDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val roleConf = kubernetesConf.roleSpecificConf - require(roleConf.mainAppResource.isDefined, "R Main Resource must be defined") - // Delineation is done by " " because that is input into RRunner - val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( - rArgs => - new EnvVarBuilder() - .withName(ENV_R_ARGS) - .withValue(rArgs.mkString(" ")) - .build()) - val envSeq = - Seq(new EnvVarBuilder() - .withName(ENV_R_PRIMARY) - .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.sparkRMainResource().get)) - .build()) - val rEnvs = envSeq ++ - maybeRArgs.toSeq - - val withRPrimaryContainer = new ContainerBuilder(pod.container) - .addAllToEnv(rEnvs.asJava) - .addToArgs("driver-r") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", roleConf.mainClass) - .build() - - SparkPod(pod.pod, withRPrimaryContainer) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "r") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 4b58f8ba3c9bd..543d6b16d6ae2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -44,7 +44,7 @@ import org.apache.spark.util.Utils * @param maybePyFiles additional Python files via --py-files */ private[spark] case class ClientArguments( - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, driverArgs: Array[String], maybePyFiles: Option[String], @@ -53,18 +53,18 @@ private[spark] case class ClientArguments( private[spark] object ClientArguments { def fromCommandLineArgs(args: Array[String]): ClientArguments = { - var mainAppResource: Option[MainAppResource] = None + var mainAppResource: MainAppResource = JavaMainAppResource(None) var mainClass: Option[String] = None val driverArgs = mutable.ArrayBuffer.empty[String] var maybePyFiles : Option[String] = None args.sliding(2, 2).toList.foreach { case Array("--primary-java-resource", primaryJavaResource: String) => - mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + mainAppResource = JavaMainAppResource(Some(primaryJavaResource)) case Array("--primary-py-file", primaryPythonResource: String) => - mainAppResource = Some(PythonMainAppResource(primaryPythonResource)) + mainAppResource = 
PythonMainAppResource(primaryPythonResource) case Array("--primary-r-file", primaryRFile: String) => - mainAppResource = Some(RMainAppResource(primaryRFile)) + mainAppResource = RMainAppResource(primaryRFile) case Array("--other-py-files", pyFiles: String) => maybePyFiles = Some(pyFiles) case Array("--main-class", clazz: String) => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 5565cd74280e6..be4daec3b1bb9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} private[spark] class KubernetesDriverBuilder( provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep = @@ -45,18 +44,10 @@ private[spark] class KubernetesDriverBuilder( provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] => MountVolumesFeatureStep) = new MountVolumesFeatureStep(_), - providePythonStep: ( + provideDriverCommandStep: ( KubernetesConf[KubernetesDriverSpecificConf] - => PythonDriverFeatureStep) = - new PythonDriverFeatureStep(_), - provideRStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => RDriverFeatureStep) = - new RDriverFeatureStep(_), - provideJavaStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => JavaDriverFeatureStep) = - new JavaDriverFeatureStep(_), + => DriverCommandFeatureStep) = + new DriverCommandFeatureStep(_), provideHadoopGlobalStep: ( KubernetesConf[KubernetesDriverSpecificConf] => KerberosConfDriverFeatureStep) = @@ -88,21 +79,14 @@ private[spark] class KubernetesDriverBuilder( Seq(providePodTemplateConfigMapStep(kubernetesConf)) } else Nil - val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map { - case JavaMainAppResource(_) => - provideJavaStep(kubernetesConf) - case PythonMainAppResource(_) => - providePythonStep(kubernetesConf) - case RMainAppResource(_) => - provideRStep(kubernetesConf)} - .getOrElse(provideJavaStep(kubernetesConf)) + val driverCommandStep = provideDriverCommandStep(kubernetesConf) val maybeHadoopConfigStep = kubernetesConf.hadoopConfSpec.map { _ => provideHadoopGlobalStep(kubernetesConf)} val allFeatures: Seq[KubernetesFeatureConfigStep] = - (baseFeatures :+ bindingsStep) ++ + baseFeatures ++ Seq(driverCommandStep) ++ secretFeature ++ envSecretFeature ++ volumesFeature ++ maybeHadoopConfigStep.toSeq ++ podTemplateFeature diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala index dd5a4549743df..a2e01fa2d9a0e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala @@ -20,7 +20,8 @@ private[spark] sealed trait MainAppResource private[spark] sealed trait NonJVMResource -private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource +private[spark] case class JavaMainAppResource(primaryResource: Option[String]) + extends MainAppResource private[spark] case class PythonMainAppResource(primaryResource: String) extends MainAppResource with NonJVMResource diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index bb2b94f9976e2..41ca8d186c17b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -56,7 +56,7 @@ class KubernetesConfSuite extends SparkFunSuite { APP_NAME, RESOURCE_NAME_PREFIX, APP_ID, - mainAppResource = None, + mainAppResource = JavaMainAppResource(None), MAIN_CLASS, APP_ARGS, maybePyFiles = None, @@ -65,109 +65,10 @@ class KubernetesConfSuite extends SparkFunSuite { assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap) assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX) assert(conf.roleSpecificConf.appName === APP_NAME) - assert(conf.roleSpecificConf.mainAppResource.isEmpty) assert(conf.roleSpecificConf.mainClass === MAIN_CLASS) assert(conf.roleSpecificConf.appArgs === APP_ARGS) } - test("Creating driver conf with and without the main app jar influences spark.jars") { - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - val mainAppJar = Some(JavaMainAppResource("local:///opt/spark/main.jar")) - val kubernetesConfWithMainJar = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppJar, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars") - .split(",") - === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar")) - val kubernetesConfWithoutMainJar = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource = None, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1) - } - - test("Creating driver conf with a python primary file") { - val mainResourceFile = "local:///opt/spark/main.py" - val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py") - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - .set("spark.files", "local:///opt/spark/example4.py") - val mainAppResource = Some(PythonMainAppResource(mainResourceFile)) - val kubernetesConfWithMainResource = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - Some(inputPyFiles.mkString(",")), - hadoopConfDir = None) - assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 
0.4) - assert(kubernetesConfWithMainResource.sparkFiles - === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles) - } - - test("Creating driver conf with a r primary file") { - val mainResourceFile = "local:///opt/spark/main.R" - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - .set("spark.files", "local:///opt/spark/example2.R") - val mainAppResource = Some(RMainAppResource(mainResourceFile)) - val kubernetesConfWithMainResource = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4) - assert(kubernetesConfWithMainResource.sparkFiles - === Array("local:///opt/spark/example2.R", mainResourceFile)) - } - - test("Testing explicit setting of memory overhead on non-JVM tasks") { - val sparkConf = new SparkConf(false) - .set(MEMORY_OVERHEAD_FACTOR, 0.3) - - val mainResourceFile = "local:///opt/spark/main.py" - val mainAppResource = Some(PythonMainAppResource(mainResourceFile)) - val conf = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3) - } - test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") { val sparkConf = new SparkConf(false) .set(MEMORY_OVERHEAD_FACTOR, 0.3) @@ -192,7 +93,7 @@ class KubernetesConfSuite extends SparkFunSuite { APP_NAME, RESOURCE_NAME_PREFIX, APP_ID, - mainAppResource = None, + mainAppResource = JavaMainAppResource(None), MAIN_CLASS, APP_ARGS, maybePyFiles = None, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 5c6bcc72158be..1e7dfbeffdb24 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.JavaMainAppResource -import org.apache.spark.deploy.k8s.submit.PythonMainAppResource +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI class BasicDriverFeatureStepSuite extends SparkFunSuite { @@ -52,7 +52,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { new LocalObjectReferenceBuilder().withName(secret).build() } private val emptyDriverSpecificConf = KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), APP_NAME, MAIN_CLASS, APP_ARGS) @@ -62,8 +62,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") .set("spark.driver.cores", "2") .set(KUBERNETES_DRIVER_LIMIT_CORES, "4") - 
.set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M") - .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L) + .set(DRIVER_MEMORY.key, "256M") + .set(DRIVER_MEMORY_OVERHEAD, 200L) .set(CONTAINER_IMAGE, "spark-driver:latest") .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(",")) val kubernetesConf = KubernetesConf( @@ -77,7 +77,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val featureStep = new BasicDriverFeatureStep(kubernetesConf) @@ -130,21 +129,22 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", "spark.app.id" -> APP_ID, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, - "spark.kubernetes.submitInDriver" -> "true") + "spark.kubernetes.submitInDriver" -> "true", + MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } test("Check appropriate entrypoint rerouting for various bindings") { val javaSparkConf = new SparkConf() - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g") + .set(DRIVER_MEMORY.key, "4g") .set(CONTAINER_IMAGE, "spark-driver:latest") val pythonSparkConf = new SparkConf() - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g") + .set(DRIVER_MEMORY.key, "4g") .set(CONTAINER_IMAGE, "spark-driver-py:latest") val javaKubernetesConf = KubernetesConf( javaSparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("")), + JavaMainAppResource(None), APP_NAME, PY_MAIN_CLASS, APP_ARGS), @@ -156,13 +156,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val pythonKubernetesConf = KubernetesConf( pythonSparkConf, KubernetesDriverSpecificConf( - Some(PythonMainAppResource("")), + PythonMainAppResource(""), APP_NAME, PY_MAIN_CLASS, APP_ARGS), @@ -174,7 +173,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf) val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf) @@ -204,7 +202,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - allFiles, hadoopConfSpec = None) val step = new BasicDriverFeatureStep(kubernetesConf) @@ -215,10 +212,52 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, "spark.kubernetes.submitInDriver" -> "true", "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", - "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") + "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt", + MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(additionalProperties === expectedSparkConf) } + // Memory overhead tests. 
Tuples are: + // test name, main resource, overhead factor, expected factor + Seq( + ("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get), + ("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR), + ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d), + ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR) + ).foreach { case (name, resource, factor, expectedFactor) => + test(s"memory overhead factor: $name") { + // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val driverMem = MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") + factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) } + val driverConf = emptyDriverSpecificConf.copy(mainAppResource = resource) + val conf = KubernetesConf( + sparkConf, + driverConf, + RESOURCE_NAME_PREFIX, + APP_ID, + DRIVER_LABELS, + DRIVER_ANNOTATIONS, + Map.empty, + Map.empty, + DRIVER_ENVS, + Nil, + hadoopConfSpec = None) + val step = new BasicDriverFeatureStep(conf) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = pod.container.getResources.getRequests.get("memory").getAmount() + val expected = (driverMem + driverMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) + } + } + def containerPort(name: String, portNumber: Int): ContainerPort = new ContainerPortBuilder() .withName(name) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 41f34bd45cd5b..e9a16aab6ccc2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -91,7 +91,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) @@ -132,7 +131,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63) } @@ -154,7 +152,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map("qux" -> "quux"), Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) @@ -182,7 +179,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) // This is checking that basic executor + executorMemory = 1408 + 42 = 1450 diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala new file mode 100644 index 0000000000000..30672952aaf6f --- /dev/null +++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.util.Utils
+
+class DriverCommandFeatureStepSuite extends SparkFunSuite {
+
+  private val MAIN_CLASS = "mainClass"
+
+  test("java resource") {
+    val mainResource = "local:///main.jar"
+    val spec = applyFeatureStep(
+      JavaMainAppResource(Some(mainResource)),
+      appArgs = Array("5", "7"))
+    assert(spec.pod.container.getArgs.asScala === List(
+      "driver",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", MAIN_CLASS,
+      "spark-internal", "5", "7"))
+
+    val jars = Utils.stringToSeq(spec.systemProperties("spark.jars"))
+    assert(jars.toSet === Set(mainResource))
+  }
+
+  test("python resource with no extra files") {
+    val mainResource = "local:///main.py"
+    val sparkConf = new SparkConf(false)
+      .set(PYSPARK_MAJOR_PYTHON_VERSION, "3")
+
+    val spec = applyFeatureStep(
+      PythonMainAppResource(mainResource),
+      conf = sparkConf)
+    assert(spec.pod.container.getArgs.asScala === List(
+      "driver",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", MAIN_CLASS,
+      "/main.py"))
+    val envs = spec.pod.container.getEnv.asScala
+      .map { env => (env.getName, env.getValue) }
+      .toMap
+    assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3")
+
+    val files = Utils.stringToSeq(spec.systemProperties("spark.files"))
+    assert(files.toSet === Set(mainResource))
+  }
+
+  test("python resource with extra files") {
+    val expectedMainResource = "/main.py"
+    val expectedPySparkFiles = "/example2.py:/example3.py"
+    val filesInConf = Set("local:///example.py")
+
+    val mainResource = s"local://$expectedMainResource"
+    val pyFiles = Seq("local:///example2.py", "local:///example3.py")
+
+    val sparkConf = new SparkConf(false)
+      .set("spark.files", filesInConf.mkString(","))
+      .set(PYSPARK_MAJOR_PYTHON_VERSION, "2")
+    val spec = applyFeatureStep(
+      PythonMainAppResource(mainResource),
+      conf = sparkConf,
+      appArgs = Array("5", "7", "9"),
+      pyFiles = pyFiles)
+
+    assert(spec.pod.container.getArgs.asScala === List(
+      "driver",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", MAIN_CLASS,
+      "/main.py", "5", "7", "9"))
+
+    val envs = spec.pod.container.getEnv.asScala
+      .map { env => (env.getName, env.getValue) }
+      .toMap
+    val expected = Map(
+      ENV_PYSPARK_FILES -> expectedPySparkFiles,
+      ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2")
+    assert(envs === expected)
+
+    val files = Utils.stringToSeq(spec.systemProperties("spark.files"))
+    assert(files.toSet === pyFiles.toSet ++ filesInConf ++ Set(mainResource))
+  }
+
+  test("R resource") {
+    val expectedMainResource = "/main.R"
+    val mainResource = s"local://$expectedMainResource"
+
+    val spec = applyFeatureStep(
+      RMainAppResource(mainResource),
+      appArgs = Array("5", "7", "9"))
+
+    assert(spec.pod.container.getArgs.asScala === List(
+      "driver",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", MAIN_CLASS,
+      "/main.R", "5", "7", "9"))
+  }
+
+  private def applyFeatureStep(
+      resource: MainAppResource,
+      conf: SparkConf = new SparkConf(false),
+      appArgs: Array[String] = Array(),
+      pyFiles: Seq[String] = Nil): KubernetesDriverSpec = {
+    val driverConf = new KubernetesDriverSpecificConf(
+      resource, MAIN_CLASS, "appName", appArgs, pyFiles = pyFiles)
+    val kubernetesConf = KubernetesConf(
+      conf,
+      driverConf,
+      "resource-prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Nil,
+      hadoopConfSpec = None)
+    val step = new DriverCommandFeatureStep(kubernetesConf)
+    val pod = step.configurePod(SparkPod.initialPod())
+    val props = step.getAdditionalPodSystemProperties()
+    KubernetesDriverSpec(pod, Nil, props)
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index 8675ceb48cf6d..36c6616a87b0a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -62,7 +62,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -95,7 +94,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
 
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
@@ -135,7 +133,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 5c3e801501513..3c46667c3042e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 import org.apache.spark.util.Clock
 
 class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
@@ -59,7 +60,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       KubernetesConf(
         sparkConf,
         KubernetesDriverSpecificConf(
-          None, "main", "app", Seq.empty),
+          JavaMainAppResource(None), "main", "app", Seq.empty),
         SHORT_RESOURCE_NAME_PREFIX,
         "app-id",
         DRIVER_LABELS,
@@ -68,7 +69,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
     assert(configurationStep.getAdditionalKubernetesResources().size === 1)
@@ -92,7 +92,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
           .set(KUBERNETES_NAMESPACE, "my-namespace"),
         KubernetesDriverSpecificConf(
-          None, "main", "app", Seq.empty),
+          JavaMainAppResource(None), "main", "app", Seq.empty),
         SHORT_RESOURCE_NAME_PREFIX,
         "app-id",
         DRIVER_LABELS,
@@ -101,7 +101,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
       DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
@@ -115,7 +114,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       KubernetesConf(
         sparkConf,
         KubernetesDriverSpecificConf(
-          None, "main", "app", Seq.empty),
+          JavaMainAppResource(None), "main", "app", Seq.empty),
         SHORT_RESOURCE_NAME_PREFIX,
         "app-id",
         DRIVER_LABELS,
@@ -124,7 +123,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     val resolvedService = configurationStep
       .getAdditionalKubernetesResources()
@@ -147,7 +145,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       KubernetesConf(
         sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
         KubernetesDriverSpecificConf(
-          None, "main", "app", Seq.empty),
+          JavaMainAppResource(None), "main", "app", Seq.empty),
         LONG_RESOURCE_NAME_PREFIX,
         "app-id",
         DRIVER_LABELS,
@@ -156,7 +154,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None),
       clock)
     val driverService = configurationStep
@@ -176,7 +173,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         KubernetesConf(
           sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
           KubernetesDriverSpecificConf(
-            None, "main", "app", Seq.empty),
+            JavaMainAppResource(None), "main", "app", Seq.empty),
           LONG_RESOURCE_NAME_PREFIX,
           "app-id",
           DRIVER_LABELS,
@@ -185,7 +182,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           Map.empty,
          Map.empty,
           Nil,
-          Seq.empty[String],
           hadoopConfSpec = None),
         clock)
       fail("The driver bind address should not be allowed.")
@@ -203,7 +199,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         KubernetesConf(
           sparkConf,
           KubernetesDriverSpecificConf(
-            None, "main", "app", Seq.empty),
+            JavaMainAppResource(None), "main", "app", Seq.empty),
           LONG_RESOURCE_NAME_PREFIX,
           "app-id",
           DRIVER_LABELS,
@@ -212,7 +208,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           Map.empty,
           Map.empty,
           Nil,
-          Seq.empty[String],
           hadoopConfSpec = None),
         clock)
       fail("The driver host address should not be allowed.")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
index 43796b77efdc7..3d253079c3ce7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
@@ -46,7 +46,6 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
       envVarsToKeys,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
 
     val step = new EnvSecretsFeatureStep(kubernetesConf)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index 3a4e60547d7f2..894d824999aac 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   private val defaultLocalDir = "/var/data/default-local-dir"
@@ -36,7 +37,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        None,
+        JavaMainAppResource(None),
         "app-name",
         "main",
         Seq.empty),
@@ -48,7 +49,6 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
   }
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index 18e3d773f690d..1555f6a9c6527 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -44,7 +44,6 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
 
     val step = new MountSecretsFeatureStep(kubernetesConf)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 0d0a5fb951f64..2a957460ca8e0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -18,13 +18,14 @@ package org.apache.spark.deploy.k8s.features
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class MountVolumesFeatureStepSuite extends SparkFunSuite {
   private val sparkConf = new SparkConf(false)
   private val emptyKubernetesConf = KubernetesConf(
     sparkConf = sparkConf,
     roleSpecificConf = KubernetesDriverSpecificConf(
-      None,
+      JavaMainAppResource(None),
       "app-name",
       "main",
       Seq.empty),
@@ -36,7 +37,6 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     roleSecretEnvNamesToKeyRefs = Map.empty,
     roleEnvs = Map.empty,
     roleVolumes = Nil,
-    sparkFiles = Nil,
     hadoopConfSpec = None)
 
   test("Mounts hostPath volumes") {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
index d7bbbd121af72..370948c9502e4 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
   private var sparkConf: SparkConf = _
@@ -36,7 +37,7 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
     kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        None,
+        JavaMainAppResource(None),
         "app-name",
         "main",
         Seq.empty),
@@ -48,7 +49,6 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       Option.empty)
     templateFile = Files.createTempFile("pod-template", "yml").toFile
     templateFile.deleteOnExit()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
deleted file mode 100644
index 9172e0c3dc408..0000000000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.bindings
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
-
-class JavaDriverFeatureStepSuite extends SparkFunSuite {
-
-  test("Java Step modifies container correctly") {
-    val baseDriverPod = SparkPod.initialPod()
-    val sparkConf = new SparkConf(false)
-    val kubernetesConf = KubernetesConf(
-      sparkConf,
-      KubernetesDriverSpecificConf(
-        Some(PythonMainAppResource("local:///main.jar")),
-        "test-class",
-        "java-runner",
-        Seq("5 7")),
-      appResourceNamePrefix = "",
-      appId = "",
-      roleLabels = Map.empty,
-      roleAnnotations = Map.empty,
-      roleSecretNamesToMountPaths = Map.empty,
-      roleSecretEnvNamesToKeyRefs = Map.empty,
-      roleEnvs = Map.empty,
-      roleVolumes = Nil,
-      sparkFiles = Seq.empty[String],
-      hadoopConfSpec = None)
-
-    val step = new JavaDriverFeatureStep(kubernetesConf)
-    val driverPod = step.configurePod(baseDriverPod).pod
-    val driverContainerwithJavaStep = step.configurePod(baseDriverPod).container
-    assert(driverContainerwithJavaStep.getArgs.size === 7)
-    val args = driverContainerwithJavaStep
-      .getArgs.asScala
-    assert(args === List(
-      "driver",
-      "--properties-file", SPARK_CONF_PATH,
-      "--class", "test-class",
-      "spark-internal", "5 7"))
-  }
-}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
deleted file mode 100644
index 2bcc6465b79d6..0000000000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.bindings
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
-
-class PythonDriverFeatureStepSuite extends SparkFunSuite {
-
-  test("Python Step modifies container correctly") {
-    val expectedMainResource = "/main.py"
-    val mainResource = "local:///main.py"
-    val pyFiles = Seq("local:///example2.py", "local:///example3.py")
-    val expectedPySparkFiles =
-      "/example2.py:/example3.py"
-    val baseDriverPod = SparkPod.initialPod()
-    val sparkConf = new SparkConf(false)
-      .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
-      .set(KUBERNETES_PYSPARK_PY_FILES, pyFiles.mkString(","))
-      .set("spark.files", "local:///example.py")
-      .set(PYSPARK_MAJOR_PYTHON_VERSION, "2")
-    val kubernetesConf = KubernetesConf(
-      sparkConf,
-      KubernetesDriverSpecificConf(
-        Some(PythonMainAppResource("local:///main.py")),
-        "test-app",
-        "python-runner",
-        Seq("5", "7", "9")),
-      appResourceNamePrefix = "",
-      appId = "",
-      roleLabels = Map.empty,
-      roleAnnotations = Map.empty,
-      roleSecretNamesToMountPaths = Map.empty,
-      roleSecretEnvNamesToKeyRefs = Map.empty,
-      roleEnvs = Map.empty,
-      roleVolumes = Nil,
-      sparkFiles = Seq.empty[String],
-      hadoopConfSpec = None)
-
-    val step = new PythonDriverFeatureStep(kubernetesConf)
-    val driverPod = step.configurePod(baseDriverPod).pod
-    val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
-    assert(driverContainerwithPySpark.getEnv.size === 4)
-    val envs = driverContainerwithPySpark
-      .getEnv
-      .asScala
-      .map(env => (env.getName, env.getValue))
-      .toMap
-    assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource)
-    assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles)
-    assert(envs(ENV_PYSPARK_ARGS) === "5 7 9")
-    assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "2")
-  }
-  test("Python Step testing empty pyfiles") {
-    val mainResource = "local:///main.py"
-    val baseDriverPod = SparkPod.initialPod()
-    val sparkConf = new SparkConf(false)
-      .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
-      .set(PYSPARK_MAJOR_PYTHON_VERSION, "3")
-    val kubernetesConf = KubernetesConf(
-      sparkConf,
-      KubernetesDriverSpecificConf(
-        Some(PythonMainAppResource("local:///main.py")),
-        "test-class-py",
-        "python-runner",
-        Seq.empty[String]),
-      appResourceNamePrefix = "",
-      appId = "",
-      roleLabels = Map.empty,
-      roleAnnotations = Map.empty,
-      roleSecretNamesToMountPaths = Map.empty,
-      roleSecretEnvNamesToKeyRefs = Map.empty,
-      roleEnvs = Map.empty,
-      roleVolumes = Nil,
-      sparkFiles = Seq.empty[String],
-      hadoopConfSpec = None)
-    val step = new PythonDriverFeatureStep(kubernetesConf)
-    val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
-    val args = driverContainerwithPySpark
-      .getArgs.asScala
-    assert(driverContainerwithPySpark.getArgs.size === 5)
-    assert(args === List(
-      "driver-py",
-      "--properties-file", SPARK_CONF_PATH,
-      "--class", "test-class-py"))
-    val envs = driverContainerwithPySpark
-      .getEnv
-      .asScala
-      .map(env => (env.getName, env.getValue))
-      .toMap
-    assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3")
-  }
-}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
deleted file mode 100644
index 17af6011a17d5..0000000000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.bindings
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.RMainAppResource
-
-class RDriverFeatureStepSuite extends SparkFunSuite {
-
-  test("R Step modifies container correctly") {
-    val expectedMainResource = "/main.R"
-    val mainResource = "local:///main.R"
-    val baseDriverPod = SparkPod.initialPod()
-    val sparkConf = new SparkConf(false)
-      .set(KUBERNETES_R_MAIN_APP_RESOURCE, mainResource)
-    val kubernetesConf = KubernetesConf(
-      sparkConf,
-      KubernetesDriverSpecificConf(
-        Some(RMainAppResource(mainResource)),
-        "test-app",
-        "r-runner",
-        Seq("5", "7", "9")),
-      appResourceNamePrefix = "",
-      appId = "",
-      roleLabels = Map.empty,
-      roleAnnotations = Map.empty,
-      roleSecretNamesToMountPaths = Map.empty,
-      roleSecretEnvNamesToKeyRefs = Map.empty,
-      roleEnvs = Map.empty,
-      roleVolumes = Seq.empty,
-      sparkFiles = Seq.empty[String],
-      hadoopConfSpec = None)
-
-    val step = new RDriverFeatureStep(kubernetesConf)
-    val driverContainerwithR = step.configurePod(baseDriverPod).container
-    assert(driverContainerwithR.getEnv.size === 2)
-    val envs = driverContainerwithR
-      .getEnv
-      .asScala
-      .map(env => (env.getName, env.getValue))
-      .toMap
-    assert(envs(ENV_R_PRIMARY) === expectedMainResource)
-    assert(envs(ENV_R_ARGS) === "5 7 9")
-  }
-}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index ae13df39b7a76..81e3822389f30 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class ClientSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -133,7 +134,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     sparkConf = new SparkConf(false)
     kubernetesConf = KubernetesConf[KubernetesDriverSpecificConf](
       sparkConf,
-      KubernetesDriverSpecificConf(None, MAIN_CLASS, APP_NAME, APP_ARGS),
+      KubernetesDriverSpecificConf(JavaMainAppResource(None), MAIN_CLASS, APP_NAME, APP_ARGS),
       KUBERNETES_RESOURCE_PREFIX,
       APP_ID,
       Map.empty,
@@ -142,7 +143,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
     when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
     when(kubernetesClient.pods()).thenReturn(podOperations)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 84968c3523fc0..fe900fda6e545 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE}
 import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
 
 class KubernetesDriverBuilderSuite extends SparkFunSuite {
 
@@ -33,9 +32,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val SERVICE_STEP_TYPE = "service"
   private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
   private val SECRETS_STEP_TYPE = "mount-secrets"
-  private val JAVA_STEP_TYPE = "java-bindings"
-  private val R_STEP_TYPE = "r-bindings"
-  private val PYSPARK_STEP_TYPE = "pyspark-bindings"
+  private val DRIVER_CMD_STEP_TYPE = "driver-command"
   private val ENV_SECRETS_STEP_TYPE = "env-secrets"
   private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global"
   private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
@@ -56,14 +53,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
 
-  private val javaStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    JAVA_STEP_TYPE, classOf[JavaDriverFeatureStep])
-
-  private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep])
-
-  private val rStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    R_STEP_TYPE, classOf[RDriverFeatureStep])
+  private val driverCommandStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    DRIVER_CMD_STEP_TYPE, classOf[DriverCommandFeatureStep])
 
   private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
@@ -87,9 +78,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     _ => envSecretsStep,
     _ => localDirsStep,
     _ => mountVolumesStep,
-    _ => pythonStep,
-    _ => rStep,
-    _ => javaStep,
+    _ => driverCommandStep,
    _ => hadoopGlobalStep,
    _ => templateVolumeStep)
 
@@ -97,7 +86,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     val conf = KubernetesConf(
       new SparkConf(false),
       KubernetesDriverSpecificConf(
-        Some(JavaMainAppResource("example.jar")),
+        JavaMainAppResource(Some("example.jar")),
         "test-app",
         "main",
         Seq.empty),
@@ -109,7 +98,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -117,14 +105,14 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       CREDENTIALS_STEP_TYPE,
       SERVICE_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
-      JAVA_STEP_TYPE)
+      DRIVER_CMD_STEP_TYPE)
   }
 
   test("Apply secrets step if secrets are present.") {
     val conf = KubernetesConf(
       new SparkConf(false),
       KubernetesDriverSpecificConf(
-        None,
+        JavaMainAppResource(None),
         "test-app",
         "main",
         Seq.empty),
@@ -136,7 +124,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map("EnvName" -> "SecretName:secretKey"),
       Map.empty,
      Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -146,61 +133,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       LOCAL_DIRS_STEP_TYPE,
       SECRETS_STEP_TYPE,
       ENV_SECRETS_STEP_TYPE,
-      JAVA_STEP_TYPE)
-  }
-
-  test("Apply Java step if main resource is none.") {
-    val conf = KubernetesConf(
-      new SparkConf(false),
-      KubernetesDriverSpecificConf(
-        None,
-        "test-app",
-        "main",
-        Seq.empty),
-      "prefix",
-      "appId",
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      Seq.empty[String],
-      hadoopConfSpec = None)
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
-      BASIC_STEP_TYPE,
-      CREDENTIALS_STEP_TYPE,
-      SERVICE_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      JAVA_STEP_TYPE)
-  }
-
-  test("Apply Python step if main resource is python.") {
-    val conf = KubernetesConf(
-      new SparkConf(false),
-      KubernetesDriverSpecificConf(
-        Some(PythonMainAppResource("example.py")),
-        "test-app",
-        "main",
-        Seq.empty),
-      "prefix",
-      "appId",
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      Seq.empty[String],
-      hadoopConfSpec = None)
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
-      BASIC_STEP_TYPE,
-      CREDENTIALS_STEP_TYPE,
-      SERVICE_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      PYSPARK_STEP_TYPE)
+      DRIVER_CMD_STEP_TYPE)
   }
 
   test("Apply volumes step if mounts are present.") {
@@ -212,7 +145,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     val conf = KubernetesConf(
       new SparkConf(false),
       KubernetesDriverSpecificConf(
-        None,
+        JavaMainAppResource(None),
         "test-app",
         "main",
         Seq.empty),
@@ -224,7 +157,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       volumeSpec :: Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -233,34 +165,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       SERVICE_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       MOUNT_VOLUMES_STEP_TYPE,
-      JAVA_STEP_TYPE)
-  }
-
-  test("Apply R step if main resource is R.") {
-    val conf = KubernetesConf(
-      new SparkConf(false),
-      KubernetesDriverSpecificConf(
-        Some(RMainAppResource("example.R")),
-        "test-app",
-        "main",
-        Seq.empty),
-      "prefix",
-      "appId",
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      Seq.empty[String],
-      hadoopConfSpec = None)
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
-      BASIC_STEP_TYPE,
-      CREDENTIALS_STEP_TYPE,
-      SERVICE_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      R_STEP_TYPE)
+      DRIVER_CMD_STEP_TYPE)
   }
 
   test("Apply template volume step if executor template is present.") {
@@ -270,7 +175,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     val conf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        Some(JavaMainAppResource("example.jar")),
+        JavaMainAppResource(Some("example.jar")),
         "test-app",
         "main",
         Seq.empty),
@@ -282,7 +187,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       Option.empty)
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -290,7 +194,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       CREDENTIALS_STEP_TYPE,
       SERVICE_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
-      JAVA_STEP_TYPE,
+      DRIVER_CMD_STEP_TYPE,
       TEMPLATE_VOLUME_STEP_TYPE)
   }
 
@@ -298,7 +202,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     val conf = KubernetesConf(
       new SparkConf(false),
       KubernetesDriverSpecificConf(
-        None,
+        JavaMainAppResource(None),
         "test-app",
         "main",
         Seq.empty),
@@ -310,7 +214,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = Some(
         HadoopConfSpec(
           Some("/var/hadoop-conf"),
@@ -321,7 +224,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       CREDENTIALS_STEP_TYPE,
       SERVICE_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
-      JAVA_STEP_TYPE,
+      DRIVER_CMD_STEP_TYPE,
       HADOOP_GLOBAL_STEP_TYPE)
   }
 
@@ -329,7 +232,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     val conf = KubernetesConf(
       new SparkConf(false),
       KubernetesDriverSpecificConf(
-        None,
+        JavaMainAppResource(None),
         "test-app",
         "main",
         Seq.empty),
@@ -341,7 +244,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = Some(
         HadoopConfSpec(
           None,
@@ -352,7 +254,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       CREDENTIALS_STEP_TYPE,
       SERVICE_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
-      JAVA_STEP_TYPE,
+      DRIVER_CMD_STEP_TYPE,
       HADOOP_GLOBAL_STEP_TYPE)
   }
 
@@ -381,7 +283,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     val kubernetesConf = new KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        Some(JavaMainAppResource("example.jar")),
+        JavaMainAppResource(Some("example.jar")),
         "test-app",
         "main",
         Seq.empty),
@@ -393,7 +295,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       Option.empty)
     val driverSpec = KubernetesDriverBuilder
       .apply(kubernetesClient, sparkConf)
@@ -414,7 +315,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     val kubernetesConf = new KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        Some(JavaMainAppResource("example.jar")),
+        JavaMainAppResource(Some("example.jar")),
         "test-app",
         "main",
         Seq.empty),
@@ -426,7 +327,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       Option.empty)
     val exception = intercept[SparkException] {
       KubernetesDriverBuilder
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index fb2509fc1bda5..1fea08c37ccc6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -76,7 +76,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       None)
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
@@ -95,7 +94,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map("secret-name" -> "secret-key"),
       Map.empty,
       Nil,
-      Seq.empty[String],
       None)
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -123,7 +121,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       volumeSpec :: Nil,
-      Seq.empty[String],
       None)
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -152,7 +149,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       Some(HadoopConfSpec(Some("/var/hadoop-conf"), None)))
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -180,7 +176,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       Some(HadoopConfSpec(None, Some("pre-defined-onfigMapName"))))
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -225,7 +220,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       Option.empty)
     val sparkPod = KubernetesExecutorBuilder
       .apply(kubernetesClient, sparkConf)
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 4958b7363fee0..2b2a4e4cf6bcc 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -96,22 +96,6 @@ case "$SPARK_K8S_CMD" in
       "$@"
     )
     ;;
-  driver-py)
-    CMD=(
-      "$SPARK_HOME/bin/spark-submit"
-      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
-      --deploy-mode client
-      "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS
-    )
-    ;;
-  driver-r)
-    CMD=(
-      "$SPARK_HOME/bin/spark-submit"
-      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
-      --deploy-mode client
-      "$@" $R_PRIMARY $R_ARGS
-    )
-    ;;
   executor)
     CMD=(
      ${JAVA_HOME}/bin/java