Commit

merge master
witgo committed Apr 27, 2014
2 parents 49e248e + aa9a7f5 commit fcaafd7
Showing 7 changed files with 202 additions and 166 deletions.
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

@@ -185,7 +185,6 @@ object SparkSubmit {
     if (clusterManager == STANDALONE) {
       val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
       sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
-      println("SPARK JARS" + sysProps.get("spark.jars"))
     }

     if (deployOnCluster && clusterManager == STANDALONE) {
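For context, the surviving lines above fold the application jar into any user-supplied spark.jars list; only a leftover debug println is removed. A minimal, standalone sketch of that merge, runnable outside Spark (the mutable Map stands in for SparkSubmit's sysProps bag, and the names are illustrative assumptions):

import scala.collection.mutable

object JarsMergeSketch {
  def main(args: Array[String]): Unit = {
    val sysProps = mutable.Map("spark.jars" -> "a.jar,b.jar") // stand-in for SparkSubmit's sysProps
    val primaryResource = "app.jar"                           // hypothetical application jar

    // Same shape as the diff: split any existing list, append the app jar, re-join.
    val existingJars = sysProps.get("spark.jars").map(_.split(",").toSeq).getOrElse(Seq())
    sysProps.put("spark.jars", (existingJars :+ primaryResource).mkString(","))

    assert(sysProps("spark.jars") == "a.jar,b.jar,app.jar")
  }
}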
226 changes: 114 additions & 112 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -21,14 +21,15 @@ import java.io.{File, FileInputStream, IOException}
 import java.util.Properties

 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ArrayBuffer}
+import scala.collection.mutable.{ArrayBuffer, HashMap}

 import org.apache.spark.SparkException
+import org.apache.spark.util.Utils

 /**
  * Parses and encapsulates arguments from the spark-submit script.
  */
-private[spark] class SparkSubmitArguments(args: Array[String]) {
+private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -118,8 +119,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {

     if (master.startsWith("yarn")) {
       val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
-      val testing = sys.env.contains("SPARK_TESTING")
-      if (!hasHadoopEnv && !testing) {
+      if (!hasHadoopEnv && !Utils.isTesting) {
         throw new Exception(s"When running with master '$master' " +
           "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
       }
@@ -156,119 +156,121 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     """.stripMargin
   }

-  private def parseOpts(opts: List[String]): Unit = opts match {
-    case ("--name") :: value :: tail =>
-      name = value
-      parseOpts(tail)
+  /** Fill in values by parsing user options. */
+  private def parseOpts(opts: Seq[String]): Unit = {
+    // Delineates parsing of Spark options from parsing of user options.
+    var inSparkOpts = true
+    parse(opts)

-    case ("--master") :: value :: tail =>
-      master = value
-      parseOpts(tail)
+    def parse(opts: Seq[String]): Unit = opts match {
+      case ("--name") :: value :: tail =>
+        name = value
+        parse(tail)

-    case ("--class") :: value :: tail =>
-      mainClass = value
-      parseOpts(tail)
+      case ("--master") :: value :: tail =>
+        master = value
+        parse(tail)

-    case ("--deploy-mode") :: value :: tail =>
-      if (value != "client" && value != "cluster") {
-        SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
-      }
-      deployMode = value
-      parseOpts(tail)
-
-    case ("--num-executors") :: value :: tail =>
-      numExecutors = value
-      parseOpts(tail)
-
-    case ("--total-executor-cores") :: value :: tail =>
-      totalExecutorCores = value
-      parseOpts(tail)
-
-    case ("--executor-cores") :: value :: tail =>
-      executorCores = value
-      parseOpts(tail)
-
-    case ("--executor-memory") :: value :: tail =>
-      executorMemory = value
-      parseOpts(tail)
-
-    case ("--driver-memory") :: value :: tail =>
-      driverMemory = value
-      parseOpts(tail)
-
-    case ("--driver-cores") :: value :: tail =>
-      driverCores = value
-      parseOpts(tail)
-
-    case ("--driver-class-path") :: value :: tail =>
-      driverExtraClassPath = value
-      parseOpts(tail)
-
-    case ("--driver-java-options") :: value :: tail =>
-      driverExtraJavaOptions = value
-      parseOpts(tail)
-
-    case ("--driver-library-path") :: value :: tail =>
-      driverExtraLibraryPath = value
-      parseOpts(tail)
-
-    case ("--properties-file") :: value :: tail =>
-      propertiesFile = value
-      parseOpts(tail)
-
-    case ("--supervise") :: tail =>
-      supervise = true
-      parseOpts(tail)
-
-    case ("--queue") :: value :: tail =>
-      queue = value
-      parseOpts(tail)
-
-    case ("--files") :: value :: tail =>
-      files = value
-      parseOpts(tail)
-
-    case ("--archives") :: value :: tail =>
-      archives = value
-      parseOpts(tail)
-
-    case ("--arg") :: value :: tail =>
-      childArgs += value
-      parseOpts(tail)
-
-    case ("--jars") :: value :: tail =>
-      jars = value
-      parseOpts(tail)
-
-    case ("--help" | "-h") :: tail =>
-      printUsageAndExit(0)
-
-    case ("--verbose" | "-v") :: tail =>
-      verbose = true
-      parseOpts(tail)
-
-    case value :: tail =>
-      if (value.startsWith("-")) {
-        val errMessage = s"Unrecognized option '$value'."
-        val suggestion: Option[String] = value match {
-          case v if v.startsWith("--") && v.contains("=") =>
-            val parts = v.split("=")
-            Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?")
-          case _ =>
-            None
-        }
-        SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
-      }
-
-      if (primaryResource != null) {
-        val error = s"Found two conflicting resources, $value and $primaryResource." +
-          " Expecting only one resource."
-        SparkSubmit.printErrorAndExit(error)
-      }
-      primaryResource = value
-      parseOpts(tail)
-
-    case Nil =>
-  }
+      case ("--class") :: value :: tail =>
+        mainClass = value
+        parse(tail)
+
+      case ("--deploy-mode") :: value :: tail =>
+        if (value != "client" && value != "cluster") {
+          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
+        }
+        deployMode = value
+        parse(tail)
+
+      case ("--num-executors") :: value :: tail =>
+        numExecutors = value
+        parse(tail)
+
+      case ("--total-executor-cores") :: value :: tail =>
+        totalExecutorCores = value
+        parse(tail)
+
+      case ("--executor-cores") :: value :: tail =>
+        executorCores = value
+        parse(tail)
+
+      case ("--executor-memory") :: value :: tail =>
+        executorMemory = value
+        parse(tail)
+
+      case ("--driver-memory") :: value :: tail =>
+        driverMemory = value
+        parse(tail)
+
+      case ("--driver-cores") :: value :: tail =>
+        driverCores = value
+        parse(tail)
+
+      case ("--driver-class-path") :: value :: tail =>
+        driverExtraClassPath = value
+        parse(tail)
+
+      case ("--driver-java-options") :: value :: tail =>
+        driverExtraJavaOptions = value
+        parse(tail)
+
+      case ("--driver-library-path") :: value :: tail =>
+        driverExtraLibraryPath = value
+        parse(tail)
+
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
+      case ("--supervise") :: tail =>
+        supervise = true
+        parse(tail)
+
+      case ("--queue") :: value :: tail =>
+        queue = value
+        parse(tail)
+
+      case ("--files") :: value :: tail =>
+        files = value
+        parse(tail)
+
+      case ("--archives") :: value :: tail =>
+        archives = value
+        parse(tail)
+
+      case ("--jars") :: value :: tail =>
+        jars = value
+        parse(tail)
+
+      case ("--help" | "-h") :: tail =>
+        printUsageAndExit(0)
+
+      case ("--verbose" | "-v") :: tail =>
+        verbose = true
+        parse(tail)
+
+      case value :: tail =>
+        if (inSparkOpts) {
+          value match {
+            // convert --foo=bar to --foo bar
+            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
+              val parts = v.split("=")
+              parse(Seq(parts(0), parts(1)) ++ tail)
+            case v if v.startsWith("-") =>
+              val errMessage = s"Unrecognized option '$value'."
+              SparkSubmit.printErrorAndExit(errMessage)
+            case v =>
+              primaryResource = v
+              inSparkOpts = false
+              parse(tail)
+          }
+        } else {
+          childArgs += value
+          parse(tail)
+        }
+
+      case Nil =>
+    }
+  }

@@ -277,7 +279,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
     outStream.println(
-      """Usage: spark-submit <app jar> [options]
+      """Usage: spark-submit [options] <app jar> [app options]
        |Options:
        |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
        |  --deploy-mode DEPLOY_MODE   Mode to deploy the app in, either 'client' or 'cluster'.
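The rewrite above replaces a flat recursive match, which rejected any bare token once a primary resource was set, with a nested parse function guarded by an inSparkOpts flag: the first non-option token becomes the application jar, and every token after it passes through to the application untouched. That is what enables the new "spark-submit [options] <app jar> [app options]" usage and lets the old --arg option be dropped. A cut-down, self-contained sketch of the technique, reduced to two options for brevity (ParseSketch and its members are illustrative, not Spark API):

object ParseSketch {
  var master: String = _
  var name: String = _
  var primaryResource: String = _
  val childArgs = scala.collection.mutable.ArrayBuffer[String]()

  def parseOpts(opts: Seq[String]): Unit = {
    var inSparkOpts = true // flips once the app jar is seen

    def parse(opts: Seq[String]): Unit = opts match {
      case "--master" :: value :: tail =>
        master = value
        parse(tail)
      case "--name" :: value :: tail =>
        name = value
        parse(tail)
      case value :: tail =>
        if (inSparkOpts) {
          value match {
            // Normalize --foo=bar to --foo bar, as the diff does.
            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
              val parts = v.split("=")
              parse(Seq(parts(0), parts(1)) ++ tail)
            case v if v.startsWith("-") =>
              sys.error(s"Unrecognized option '$v'.")
            case v =>
              primaryResource = v // first bare token is the app jar
              inSparkOpts = false // everything after it belongs to the app
              parse(tail)
          }
        } else {
          childArgs += value
          parse(tail)
        }
      case Nil =>
    }

    parse(opts)
  }

  def main(args: Array[String]): Unit = {
    parseOpts(Seq("--master=local[2]", "--name", "demo", "app.jar", "--master", "ignored"))
    assert(master == "local[2]")
    assert(primaryResource == "app.jar")
    assert(childArgs == Seq("--master", "ignored")) // app options pass through untouched
  }
}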
26 changes: 22 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -19,6 +19,8 @@ package org.apache.spark.storage

 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}

+import org.apache.spark.annotation.DeveloperApi
+
 /**
  * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
  * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon, whether to
@@ -142,21 +144,37 @@ object StorageLevel {
   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
   val OFF_HEAP = new StorageLevel(false, false, true, false)

-  /** Create a new StorageLevel object without setting useOffHeap */
+  /**
+   * :: DeveloperApi ::
+   * Create a new StorageLevel object without setting useOffHeap
+   */
+  @DeveloperApi
   def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
       deserialized: Boolean, replication: Int) = getCachedStorageLevel(
     new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))

-  /** Create a new StorageLevel object */
+  /**
+   * :: DeveloperApi ::
+   * Create a new StorageLevel object
+   */
+  @DeveloperApi
   def apply(useDisk: Boolean, useMemory: Boolean,
       deserialized: Boolean, replication: Int = 1) = getCachedStorageLevel(
     new StorageLevel(useDisk, useMemory, false, deserialized, replication))

-  /** Create a new StorageLevel object from its integer representation */
+  /**
+   * :: DeveloperApi ::
+   * Create a new StorageLevel object from its integer representation
+   */
+  @DeveloperApi
   def apply(flags: Int, replication: Int): StorageLevel =
     getCachedStorageLevel(new StorageLevel(flags, replication))

-  /** Read StorageLevel object from ObjectInput stream */
+  /**
+   * :: DeveloperApi ::
+   * Read StorageLevel object from ObjectInput stream
+   */
+  @DeveloperApi
   def apply(in: ObjectInput): StorageLevel = {
     val obj = new StorageLevel()
     obj.readExternal(in)
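The annotations above publish the factory methods as developer API without changing behavior; getCachedStorageLevel still interns levels, and levels compare by their flags and replication count. A brief usage sketch, assuming spark-core on the classpath (the flag values are chosen arbitrarily for illustration):

import org.apache.spark.storage.StorageLevel

object StorageLevelSketch {
  def main(args: Array[String]): Unit = {
    // Four-argument DeveloperApi factory from the diff:
    // disk + memory, serialized form, replicated to two nodes.
    val custom = StorageLevel(useDisk = true, useMemory = true,
      deserialized = false, replication = 2)

    assert(custom.useDisk && custom.useMemory)
    assert(!custom.deserialized && custom.replication == 2)
    // Same flags as the predefined constant:
    assert(custom == StorageLevel.MEMORY_AND_DISK_SER_2)
  }
}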
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1062,4 +1062,10 @@ private[spark] object Utils extends Logging {
   def isWindows = Option(System.getProperty("os.name")).
     map(_.startsWith("Windows")).getOrElse(false)

+  /**
+   * Indicates whether Spark is currently running unit tests.
+   */
+  private[spark] def isTesting = {
+    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  }
 }
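Utils.isTesting centralizes the check that the @@ -118,8 +119,7 @@ hunk above now relies on: a run counts as a test if either the SPARK_TESTING environment variable or the spark.testing system property is set. A standalone sketch of the same gating pattern (requireYarnConf is an illustrative stand-in for the validation in SparkSubmitArguments, not Spark API):

object TestingGateSketch {
  // Mirrors Utils.isTesting from the diff: either the SPARK_TESTING
  // environment variable or the spark.testing system property marks a test run.
  def isTesting: Boolean =
    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")

  def requireYarnConf(master: String): Unit = {
    val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
    if (master.startsWith("yarn") && !hasHadoopEnv && !isTesting) {
      throw new Exception(s"When running with master '$master' " +
        "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
    }
  }

  def main(args: Array[String]): Unit = {
    sys.props("spark.testing") = "1" // simulate a test run
    requireYarnConf("yarn-cluster")  // passes: the testing gate suppresses the check
  }
}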
