diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 58aa6d951a204..24edc60684376 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -185,7 +185,6 @@ object SparkSubmit {
     if (clusterManager == STANDALONE) {
       val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
       sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
-      println("SPARK JARS" + sysProps.get("spark.jars"))
     }
 
     if (deployOnCluster && clusterManager == STANDALONE) {
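Beyond the dropped debug `println`, the retained lines implement the standalone-mode merge of the primary resource into `spark.jars`. A minimal sketch of that merge semantics with stand-in values (`JarsMergeSketch` and the local `sysProps` map are illustrative, not Spark code):

```scala
// Sketch of the spark.jars merge kept by this hunk: split the existing
// comma-separated list if present, append the primary resource, re-join.
object JarsMergeSketch extends App {
  val sysProps = scala.collection.mutable.HashMap("spark.jars" -> "a.jar,b.jar")
  val primaryResource = "app.jar"

  val existingJars = sysProps.get("spark.jars").map(_.split(",").toSeq).getOrElse(Seq())
  sysProps.put("spark.jars", (existingJars ++ Seq(primaryResource)).mkString(","))

  assert(sysProps("spark.jars") == "a.jar,b.jar,app.jar")
}
```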
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c545b093ac82e..58d9e9add764a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -21,14 +21,15 @@ import java.io.{File, FileInputStream, IOException}
 import java.util.Properties
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ArrayBuffer}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.SparkException
+import org.apache.spark.util.Utils
 
 /**
  * Parses and encapsulates arguments from the spark-submit script.
 */
-private[spark] class SparkSubmitArguments(args: Array[String]) {
+private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -118,8 +119,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
-      val testing = sys.env.contains("SPARK_TESTING")
-      if (!hasHadoopEnv && !testing) {
+      if (!hasHadoopEnv && !Utils.isTesting) {
         throw new Exception(s"When running with master '$master' " +
           "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
       }
 
@@ -156,119 +156,121 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     """.stripMargin
   }
 
-  private def parseOpts(opts: List[String]): Unit = opts match {
-    case ("--name") :: value :: tail =>
-      name = value
-      parseOpts(tail)
+  /** Fill in values by parsing user options. */
+  private def parseOpts(opts: Seq[String]): Unit = {
+    // Delineates parsing of Spark options from parsing of user options.
+    var inSparkOpts = true
+    parse(opts)
 
-    case ("--master") :: value :: tail =>
-      master = value
-      parseOpts(tail)
+    def parse(opts: Seq[String]): Unit = opts match {
+      case ("--name") :: value :: tail =>
+        name = value
+        parse(tail)
 
-    case ("--class") :: value :: tail =>
-      mainClass = value
-      parseOpts(tail)
+      case ("--master") :: value :: tail =>
+        master = value
+        parse(tail)
 
-    case ("--deploy-mode") :: value :: tail =>
-      if (value != "client" && value != "cluster") {
-        SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
-      }
-      deployMode = value
-      parseOpts(tail)
-
-    case ("--num-executors") :: value :: tail =>
-      numExecutors = value
-      parseOpts(tail)
-
-    case ("--total-executor-cores") :: value :: tail =>
-      totalExecutorCores = value
-      parseOpts(tail)
-
-    case ("--executor-cores") :: value :: tail =>
-      executorCores = value
-      parseOpts(tail)
-
-    case ("--executor-memory") :: value :: tail =>
-      executorMemory = value
-      parseOpts(tail)
-
-    case ("--driver-memory") :: value :: tail =>
-      driverMemory = value
-      parseOpts(tail)
-
-    case ("--driver-cores") :: value :: tail =>
-      driverCores = value
-      parseOpts(tail)
-
-    case ("--driver-class-path") :: value :: tail =>
-      driverExtraClassPath = value
-      parseOpts(tail)
-
-    case ("--driver-java-options") :: value :: tail =>
-      driverExtraJavaOptions = value
-      parseOpts(tail)
-
-    case ("--driver-library-path") :: value :: tail =>
-      driverExtraLibraryPath = value
-      parseOpts(tail)
-
-    case ("--properties-file") :: value :: tail =>
-      propertiesFile = value
-      parseOpts(tail)
-
-    case ("--supervise") :: tail =>
-      supervise = true
-      parseOpts(tail)
-
-    case ("--queue") :: value :: tail =>
-      queue = value
-      parseOpts(tail)
-
-    case ("--files") :: value :: tail =>
-      files = value
-      parseOpts(tail)
-
-    case ("--archives") :: value :: tail =>
-      archives = value
-      parseOpts(tail)
-
-    case ("--arg") :: value :: tail =>
-      childArgs += value
-      parseOpts(tail)
-
-    case ("--jars") :: value :: tail =>
-      jars = value
-      parseOpts(tail)
-
-    case ("--help" | "-h") :: tail =>
-      printUsageAndExit(0)
-
-    case ("--verbose" | "-v") :: tail =>
-      verbose = true
-      parseOpts(tail)
-
-    case value :: tail =>
-      if (value.startsWith("-")) {
-        val errMessage = s"Unrecognized option '$value'."
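The behavioral change in `parseOpts` is easiest to see end to end: options are consumed until the first bare (non-`-`) token, `--key=value` forms are split into two tokens, the first bare token becomes the primary resource, and every remaining token is forwarded to the application. A hedged sketch of the resulting contract, written against the fields this diff defines — it mirrors the new test cases rather than quoting them, and the main-class name is made up:

```scala
// Same-package sketch (SparkSubmitArguments is private[spark]); values are
// illustrative, but the constructor and fields are the ones in this diff.
package org.apache.spark.deploy

object ParseOrderSketch extends App {
  val appArgs = new SparkSubmitArguments(Seq(
    "--master=local[4]",            // --key=value is rewritten to "--key value"
    "--name", "myApp",
    "--class", "org.my.MainClass",  // hypothetical main class
    "userjar.jar",                  // first bare token: the primary resource
    "some", "--random", "args"))    // everything after it goes to the app

  assert(appArgs.master == "local[4]")
  assert(appArgs.primaryResource == "userjar.jar")
  assert(appArgs.childArgs == Seq("some", "--random", "args"))
}
```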
-        val suggestion: Option[String] = value match {
-          case v if v.startsWith("--") && v.contains("=") =>
-            val parts = v.split("=")
-            Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?")
-          case _ =>
-            None
+      case ("--class") :: value :: tail =>
+        mainClass = value
+        parse(tail)
+
+      case ("--deploy-mode") :: value :: tail =>
+        if (value != "client" && value != "cluster") {
+          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
+        }
+        deployMode = value
+        parse(tail)
+
+      case ("--num-executors") :: value :: tail =>
+        numExecutors = value
+        parse(tail)
+
+      case ("--total-executor-cores") :: value :: tail =>
+        totalExecutorCores = value
+        parse(tail)
+
+      case ("--executor-cores") :: value :: tail =>
+        executorCores = value
+        parse(tail)
+
+      case ("--executor-memory") :: value :: tail =>
+        executorMemory = value
+        parse(tail)
+
+      case ("--driver-memory") :: value :: tail =>
+        driverMemory = value
+        parse(tail)
+
+      case ("--driver-cores") :: value :: tail =>
+        driverCores = value
+        parse(tail)
+
+      case ("--driver-class-path") :: value :: tail =>
+        driverExtraClassPath = value
+        parse(tail)
+
+      case ("--driver-java-options") :: value :: tail =>
+        driverExtraJavaOptions = value
+        parse(tail)
+
+      case ("--driver-library-path") :: value :: tail =>
+        driverExtraLibraryPath = value
+        parse(tail)
+
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
+      case ("--supervise") :: tail =>
+        supervise = true
+        parse(tail)
+
+      case ("--queue") :: value :: tail =>
+        queue = value
+        parse(tail)
+
+      case ("--files") :: value :: tail =>
+        files = value
+        parse(tail)
+
+      case ("--archives") :: value :: tail =>
+        archives = value
+        parse(tail)
+
+      case ("--jars") :: value :: tail =>
+        jars = value
+        parse(tail)
+
+      case ("--help" | "-h") :: tail =>
+        printUsageAndExit(0)
+
+      case ("--verbose" | "-v") :: tail =>
+        verbose = true
+        parse(tail)
+
+      case value :: tail =>
+        if (inSparkOpts) {
+          value match {
+            // convert --foo=bar to --foo bar
+            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
+              val parts = v.split("=")
+              parse(Seq(parts(0), parts(1)) ++ tail)
+            case v if v.startsWith("-") =>
+              val errMessage = s"Unrecognized option '$value'."
+              SparkSubmit.printErrorAndExit(errMessage)
+            case v =>
+              primaryResource = v
+              inSparkOpts = false
+              parse(tail)
+          }
+        } else {
+          childArgs += value
+          parse(tail)
         }
-        SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
-      }
-      if (primaryResource != null) {
-        val error = s"Found two conflicting resources, $value and $primaryResource." +
-          " Expecting only one resource."
-        SparkSubmit.printErrorAndExit(error)
+      case Nil =>
       }
-      primaryResource = value
-      parseOpts(tail)
-
-    case Nil =>
   }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
@@ -277,7 +279,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
     outStream.println(
-      """Usage: spark-submit [options]
+      """Usage: spark-submit [options] [app options]
        |Options:
        |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
        |  --deploy-mode DEPLOY_MODE   Mode to deploy the app in, either 'client' or 'cluster'.
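The StorageLevel changes below only annotate existing factory methods, but for context: the four `apply` overloads gaining `@DeveloperApi` are the public entry points for building custom storage levels. A small usage sketch (assumes the cached-instance equality that `getCachedStorageLevel` provides; `StorageLevelSketch` is an illustrative name):

```scala
import org.apache.spark.storage.StorageLevel

object StorageLevelSketch extends App {
  // Same flags as the predefined MEMORY_AND_DISK_2: disk on, memory on,
  // deserialized, replicated twice (useOffHeap defaults to false in this overload).
  val custom = StorageLevel(useDisk = true, useMemory = true, deserialized = true, replication = 2)
  assert(custom == StorageLevel.MEMORY_AND_DISK_2)
}
```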
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 95e71de2d3f1d..c9a52e0366d93 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -19,6 +19,8 @@ package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 
+import org.apache.spark.annotation.DeveloperApi
+
 /**
  * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
  * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to
@@ -142,21 +144,37 @@ object StorageLevel {
   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
   val OFF_HEAP = new StorageLevel(false, false, true, false)
 
-  /** Create a new StorageLevel object without setting useOffHeap */
+  /**
+   * :: DeveloperApi ::
+   * Create a new StorageLevel object without setting useOffHeap
+   */
+  @DeveloperApi
   def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
     deserialized: Boolean, replication: Int) = getCachedStorageLevel(
     new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
 
-  /** Create a new StorageLevel object */
+  /**
+   * :: DeveloperApi ::
+   * Create a new StorageLevel object
+   */
+  @DeveloperApi
   def apply(useDisk: Boolean, useMemory: Boolean,
     deserialized: Boolean, replication: Int = 1) = getCachedStorageLevel(
     new StorageLevel(useDisk, useMemory, false, deserialized, replication))
 
-  /** Create a new StorageLevel object from its integer representation */
+  /**
+   * :: DeveloperApi ::
+   * Create a new StorageLevel object from its integer representation
+   */
+  @DeveloperApi
   def apply(flags: Int, replication: Int): StorageLevel =
     getCachedStorageLevel(new StorageLevel(flags, replication))
 
-  /** Read StorageLevel object from ObjectInput stream */
+  /**
+   * :: DeveloperApi ::
+   * Read StorageLevel object from ObjectInput stream
+   */
+  @DeveloperApi
   def apply(in: ObjectInput): StorageLevel = {
     val obj = new StorageLevel()
     obj.readExternal(in)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index cabea5ac03e88..75cf7d38f6c51 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1062,4 +1062,10 @@ private[spark] object Utils extends Logging {
   def isWindows = Option(System.getProperty("os.name")).
     map(_.startsWith("Windows")).getOrElse(false)
 
+  /**
+   * Indicates whether Spark is currently running unit tests.
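`Utils.isTesting` folds the two test signals into one predicate, so callers like the YARN check above no longer probe the environment directly. A stand-alone sketch of the same check (`IsTestingSketch` is an illustrative name, not Spark code):

```scala
object IsTestingSketch extends App {
  // Mirror of the new Utils.isTesting: either the SPARK_TESTING environment
  // variable (set by the build scripts) or the spark.testing system property
  // (settable from test code, as SparkSubmitSuite does below) flips it on.
  def isTesting = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")

  System.setProperty("spark.testing", "true")
  assert(isTesting)
}
```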
+   */
+  private[spark] def isTesting = {
+    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 657b44668d385..10a65c75cc621 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -28,6 +28,9 @@ import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
 class SparkSubmitSuite extends FunSuite with ShouldMatchers {
+  def beforeAll() {
+    System.setProperty("spark.testing", "true")
+  }
 
   val noOpOutputStream = new OutputStream {
     def write(b: Int) = {}
@@ -74,33 +77,35 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     testPrematureExit(Array("--help"), "Usage: spark-submit")
   }
 
-  test("prints error with unrecognized option") {
+  test("prints error with unrecognized options") {
     testPrematureExit(Array("--blarg"), "Unrecognized option '--blarg'")
     testPrematureExit(Array("-bleg"), "Unrecognized option '-bleg'")
-    testPrematureExit(Array("--master=abc"),
-      "Unrecognized option '--master=abc'. Perhaps you want '--master abc'?")
   }
 
-  test("handles multiple binary definitions") {
-    val adjacentJars = Array("foo.jar", "bar.jar")
-    testPrematureExit(adjacentJars, "error: Found two conflicting resources")
+  test("handle binary specified but not class") {
+    testPrematureExit(Array("foo.jar"), "Must specify a main class")
+  }
 
-    val nonAdjacentJars =
-      Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar")
-    testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources")
+  test("handles arguments with --key=val") {
+    val clArgs = Seq("--jars=one.jar,two.jar,three.jar", "--name=myApp")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    appArgs.jars should be ("one.jar,two.jar,three.jar")
+    appArgs.name should be ("myApp")
   }
 
-  test("handle binary specified but not class") {
-    testPrematureExit(Array("foo.jar"), "Must specify a main class")
+  test("handles arguments to user program") {
+    val clArgs = Seq("--name", "myApp", "userjar.jar", "some", "--random", "args", "here")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    appArgs.childArgs should be (Seq("some", "--random", "args", "here"))
   }
 
   test("handles YARN cluster mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
+    val clArgs = Seq("--deploy-mode", "cluster",
       "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
       "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
-      "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
-      "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+      "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6",
+      "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
@@ -121,12 +126,12 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles YARN client mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+    val clArgs = Seq("--deploy-mode", "client",
       "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
       "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
-      "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
-      "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+      "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -144,9 +149,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles standalone cluster mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
-      "--master", "spark://h:p", "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
-      "--supervise", "--driver-memory", "4g", "--driver-cores", "5")
+    val clArgs = Seq("--deploy-mode", "cluster",
+      "--master", "spark://h:p", "--class", "org.SomeClass",
+      "--supervise", "--driver-memory", "4g", "--driver-cores", "5", "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
@@ -158,10 +163,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles standalone client mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+    val clArgs = Seq("--deploy-mode", "client",
       "--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
-      "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
-      "--driver-memory", "4g")
+      "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -172,10 +176,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles mesos client mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+    val clArgs = Seq("--deploy-mode", "client",
       "--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
-      "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
-      "--driver-memory", "4g")
+      "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -187,22 +190,24 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
 
   test("launch simple application with spark-submit") {
     runSparkSubmit(
-      Seq("unUsed.jar",
+      Seq(
         "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
         "--name", "testApp",
-        "--master", "local"))
+        "--master", "local",
+        "unUsed.jar"))
   }
 
   test("spark submit includes jars passed in through --jar") {
     val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
     val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
     val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
-    runSparkSubmit(
-      Seq("unUsed.jar",
-        "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
-        "--name", "testApp",
-        "--master", "local-cluster[2,1,512]",
-        "--jars", jarsString))
+    val args = Seq(
+      "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local-cluster[2,1,512]",
+      "--jars", jarsString,
+      "unused.jar")
+    runSparkSubmit(args)
   }
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index dcc063042628c..b011679fede2d 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -73,30 +73,34 @@ the bin directory. This script takes care of setting up the classpath with Spark dependencies, and
 can support different cluster managers and deploy modes that Spark supports. It's usage is
 
-    ./bin/spark-submit --class path.to.your.Class [other options..]
+    ./bin/spark-submit --class path.to.your.Class [options] [app options]
 
-To enumerate all options available to `spark-submit` run it with the `--help` flag.
-Here are a few examples of common options:
+When calling `spark-submit`, `[app options]` will be passed along to your application's
+main class. To enumerate all options available to `spark-submit` run it with
+the `--help` flag. Here are a few examples of common options:
 
 {% highlight bash %}
 # Run application locally
-./bin/spark-submit my-app.jar \
+./bin/spark-submit \
   --class my.main.ClassName
-  --master local[8]
+  --master local[8] \
+  my-app.jar
 
 # Run on a Spark cluster
-./bin/spark-submit my-app.jar \
+./bin/spark-submit \
   --class my.main.ClassName
   --master spark://mycluster:7077 \
   --executor-memory 20G \
-  --total-executor-cores 100
+  --total-executor-cores 100 \
+  my-app.jar
 
 # Run on a YARN cluster
-HADOOP_CONF_DIR=XX /bin/spark-submit my-app.jar \
+HADOOP_CONF_DIR=XX /bin/spark-submit \
   --class my.main.ClassName
   --master yarn-cluster \  # can also be `yarn-client` for client mode
   --executor-memory 20G \
-  --num-executors 50
+  --num-executors 50 \
+  my-app.jar
 {% endhighlight %}
 
 ### Loading Configurations from a File
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 68afa6e1bff95..64996b52e0404 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -179,9 +179,10 @@ $ sbt package
 [info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
 
 # Use spark-submit to run your application
-$ YOUR_SPARK_HOME/bin/spark-submit target/scala-2.10/simple-project_2.10-1.0.jar \
+$ YOUR_SPARK_HOME/bin/spark-submit \
   --class "SimpleApp" \
-  --master local[4]
+  --master local[4] \
+  target/scala-2.10/simple-project_2.10-1.0.jar
 ...
 Lines with a: 46, Lines with b: 23
 {% endhighlight %}
@@ -272,9 +273,10 @@ $ mvn package
 [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
 
 # Use spark-submit to run your application
-$ YOUR_SPARK_HOME/bin/spark-submit target/simple-project-1.0.jar \
+$ YOUR_SPARK_HOME/bin/spark-submit \
   --class "SimpleApp" \
-  --master local[4]
+  --master local[4] \
+  target/simple-project-1.0.jar
 ...
 Lines with a: 46, Lines with b: 23
 {% endhighlight %}
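Tying the test and documentation changes together: every launch now follows options first, then the application jar, then application arguments. A hypothetical companion test in the suite's package pinning down just that ordering contract (`LaunchShapeSketch` is a made-up name; the package declaration is needed because `SparkSubmitArguments` is `private[spark]`):

```scala
package org.apache.spark.deploy

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

// Hypothetical extra test: asserts only the new CLI ordering, i.e. that the
// first bare token is the app jar and every later token is an app argument.
class LaunchShapeSketch extends FunSuite with ShouldMatchers {
  test("jar and app args trail the spark-submit options") {
    val clArgs = Seq(
      "--deploy-mode", "client", "--master", "local",
      "--class", "org.SomeClass",
      "thejar.jar", "arg1", "arg2")
    val appArgs = new SparkSubmitArguments(clArgs)
    appArgs.primaryResource should be ("thejar.jar")
    appArgs.childArgs.mkString(" ") should be ("arg1 arg2")
  }
}
```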