Merge remote-tracking branch 'apache/master' into state-cleanup
Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
tdas committed Apr 4, 2014
2 parents 762a4d8 + 33e6361 commit a2cc8bc
Showing 262 changed files with 7,292 additions and 1,680 deletions.
3 changes: 2 additions & 1 deletion bin/pyspark
@@ -55,7 +55,8 @@ if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi

if [[ "$IPYTHON" = "1" ]] ; then
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
exec ipython $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON" "$@"
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -150,7 +150,7 @@
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>3.2.6</version>
<!-- see also exclusion for lift-json; this is necessary since it depends on
scala-library and scalap 2.10.0, but we use 2.10.3, and only override
scala-library and scalap 2.10.0, but we use 2.10.4, and only override
scala-library -->
<exclusions>
<exclusion>
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -128,6 +128,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
*/
object Client {
def main(args: Array[String]) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")

val conf = new SparkConf()
val driverArgs = new ClientArguments(args)

67 changes: 43 additions & 24 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.io.File
import java.io.{PrintStream, File}
import java.net.URL

import org.apache.spark.executor.ExecutorURLClassLoader
@@ -32,38 +32,51 @@ import scala.collection.mutable.Map
* modes that Spark supports.
*/
object SparkSubmit {
val YARN = 1
val STANDALONE = 2
val MESOS = 4
val LOCAL = 8
val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

var clusterManager: Int = LOCAL
private var clusterManager: Int = LOCAL

def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
printStream.println(appArgs)
}
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
launch(childArgs, classpath, sysProps, mainClass)
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}

// Exposed for testing
private[spark] var printStream: PrintStream = System.err
private[spark] var exitFn: () => Unit = () => System.exit(-1)

private[spark] def printErrorAndExit(str: String) = {
printStream.println("error: " + str)
printStream.println("run with --help for more information or --verbose for debugging output")
exitFn()
}
private[spark] def printWarning(str: String) = printStream.println("warning: " + str)

/**
* @return
* a tuple containing the arguments for the child, a list of classpath
* entries for the child, and the main class for the child
*/
def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
if (appArgs.master.startsWith("yarn")) {
if (appArgs.master.startsWith("local")) {
clusterManager = LOCAL
} else if (appArgs.master.startsWith("yarn")) {
clusterManager = YARN
} else if (appArgs.master.startsWith("spark")) {
clusterManager = STANDALONE
} else if (appArgs.master.startsWith("mesos")) {
clusterManager = MESOS
} else if (appArgs.master.startsWith("local")) {
clusterManager = LOCAL
} else {
System.err.println("master must start with yarn, mesos, spark, or local")
System.exit(1)
printErrorAndExit("master must start with yarn, mesos, spark, or local")
}

// Because "yarn-standalone" and "yarn-client" encapsulate both the master
@@ -73,12 +86,10 @@ object SparkSubmit {
appArgs.deployMode = "cluster"
}
if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds")
System.exit(1)
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
}
if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds")
System.exit(1)
printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible")
}
if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
appArgs.master = "yarn-standalone"
@@ -95,8 +106,7 @@
var childMainClass = ""

if (clusterManager == MESOS && deployOnCluster) {
System.err.println("Mesos does not support running the driver on the cluster")
System.exit(1)
printErrorAndExit("Mesos does not support running the driver on the cluster")
}

if (!deployOnCluster) {
@@ -174,8 +184,17 @@
(childArgs, childClasspath, sysProps, childMainClass)
}

def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
sysProps: Map[String, String], childMainClass: String) {
private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {

if (verbose) {
System.err.println(s"Main class:\n$childMainClass")
System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
System.err.println("\n")
}

val loader = new ExecutorURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
Thread.currentThread.setContextClassLoader(loader)
Expand All @@ -193,10 +212,10 @@ object SparkSubmit {
mainMethod.invoke(null, childArgs.toArray)
}

def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
val localJarFile = new File(localJar)
if (!localJarFile.exists()) {
System.err.println("Jar does not exist: " + localJar + ". Skipping.")
printWarning(s"Jar $localJar does not exist, skipping.")
}

val url = localJarFile.getAbsoluteFile.toURI.toURL
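The printStream and exitFn hooks added above are commented "Exposed for testing". A minimal sketch of how a test living in the org.apache.spark.deploy package might swap them out to capture error output and intercept the exit call; the helper object, its name, and the exception used are illustrative, not part of this commit:

package org.apache.spark.deploy

import java.io.{ByteArrayOutputStream, PrintStream}

// Hypothetical test helper: collect whatever SparkSubmit writes through
// printErrorAndExit / printWarning / printUsageAndExit, and turn the exit
// call into an exception so the test JVM keeps running.
object SparkSubmitTestHarness {
  def captureSubmitOutput(body: => Unit): String = {
    val buffer = new ByteArrayOutputStream()
    val savedStream = SparkSubmit.printStream
    val savedExit = SparkSubmit.exitFn
    SparkSubmit.printStream = new PrintStream(buffer)
    SparkSubmit.exitFn = () => throw new IllegalStateException("exit requested")
    try {
      try body catch { case _: IllegalStateException => () }  // swallow the fake exit
      buffer.toString("UTF-8")
    } finally {
      SparkSubmit.printStream = savedStream
      SparkSubmit.exitFn = savedExit
    }
  }
}

Used as, say, captureSubmitOutput { SparkSubmit.main(Array("--help")) }, this would return the usage text instead of terminating the process.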
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -40,25 +40,45 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
var name: String = null
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
var verbose: Boolean = false

loadEnvVars()
parseArgs(args.toList)

def loadEnvVars() {
master = System.getenv("MASTER")
deployMode = System.getenv("DEPLOY_MODE")
parseOpts(args.toList)

// Sanity checks
if (args.length == 0) printUsageAndExit(-1)
if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")

override def toString = {
s"""Parsed arguments:
| master $master
| deployMode $deployMode
| executorMemory $executorMemory
| executorCores $executorCores
| totalExecutorCores $totalExecutorCores
| driverMemory $driverMemory
| drivercores $driverCores
| supervise $supervise
| queue $queue
| numExecutors $numExecutors
| files $files
| archives $archives
| mainClass $mainClass
| primaryResource $primaryResource
| name $name
| childArgs [${childArgs.mkString(" ")}]
| jars $jars
| verbose $verbose
""".stripMargin
}

def parseArgs(args: List[String]) {
if (args.size == 0) {
printUsageAndExit(1)
System.exit(1)
}
primaryResource = args(0)
parseOpts(args.tail)
private def loadEnvVars() {
Option(System.getenv("MASTER")).map(master = _)
Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
}

def parseOpts(opts: List[String]): Unit = opts match {
private def parseOpts(opts: List[String]): Unit = opts match {
case ("--name") :: value :: tail =>
name = value
parseOpts(tail)
@@ -73,8 +93,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {

case ("--deploy-mode") :: value :: tail =>
if (value != "client" && value != "cluster") {
System.err.println("--deploy-mode must be either \"client\" or \"cluster\"")
System.exit(1)
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
parseOpts(tail)
@@ -130,17 +149,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
case ("--help" | "-h") :: tail =>
printUsageAndExit(0)

case Nil =>
case ("--verbose" | "-v") :: tail =>
verbose = true
parseOpts(tail)

case _ =>
printUsageAndExit(1, opts)
case value :: tail =>
if (primaryResource != null) {
val error = s"Found two conflicting resources, $value and $primaryResource." +
" Expecting only one resource."
SparkSubmit.printErrorAndExit(error)
}
primaryResource = value
parseOpts(tail)

case Nil =>
}

def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
val outStream = SparkSubmit.printStream
if (unknownParam != null) {
System.err.println("Unknown/unsupported param " + unknownParam)
outStream.println("Unknown/unsupported param " + unknownParam)
}
System.err.println(
outStream.println(
"""Usage: spark-submit <primary binary> [options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
@@ -171,6 +201,6 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working dir of each executor.""".stripMargin
)
System.exit(exitCode)
SparkSubmit.exitFn()
}
}
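With the parsing changes above, the primary resource is now taken from the first bare (non-option) token rather than being required as args(0), a second bare token is rejected as a conflict, and the MASTER / DEPLOY_MODE environment variables only supply defaults that explicit flags override. A rough sketch of driving the parser directly from the same package; the option values and application class are made up, and --class is assumed from the sanity checks above:

package org.apache.spark.deploy

// Illustrative only: parse a small argument list and dump the result
// via the new "Parsed arguments:" toString block.
object SubmitArgsExample {
  def main(args: Array[String]): Unit = {
    val parsed = new SparkSubmitArguments(Array(
      "--master", "spark://host:7077",
      "--deploy-mode", "client",
      "--class", "com.example.Main",   // hypothetical application main class
      "--name", "demo",
      "--verbose",
      "app.jar"))                      // bare token becomes primaryResource
    println(parsed)
  }
}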
21 changes: 10 additions & 11 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -84,7 +84,7 @@ class DAGScheduler(
private[scheduler] val stageIdToJobIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
private[spark] val stageToInfos = new HashMap[Stage, StageInfo]

@@ -536,7 +536,7 @@ class DAGScheduler(
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
stageIdToActiveJob(jobId) = job
jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(
@@ -559,7 +559,7 @@
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation)
activeJobs.clear() // These should already be empty by this point,
stageIdToActiveJob.clear() // but just in case we lost track of some jobs...
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...

case ExecutorAdded(execId, host) =>
handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@

case BeginEvent(task, taskInfo) =>
for (
job <- stageIdToActiveJob.get(task.stageId);
stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage)
) {
@@ -697,7 +696,7 @@
private def activeJobForStage(stage: Stage): Option[Int] = {
if (stageIdToJobIds.contains(stage.id)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
jobsThatUseStage.find(stageIdToActiveJob.contains)
jobsThatUseStage.find(jobIdToActiveJob.contains)
} else {
None
}
@@ -750,8 +749,8 @@
}
}

val properties = if (stageIdToActiveJob.contains(jobId)) {
stageIdToActiveJob(stage.jobId).properties
val properties = if (jobIdToActiveJob.contains(jobId)) {
jobIdToActiveJob(stage.jobId).properties
} else {
// this stage will be assigned to "default" pool
null
@@ -827,7 +826,7 @@
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
stageIdToActiveJob -= stage.jobId
jobIdToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
@@ -986,11 +985,11 @@
val independentStages = removeJobAndIndependentStages(jobId)
independentStages.foreach(taskScheduler.cancelTasks)
val error = new SparkException("Job %d cancelled".format(jobId))
val job = stageIdToActiveJob(jobId)
val job = jobIdToActiveJob(jobId)
job.listener.jobFailed(error)
jobIdToStageIds -= jobId
activeJobs -= job
stageIdToActiveJob -= jobId
jobIdToActiveJob -= jobId
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
}
}
@@ -1011,7 +1010,7 @@
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
jobIdToStageIdsRemove(job.jobId)
stageIdToActiveJob -= resultStage.jobId
jobIdToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
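In the hunks above the map is consistently written and read with job ids (jobIdToActiveJob(jobId) = job, lookups through stage.jobId), so the new name reflects the actual key; the one lookup that went through task.stageId, in the BeginEvent handler, is dropped. A stand-alone illustration of the keying, using simplified stand-in types rather than the real scheduler classes:

import scala.collection.mutable.HashMap

// Simplified stand-ins, for illustration only.
case class ActiveJob(jobId: Int)
case class Stage(id: Int, jobId: Int)   // each stage records the job that created it

object JobKeyingExample extends App {
  val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  val job = ActiveJob(jobId = 7)
  val finalStage = Stage(id = 42, jobId = 7)
  jobIdToActiveJob(job.jobId) = job

  // Lookups go through the job id carried by a stage, never the stage's own id.
  assert(jobIdToActiveJob.contains(finalStage.jobId))
  assert(!jobIdToActiveJob.contains(finalStage.id))
}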
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -18,13 +18,14 @@
package org.apache.spark.ui

import java.net.{InetSocketAddress, URL}
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
import scala.xml.Node

import org.eclipse.jetty.server.{DispatcherType, Server}
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.thread.QueuedThreadPool
@@ -84,7 +84,7 @@ private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListe

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
val rddInfo = stageSubmitted.stageInfo.rddInfo
_rddInfoMap(rddInfo.id) = rddInfo
_rddInfoMap.getOrElseUpdate(rddInfo.id, rddInfo)
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
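getOrElseUpdate only inserts when the key is absent, so an RDDInfo the listener is already tracking is no longer replaced by the fresh one that arrives when a later stage using the same RDD is submitted. A small self-contained illustration of the difference, with a simplified stand-in for RDDInfo:

import scala.collection.mutable.HashMap

// Simplified stand-in for RDDInfo: just an id and a mutable memory-size field.
class RddInfoStub(val id: Int, var memSize: Long = 0L)

object GetOrElseUpdateExample extends App {
  val rddInfoMap = new HashMap[Int, RddInfoStub]

  val first = new RddInfoStub(id = 1)
  rddInfoMap.getOrElseUpdate(first.id, first)
  rddInfoMap(1).memSize = 512L                  // stats accumulate on the tracked entry

  // A later stage submission delivers a fresh RDDInfo for the same RDD.
  val fresh = new RddInfoStub(id = 1)
  rddInfoMap.getOrElseUpdate(fresh.id, fresh)   // key present: the existing entry is kept
  assert(rddInfoMap(1).memSize == 512L)

  // The previous assignment form, rddInfoMap(fresh.id) = fresh, would have
  // overwritten the entry and reset memSize to 0.
}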