Python arguments patch + tests + docs
ifilonenko committed Jun 30, 2017
1 parent e103225 commit 4533df2
Showing 6 changed files with 93 additions and 27 deletions.
Client.scala
@@ -47,8 +47,8 @@ private[spark] object ClientArguments {
mainAppResource = Some(PythonMainAppResource(mainPyFile))
case Array("--primary-java-resource", primaryJavaResource: String) =>
mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
case Array("--main-class", clazz: String) =>
mainClass = Some(clazz)
case Array("--main-class", m_class: String) =>
mainClass = Some(m_class)
case Array("--other-py-files", pyFiles: String) =>
otherPyFiles = pyFiles.split(",")
case Array("--arg", arg: String) =>
@@ -77,7 +77,13 @@ private[spark] class Client(

private val driverJavaOptions = submissionSparkConf.get(
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)

/**
 * Run command that initializes a DriverSpec, which is updated after each
 * KubernetesSubmissionStep in the sequence that is passed in. The final
 * driver spec is used to build the driver container, driver pod, and
 * other Kubernetes resources.
 */
def run(): Unit = {
var currentDriverSpec = new KubernetesDriverSpec(
driverPod = new PodBuilder().build(),
@@ -146,6 +152,8 @@ private[spark] object Client {
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val master = resolveK8sMaster(sparkConf.get("spark.master"))
// This orchestrator determines which steps are needed to resolve the varying
// client arguments that are passed in. Use cases include Scala/Java and Python submissions.
val submissionStepsOrchestrator = new KubernetesSubmissionStepsOrchestrator(
namespace,
kubernetesAppId,
@@ -177,7 +185,13 @@
loggingPodStatusWatcher).run()
}
}

/**
 * Entry point from SparkSubmit in spark-core.
 *
 * @param args array of strings in which identifiers and values alternate; parsed
 *             by ClientArguments, with each identifier preceding the value it
 *             applies to
 */
def main(args: Array[String]): Unit = {
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
val sparkConf = new SparkConf()
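The new scaladoc on run() describes a driver spec that each submission step updates in turn. A minimal, self-contained sketch of that pattern, with stand-in types (the real KubernetesDriverSpec also carries the driver pod, container, and other resources); it is equivalent to the `var currentDriverSpec` loop in run() above:

```scala
// Stand-in types for illustration only.
case class DriverSpec(env: Map[String, String] = Map.empty)

trait SubmissionStep {
  def prepareSubmission(spec: DriverSpec): DriverSpec
}

// Thread one spec through every step in order; each step returns an updated
// copy, and the final spec is what gets built into the driver pod.
def resolveDriverSpec(initial: DriverSpec, steps: Seq[SubmissionStep]): DriverSpec =
  steps.foldLeft(initial)((spec, step) => step.prepareSubmission(spec))
```

Likewise, the main() scaladoc describes the alternating identifier/value layout that ClientArguments.fromCommandLineArgs parses. A hypothetical argument array (flag names as they appear in the parser above; the values are made up):

```scala
val args = Array(
  "--main-class", "org.apache.spark.examples.SparkPi", // hypothetical main class
  "--arg", "10")                                       // one --arg pair per app argument
```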
KubernetesSubmissionStepsOrchestrator.scala
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseSubmissionStep, DependencyResolutionStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, KubernetesSubmissionStep, PythonStep}
import org.apache.spark.deploy.kubernetes.submit.submitsteps._
import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerStepsOrchestrator
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
@@ -91,10 +91,10 @@ private[spark] class KubernetesSubmissionStepsOrchestrator(
submissionSparkConf)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesAppId)
val pythonStep = mainAppResource match {
val pythonResolverStep = mainAppResource match {
case PythonMainAppResource(mainPyResource) =>
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
case _ => Option.empty[KubernetesSubmissionStep]
Option(new PythonStep(mainPyResource, additionalPythonFiles, appArgs, filesDownloadPath))
case _ => Option(new NonPythonArgumentResolver(appArgs))
}
val initContainerBootstrapStep = if ((sparkJars ++ sparkFiles).exists { uri =>
Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local"
@@ -130,6 +130,6 @@ private[spark] class KubernetesSubmissionStepsOrchestrator(
kubernetesCredentialsStep,
dependencyResolutionStep) ++
initContainerBootstrapStep.toSeq ++
pythonStep.toSeq
pythonResolverStep.toSeq
}
}
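A self-contained sketch of the resolver selection above, with simplified stand-in types: after this patch, every submission contributes exactly one argument-resolving step, so the driver-args environment variable is always set by a single step.

```scala
sealed trait MainAppResource
final case class PythonMainAppResource(primaryPyFile: String) extends MainAppResource
final case class JavaMainAppResource(primaryResource: String) extends MainAppResource

sealed trait Step
final case class PythonStep(
    primaryPyFile: String,
    otherPyFiles: Seq[String],
    appArgs: Array[String],
    filesDownloadPath: String) extends Step
final case class NonPythonArgumentResolver(appArgs: Array[String]) extends Step

// Mirrors the match in the orchestrator: Python submissions resolve both the
// primary resource and the app arguments; all other submissions resolve only
// the app arguments.
def argumentResolver(
    resource: MainAppResource,
    appArgs: Array[String],
    otherPyFiles: Seq[String],
    filesDownloadPath: String): Step = resource match {
  case PythonMainAppResource(pyFile) =>
    PythonStep(pyFile, otherPyFiles, appArgs, filesDownloadPath)
  case _ =>
    NonPythonArgumentResolver(appArgs)
}
```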
BaseSubmissionStep.scala
@@ -89,10 +89,6 @@ private[spark] class BaseSubmissionStep(
.withName(ENV_DRIVER_MAIN_CLASS)
.withValue(mainClass)
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_ARGS)
.withValue(appArgs.mkString(" "))
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToRequests("memory", driverMemoryQuantity)
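Note: the ENV_DRIVER_ARGS environment variable removed from BaseSubmissionStep here is not dropped from the submission; it is now set by exactly one of the argument-resolving steps introduced in this patch (NonPythonArgumentResolver below for Scala/Java submissions, the updated PythonStep for Python submissions).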
NonPythonArgumentResolver.scala (new file)
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.submitsteps

import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.kubernetes.constants._

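/**
 * Submission step that exposes a non-Python (Scala/Java) application's
 * arguments to the driver container through the driver-args environment
 * variable.
 */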
private[spark] class NonPythonArgumentResolver(
appArgs: Array[String]) extends KubernetesSubmissionStep {

override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val withNonPythonArgumentsResolvedContainer = new ContainerBuilder(driverSpec.driverContainer)
.addNewEnv()
.withName(ENV_DRIVER_ARGS)
.withValue(appArgs.mkString(" "))
.endEnv()
driverSpec.copy(driverContainer = withNonPythonArgumentsResolvedContainer.build())
}
}
PythonStep.scala
@@ -16,18 +16,32 @@
*/
package org.apache.spark.deploy.kubernetes.submit.submitsteps

import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils

private[spark] class PythonStep(
primaryPyFile: String,
otherPyFiles: Seq[String],
appArgs: Array[String],
filesDownloadPath: String) extends KubernetesSubmissionStep {

override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
// When no additional Python files were supplied (--other-py-files absent or
// empty), prepend a "null" placeholder so the driver arguments keep their
// expected positional layout.
val arguments: Array[String] = otherPyFiles.toList match {
  case Nil => null +: appArgs
  case "" :: Nil => null +: appArgs
  case _ => appArgs
}
val withPythonPrimaryFileContainer = new ContainerBuilder(driverSpec.driverContainer)
.addNewEnv()
.withName(ENV_DRIVER_ARGS)
.withValue(arguments.mkString(" "))
.endEnv()
.addNewEnv()
.withName(ENV_PYSPARK_PRIMARY)
.withValue(KubernetesFileUtils.resolveFilePath(primaryPyFile, filesDownloadPath))
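A standalone sketch of the placeholder rule above (logic copied from the step), runnable as a Scala script; the file path and argument values are hypothetical, chosen to show what ENV_DRIVER_ARGS ends up containing:

```scala
def resolveDriverArgs(otherPyFiles: Seq[String], appArgs: Array[String]): String = {
  val arguments = otherPyFiles.toList match {
    case Nil => null +: appArgs
    case "" :: Nil => null +: appArgs
    case _ => appArgs
  }
  arguments.mkString(" ")
}

// No extra Python files: a "null" placeholder precedes the app arguments.
assert(resolveDriverArgs(Seq.empty, Array("10")) == "null 10")
// With extra Python files, the arguments pass through untouched.
assert(resolveDriverArgs(Seq("local:///opt/spark/sort.py"), Array("10")) == "10")
```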
KubernetesSuite.scala
@@ -72,7 +72,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesTestComponents.deleteNamespace()
}

test("Run PySpark Job on file from SUBMITTER") {
test("Run PySpark Job on file from SUBMITTER with --py-files") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

launchStagingServer(SSLOptions(), None)
@@ -82,7 +82,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.set(EXECUTOR_DOCKER_IMAGE,
System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))

runPySparkPiAndVerifyCompletion(PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION, Seq.empty[String])
runPySparkPiAndVerifyCompletion(
PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION,
Seq(PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION)
)
}

test("Run PySpark Job on file from CONTAINER with spark.jar defined") {
@@ -154,7 +157,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runSparkApplicationAndVerifyCompletion(
JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
GROUP_BY_MAIN_CLASS,
"The Result is",
Array("The Result is"),
Array.empty[String],
Seq.empty[String])
}
@@ -218,7 +221,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runSparkApplicationAndVerifyCompletion(
JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
FILE_EXISTENCE_MAIN_CLASS,
s"File found at /opt/spark/${testExistenceFile.getName} with correct contents.",
Array(s"File found at /opt/spark/${testExistenceFile.getName} with correct contents."),
Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS),
Seq.empty[String])
}
@@ -250,7 +253,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runSparkApplicationAndVerifyCompletion(
JavaMainAppResource(appResource),
SPARK_PI_MAIN_CLASS,
"Pi is roughly 3",
Array("Pi is roughly 3"),
Array.empty[String],
Seq.empty[String])
}
@@ -260,15 +263,15 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runSparkApplicationAndVerifyCompletion(
PythonMainAppResource(appResource),
PYSPARK_PI_MAIN_CLASS,
"Pi is roughly 3",
Array("5"),
Array("(10/10)", "Pi is roughly 3"),
Array("10"),
otherPyFiles)
}

private def runSparkApplicationAndVerifyCompletion(
appResource: MainAppResource,
mainClass: String,
expectedLogOnCompletion: String,
expectedLogOnCompletion: Array[String],
appArgs: Array[String],
otherPyFiles: Seq[String]): Unit = {
val clientArguments = ClientArguments(
@@ -284,11 +287,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.getItems
.get(0)
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
.withName(driverPod.getMetadata.getName)
.getLog
.contains(expectedLogOnCompletion), "The application did not complete.")
expectedLogOnCompletion.foreach { e =>
assert(kubernetesTestComponents.kubernetesClient
.pods()
.withName(driverPod.getMetadata.getName)
.getLog
.contains(e), "The application did not complete.")
}
}
}

@@ -357,6 +362,8 @@ private[spark] object KubernetesSuite {
val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION =
"local:///opt/spark/examples/src/main/python/pi.py"
val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py"
val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION =
"local:///opt/spark/examples/src/main/python/sort.py"
val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
".integrationtest.jobs.FileExistenceTest"
val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
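A hedged note on the updated PySpark expectations: with appArgs = Array("10"), pi.py runs with 10 partitions, so the driver log is expected to contain both the stage-progress fragment "(10/10)" and the "Pi is roughly 3" result line; the Eventually block above now asserts each expected fragment independently.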
