Merge branch 'master' of https://github.com/apache/spark into SPARK-1629
witgo committed Apr 30, 2014
2 parents fcaafd7 + ff5be9a commit 31520eb
Showing 95 changed files with 2,100 additions and 978 deletions.
14 changes: 0 additions & 14 deletions bagel/pom.xml
@@ -31,20 +31,6 @@
<name>Spark Project Bagel</name>
<url>http://spark.apache.org/</url>

<profiles>
<profile>
<!-- SPARK-1121: Adds an explicit dependency on Avro to work around
a Hadoop 0.23.X issue -->
<id>yarn-alpha</id>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
2 changes: 1 addition & 1 deletion bin/compute-classpath.sh
@@ -28,7 +28,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
. $FWDIR/bin/load-spark-env.sh

# Build up classpath
CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"
CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"

ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

1 change: 1 addition & 0 deletions bin/pyspark
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
1 change: 1 addition & 0 deletions bin/pyspark2.cmd
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python

set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
9 changes: 5 additions & 4 deletions bin/spark-class
@@ -73,11 +73,13 @@ case "$1" in
OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
;;

# All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
'org.apache.spark.repl.Main')
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
# Spark submit uses SPARK_SUBMIT_OPTS and SPARK_JAVA_OPTS
'org.apache.spark.deploy.SparkSubmit')
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS \
-Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
;;

*)
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
@@ -98,7 +100,6 @@ fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$_SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
11 changes: 5 additions & 6 deletions bin/spark-shell
@@ -20,7 +20,6 @@
#
# Shell script for starting the Spark Shell REPL

args="$@"
cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
@@ -46,12 +45,12 @@ function main(){
# "Backspace sends ^H" setting in "Keys" section of the Mintty options
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-submit spark-internal "$args" --class org.apache.spark.repl.Main
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
stty icanon echo > /dev/null 2>&1
else
export SPARK_REPL_OPTS
$FWDIR/bin/spark-submit spark-internal "$args" --class org.apache.spark.repl.Main
export SPARK_SUBMIT_OPTS
$FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
fi
}

@@ -83,7 +82,7 @@ if [[ ! $? ]]; then
saved_stty=""
fi

main
main "$@"

# record the exit status lest it be overwritten:
# then reenable echo and propagate the code.
6 changes: 3 additions & 3 deletions bin/spark-submit
@@ -26,11 +26,11 @@ while (($#)); do
elif [ "$1" = "--driver-memory" ]; then
DRIVER_MEMORY=$2
elif [ "$1" = "--driver-library-path" ]; then
export _SPARK_LIBRARY_PATH=$2
export SPARK_SUBMIT_LIBRARY_PATH=$2
elif [ "$1" = "--driver-class-path" ]; then
export SPARK_CLASSPATH="$SPARK_CLASSPATH:$2"
export SPARK_SUBMIT_CLASSPATH=$2
elif [ "$1" = "--driver-java-options" ]; then
export SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $2"
export SPARK_SUBMIT_OPTS=$2
fi
shift
done
64 changes: 42 additions & 22 deletions core/pom.xml
@@ -30,19 +30,6 @@
<packaging>jar</packaging>
<name>Spark Project Core</name>
<url>http://spark.apache.org/</url>
<!-- SPARK-1121: Adds an explicit dependency on Avro to work around a Hadoop 0.23.X issue -->
<profiles>
<profile>
<id>yarn-alpha</id>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -147,15 +134,6 @@
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>3.2.6</version>
<!-- see also exclusion for lift-json; this is necessary since it depends on
scala-library and scalap 2.10.0, but we use 2.10.4, and only override
scala-library -->
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>colt</groupId>
@@ -316,6 +294,48 @@
</environmentVariables>
</configuration>
</plugin>
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>unzip</executable>
<workingDirectory>../python</workingDirectory>
<arguments>
<argument>-o</argument>
<argument>lib/py4j*.zip</argument>
<argument>-d</argument>
<argument>build</argument>
</arguments>
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>../python</directory>
<includes>
<include>pyspark/*.py</include>
</includes>
</resource>
<resource>
<directory>../python/build</directory>
<includes>
<include>py4j/*.py</include>
</includes>
</resource>
</resources>
</build>
</project>
22 changes: 21 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.java

import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable}
import java.lang.{Iterable => JIterable, Long => JLong}

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -264,6 +264,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
}

/**
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
*/
def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
}

/**
* Zips this RDD with its element indices. The ordering is first based on the partition index
* and then the ordering of items within each partition. So the first item in the first
* partition gets index 0, and the last item in the last partition receives the largest index.
* This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
* This method needs to trigger a spark job when this RDD contains more than one partition.
*/
def zipWithIndex(): JavaPairRDD[T, JLong] = {
JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
}

// Actions (launch a job to return a value to the user program)

/**
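For context, the two methods added above expose the existing RDD.zipWithUniqueId and RDD.zipWithIndex to the Java API. A minimal sketch of their semantics, using the underlying Scala RDD methods on made-up data (local master and variable names are illustrative only):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("zip-demo"))
    val letters = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)

    // zipWithUniqueId: element i of partition k gets id i*n + k, where n is the
    // number of partitions. Ids are unique but may leave gaps, and no job runs.
    letters.zipWithUniqueId().collect()   // Array((a,0), (b,2), (c,1), (d,3))

    // zipWithIndex: indices follow partition order, so Spark first runs a job to
    // count the elements of every partition but the last.
    letters.zipWithIndex().collect()      // Array((a,0), (b,1), (c,2), (d,3))

    sc.stop()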
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()

// Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()

// Redirect the stderr to ours
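The worker and daemon are now launched as Python modules instead of file paths under SPARK_HOME, so this factory no longer has to locate SPARK_HOME or rebuild PYTHONPATH itself; the module only has to be importable in the environment handed to the subprocess. A rough standalone sketch of that pattern (the executable name and env handling here are illustrative, not the exact Spark code):

    import scala.collection.JavaConversions._

    // Launch "python -m pyspark.worker": Python resolves the module via PYTHONPATH,
    // which now comes from the caller's environment (e.g. the pyspark/py4j sources
    // shipped with Spark) rather than being derived from SPARK_HOME here.
    val pb = new ProcessBuilder(Seq("python", "-m", "pyspark.worker"))
    pb.environment().put("PYTHONPATH", sys.env.getOrElse("PYTHONPATH", ""))
    val worker = pb.start()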
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -23,6 +23,7 @@ import java.net.{URI, URL}
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils

/**
* Scala code behind the spark-submit script. The script handles setting up the classpath with
@@ -128,6 +129,18 @@ object SparkSubmit {
childArgs += ("--class", appArgs.mainClass)
}

if (clusterManager == YARN) {
// The choice of class is arbitrary, could use any spark-yarn class
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " +
"with YARN support."
throw new Exception(msg)
}
}

// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

val options = List[OptionAssigner](
new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
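The new guard uses Utils.classIsLoadable to fail fast with a readable message when a YARN master is requested but the YARN classes were never built in. The underlying idea is a Class.forName probe; a rough sketch of the same check (not the actual Utils implementation):

    def classIsLoadable(clazz: String): Boolean =
      try {
        // Load the class without initializing it, via the context class loader
        Class.forName(clazz, false, Thread.currentThread().getContextClassLoader)
        true
      } catch {
        case _: ClassNotFoundException => false
      }

    if (!classIsLoadable("org.apache.spark.deploy.yarn.Client")) {
      throw new Exception("Could not load YARN classes. " +
        "This copy of Spark may not have been compiled with YARN support.")
    }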
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -298,7 +298,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M).
| --driver-java-options Extra Java options to pass to the driver
| --driver-library-path Extra library path entries to pass to the driver
| --driver-class-path Extra class path entries to pass to the driver
| --driver-class-path Extra class path entries to pass to the driver. Note that
| jars added with --jars are automatically included in the
| classpath.
|
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
|
core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -48,7 +48,8 @@ object CommandUtils extends Logging {
def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
// Note, this will coalesce multiple options into a single command component
val extraOpts = command.extraJavaOptions.toSeq
val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())

val libraryOpts =
if (command.libraryPathEntries.size > 0) {
val joined = command.libraryPathEntries.mkString(File.pathSeparator)
@@ -62,10 +63,10 @@
val classPath = Utils.executeAndGetOutput(
Seq(sparkHome + "/bin/compute-classpath" + ext),
extraEnvironment=command.environment)
val userClassPath = command.classPathEntries.mkString(File.pathSeparator)
val classPathWithUser = classPath + File.pathSeparator + userClassPath
val userClassPath = command.classPathEntries ++ Seq(classPath)

Seq("-cp", classPathWithUser) ++ libraryOpts ++ extraOpts ++ memoryOpts
Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
libraryOpts ++ extraOpts ++ memoryOpts
}

/** Spawn a thread that will redirect a given stream to a file */
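The switch to Utils.splitCommandString matters because extraJavaOptions arrives as a single string; the old .toSeq handed the whole string to the JVM as one unrecognised argument. A small illustration of the difference, usable only inside Spark's own code since Utils is package-private (the option string is made up):

    import org.apache.spark.util.Utils

    val extra = Option("-XX:+PrintGCDetails -Dspark.foo='a b'")

    // Old behaviour: one element containing the entire string
    extra.toSeq
    // Seq("-XX:+PrintGCDetails -Dspark.foo='a b'")

    // New behaviour: shell-style tokens, with quoting respected
    extra.map(Utils.splitCommandString).getOrElse(Seq())
    // Seq("-XX:+PrintGCDetails", "-Dspark.foo=a b")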
core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
import org.apache.spark.util.{IntParam, MemoryParam, Utils}

/**
* Command-line parser for the master.
* Command-line parser for the worker.
*/
private[spark] class WorkerArguments(args: Array[String]) {
var host = Utils.localHostName()
core/src/main/scala/org/apache/spark/network/SecurityMessage.scala
@@ -106,7 +106,6 @@ private[spark] class SecurityMessage() extends Logging {
* @return BufferMessage
*/
def toBufferMessage: BufferMessage = {
val startTime = System.currentTimeMillis
val buffers = new ArrayBuffer[ByteBuffer]()

// 4 bytes for the length of the connectionId
core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
@@ -61,7 +61,6 @@ private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Doub
} else if (outputsMerged == 0) {
new HashMap[T, BoundedDouble]
} else {
val p = outputsMerged.toDouble / totalOutputs
val studentTCacher = new StudentTCacher(confidence)
val result = new JHashMap[T, BoundedDouble](sums.size)
val iter = sums.entrySet.iterator()
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -54,7 +55,7 @@ private[spark] class EventLoggingListener(

private val logger =
new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
shouldOverwrite)
shouldOverwrite, Some(LOG_FILE_PERMISSIONS))

/**
* Begin logging events.
@@ -124,6 +125,9 @@ private[spark] object EventLoggingListener extends Logging {
val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
val LOG_FILE_PERMISSIONS: FsPermission =
FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)


// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new mutable.HashMap[String, CompressionCodec]
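The new LOG_FILE_PERMISSIONS constant pins event-log files to octal mode 770 (read, write and execute for the owning user and group, nothing for others). A tiny sketch of what that expression evaluates to, using Hadoop's standard FsPermission API:

    import org.apache.hadoop.fs.permission.FsPermission

    // Integer.parseInt("770", 8) == 504, i.e. octal 0770
    val perm = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
    println(perm)   // rwxrwx---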