diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index e5b753d458b34..1776210200ed6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -47,8 +47,8 @@ private[spark] object ClientArguments { mainAppResource = Some(PythonMainAppResource(mainPyFile)) case Array("--primary-java-resource", primaryJavaResource: String) => mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) - case Array("--main-class", clazz: String) => - mainClass = Some(clazz) + case Array("--main-class", m_class: String) => + mainClass = Some(m_class) case Array("--other-py-files", pyFiles: String) => otherPyFiles = pyFiles.split(",") case Array("--arg", arg: String) => @@ -77,7 +77,13 @@ private[spark] class Client( private val driverJavaOptions = submissionSparkConf.get( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) - + /** + * Run command that initalizes a DriverSpec that will be updated + * after each KubernetesSubmissionStep in the sequence that is passed in. + * The final driver-spec will be used to build the Driver Container, + * Driver Pod, and Kubernetes Resources + * + */ def run(): Unit = { var currentDriverSpec = new KubernetesDriverSpec( driverPod = new PodBuilder().build(), @@ -146,6 +152,8 @@ private[spark] object Client { val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}" val master = resolveK8sMaster(sparkConf.get("spark.master")) + // This orchestrator determines which steps are necessary to take to resolve varying + // client arguments that are passed in. Use cases include: Scala/Java and Python submission val submissionStepsOrchestrator = new KubernetesSubmissionStepsOrchestrator( namespace, kubernetesAppId, @@ -177,7 +185,13 @@ private[spark] object Client { loggingPodStatusWatcher).run() } } - + /** + * Entry point from SparkSubmit in spark-core + * + * + * @param args Array of strings that have interchanging values that will be + * parsed by ClientArguments with the identifiers that preceed the values + */ def main(args: Array[String]): Unit = { val parsedArguments = ClientArguments.fromCommandLineArgs(args) val sparkConf = new SparkConf() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesSubmissionStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesSubmissionStepsOrchestrator.scala index b0c98b2575607..9628632fae3d7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesSubmissionStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesSubmissionStepsOrchestrator.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseSubmissionStep, DependencyResolutionStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, KubernetesSubmissionStep, PythonStep} +import org.apache.spark.deploy.kubernetes.submit.submitsteps._ import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerStepsOrchestrator import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.Utils @@ -91,10 +91,10 @@ private[spark] class KubernetesSubmissionStepsOrchestrator( submissionSparkConf) val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep( submissionSparkConf, kubernetesAppId) - val pythonStep = mainAppResource match { + val pythonResolverStep = mainAppResource match { case PythonMainAppResource(mainPyResource) => - Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath)) - case _ => Option.empty[KubernetesSubmissionStep] + Option(new PythonStep(mainPyResource, additionalPythonFiles, appArgs, filesDownloadPath)) + case _ => Option(new NonPythonArgumentResolver(appArgs)) } val initContainerBootstrapStep = if ((sparkJars ++ sparkFiles).exists { uri => Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local" @@ -130,6 +130,6 @@ private[spark] class KubernetesSubmissionStepsOrchestrator( kubernetesCredentialsStep, dependencyResolutionStep) ++ initContainerBootstrapStep.toSeq ++ - pythonStep.toSeq + pythonResolverStep.toSeq } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseSubmissionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseSubmissionStep.scala index e49262c20c745..74c6de927568c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseSubmissionStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseSubmissionStep.scala @@ -89,10 +89,6 @@ private[spark] class BaseSubmissionStep( .withName(ENV_DRIVER_MAIN_CLASS) .withValue(mainClass) .endEnv() - .addNewEnv() - .withName(ENV_DRIVER_ARGS) - .withValue(appArgs.mkString(" ")) - .endEnv() .withNewResources() .addToRequests("cpu", driverCpuQuantity) .addToRequests("memory", driverMemoryQuantity) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/NonPythonArgumentResolver.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/NonPythonArgumentResolver.scala new file mode 100644 index 0000000000000..74f45fc58b8bd --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/NonPythonArgumentResolver.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import io.fabric8.kubernetes.api.model.ContainerBuilder + +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils + +private[spark] class NonPythonArgumentResolver( + appArgs: Array[String]) extends KubernetesSubmissionStep { + + override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + val withNonPythonArgumentsResolvedContainer = new ContainerBuilder(driverSpec.driverContainer) + .addNewEnv() + .withName(ENV_DRIVER_ARGS) + .withValue(appArgs.mkString(" ")) + .endEnv() + driverSpec.copy(driverContainer = withNonPythonArgumentsResolvedContainer.build()) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStep.scala index 484f57087b36e..dd0981c3802ad 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStep.scala @@ -16,18 +16,32 @@ */ package org.apache.spark.deploy.kubernetes.submit.submitsteps -import io.fabric8.kubernetes.api.model.ContainerBuilder +import org.apache.spark.internal.Logging +import io.fabric8.kubernetes.api.model.ContainerBuilder import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils + private[spark] class PythonStep( primaryPyFile: String, otherPyFiles: Seq[String], + appArgs: Array[String], filesDownloadPath: String) extends KubernetesSubmissionStep { override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + val arguments : Array[String] = otherPyFiles.toList match { + case Nil => null +: appArgs + case a :: b => a match { + case _ if a == "" && b == Nil => null +: appArgs + case _ => appArgs + } + } val withPythonPrimaryFileContainer = new ContainerBuilder(driverSpec.driverContainer) + .addNewEnv() + .withName(ENV_DRIVER_ARGS) + .withValue(arguments.mkString(" ")) + .endEnv() .addNewEnv() .withName(ENV_PYSPARK_PRIMARY) .withValue(KubernetesFileUtils.resolveFilePath(primaryPyFile, filesDownloadPath)) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 650ec4feb6a2b..d1e6637ee2ea4 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -72,7 +72,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { kubernetesTestComponents.deleteNamespace() } - test("Run PySpark Job on file from SUBMITTER") { + test("Run PySpark Job on file from SUBMITTER with --py-files") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) launchStagingServer(SSLOptions(), None) @@ -82,7 +82,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .set(EXECUTOR_DOCKER_IMAGE, System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) - runPySparkPiAndVerifyCompletion(PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION, Seq.empty[String]) + runPySparkPiAndVerifyCompletion( + PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION, + Seq(PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION) + ) } test("Run PySpark Job on file from CONTAINER with spark.jar defined") { @@ -154,7 +157,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { runSparkApplicationAndVerifyCompletion( JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE), GROUP_BY_MAIN_CLASS, - "The Result is", + Array("The Result is"), Array.empty[String], Seq.empty[String]) } @@ -218,7 +221,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { runSparkApplicationAndVerifyCompletion( JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE), FILE_EXISTENCE_MAIN_CLASS, - s"File found at /opt/spark/${testExistenceFile.getName} with correct contents.", + Array(s"File found at /opt/spark/${testExistenceFile.getName} with correct contents."), Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS), Seq.empty[String]) } @@ -250,7 +253,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { runSparkApplicationAndVerifyCompletion( JavaMainAppResource(appResource), SPARK_PI_MAIN_CLASS, - "Pi is roughly 3", + Array("Pi is roughly 3"), Array.empty[String], Seq.empty[String]) } @@ -260,15 +263,15 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { runSparkApplicationAndVerifyCompletion( PythonMainAppResource(appResource), PYSPARK_PI_MAIN_CLASS, - "Pi is roughly 3", - Array("5"), + Array("(10/10)", "Pi is roughly 3"), + Array("10"), otherPyFiles) } private def runSparkApplicationAndVerifyCompletion( appResource: MainAppResource, mainClass: String, - expectedLogOnCompletion: String, + expectedLogOnCompletion: Array[String], appArgs: Array[String], otherPyFiles: Seq[String]): Unit = { val clientArguments = ClientArguments( @@ -284,11 +287,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .getItems .get(0) Eventually.eventually(TIMEOUT, INTERVAL) { - assert(kubernetesTestComponents.kubernetesClient - .pods() - .withName(driverPod.getMetadata.getName) - .getLog - .contains(expectedLogOnCompletion), "The application did not complete.") + expectedLogOnCompletion.foreach { e => + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPod.getMetadata.getName) + .getLog + .contains(e), "The application did not complete.") + } } } @@ -357,6 +362,8 @@ private[spark] object KubernetesSuite { val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION = "local:///opt/spark/examples/src/main/python/pi.py" val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py" + val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION = + "local:///opt/spark/examples/src/main/python/sort.py" val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.FileExistenceTest" val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +