Merge github.com:apache/spark
andrewor14 committed Apr 7, 2014
2 parents e2f4ff9 + 1440154 commit bc46fc8
Showing 89 changed files with 1,685 additions and 388 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -39,3 +39,4 @@ work
.*\.q
golden
test.out/*
.*iml
12 changes: 11 additions & 1 deletion assembly/pom.xml
@@ -163,6 +163,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
@@ -208,7 +218,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>buildnumber-maven-plugin</artifactId>
<version>1.1</version>
<version>1.2</version>
<executions>
<execution>
<phase>validate</phase>
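The new hive profile above pulls spark-hive into the assembly only when explicitly requested. Assuming standard Maven profile activation, a Hive-enabled assembly would be built with something like mvn -Phive -DskipTests package; without the flag the assembly is unchanged.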
35 changes: 19 additions & 16 deletions bin/compute-classpath.sh
@@ -30,21 +30,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"

# Support for interacting with Hive. Since hive pulls in a lot of dependencies that might break
# existing Spark applications, it is not included in the standard spark assembly. Instead, we only
# include it in the classpath if the user has explicitly requested it by running "sbt hive/assembly"
# Hopefully we will find a way to avoid uber-jars entirely and deploy only the needed packages in
# the future.
if [ -f "$FWDIR"/sql/hive/target/scala-$SCALA_VERSION/spark-hive-assembly-*.jar ]; then

# Datanucleus jars do not work if only included in the uberjar as plugin.xml metadata is lost.
DATANUCLEUSJARS=$(JARS=("$FWDIR/lib_managed/jars"/datanucleus-*.jar); IFS=:; echo "${JARS[*]}")
CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS

ASSEMBLY_DIR="$FWDIR/sql/hive/target/scala-$SCALA_VERSION/"
else
ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION/"
fi
ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

# First check if we have a dependencies jar. If so, include binary classes with the deps jar
if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
@@ -59,7 +45,7 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"

DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark*-assembly*hadoop*-deps.jar`
DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar`
CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR"
else
# Else use spark-assembly jar from either RELEASE or assembly directory
@@ -71,6 +57,23 @@ else
CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
fi

# When Hive support is needed, Datanucleus jars must be included on the classpath.
# Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
# Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ | grep "datanucleus-.*\\.jar" | wc -l)
if [ $num_datanucleus_jars -gt 0 ]; then
AN_ASSEMBLY_JAR=${ASSEMBLY_JAR:-$DEPS_ASSEMBLY_JAR}
num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)
if [ $num_hive_files -gt 0 ]; then
echo "Spark assembly has been built with Hive, including Datanucleus jars on classpath" 1>&2
DATANUCLEUSJARS=$(echo "$FWDIR/lib_managed/jars"/datanucleus-*.jar | tr " " :)
CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS
fi
fi

# Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
if [[ $SPARK_TESTING == 1 ]]; then
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes"
2 changes: 0 additions & 2 deletions bin/spark-class
@@ -154,5 +154,3 @@ if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"


49 changes: 47 additions & 2 deletions core/pom.xml
@@ -117,12 +117,10 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
@@ -200,6 +198,53 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon</artifactId>
<version>0.4.1-thrift</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jsp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
46 changes: 33 additions & 13 deletions core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -23,17 +23,18 @@
* Expose some commonly useful storage level constants.
*/
public class StorageLevels {
public static final StorageLevel NONE = create(false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);
public static final StorageLevel NONE = create(false, false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2);
public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);

/**
* Create a new StorageLevel object.
@@ -42,7 +43,26 @@ public class StorageLevels {
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
@Deprecated
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
}

/**
* Create a new StorageLevel object.
* @param useDisk saved to disk, if true
* @param useMemory saved to memory, if true
* @param useOffHeap saved to Tachyon, if true
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
public static StorageLevel create(
boolean useDisk,
boolean useMemory,
boolean useOffHeap,
boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication);
}
}
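The factory now takes a fifth flag for off-heap storage, and a new OFF_HEAP constant is exposed. A minimal Scala sketch (not part of the commit) showing both ways of requesting the new level:

import org.apache.spark.api.java.StorageLevels
import org.apache.spark.storage.StorageLevel

object StorageLevelsSketch {
  def main(args: Array[String]): Unit = {
    // New constant for Tachyon-backed off-heap storage.
    val offHeap: StorageLevel = StorageLevels.OFF_HEAP
    // Five-argument factory: useDisk, useMemory, useOffHeap, deserialized, replication.
    val custom: StorageLevel = StorageLevels.create(false, false, true, false, 1)
    // The old four-argument overload remains but is deprecated and never enables off-heap.
    println(offHeap)
    println(custom)
  }
}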
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,14 +19,13 @@ package org.apache.spark

import java.io._
import java.net.URI
import java.util.{Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger

import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -130,6 +129,11 @@ class SparkContext(
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")

// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
conf.set("spark.tachyonStore.folderName", tachyonFolderName)

val isLocal = (master == "local" || master.startsWith("local["))

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
@@ -393,7 +397,7 @@
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are perferred, large file is also allowable, but may cause bad performance.
* @note Small files are preferred, as each file will be loaded fully in memory.
*/
def wholeTextFiles(path: String): RDD[(String, String)] = {
newAPIHadoopFile(
@@ -725,10 +729,6 @@
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

def getStageInfo: Map[Stage, StageInfo] = {
dagScheduler.stageToInfos
}

/**
* Return information about blocks stored in all of the slaves
*/
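The corrected note clarifies that wholeTextFiles loads each file fully into memory as a single (path, content) record. A minimal usage sketch, with a hypothetical path (not from the commit):

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("whole-text-files"))
    // Each element is (file path, entire file content), so every file must fit in memory.
    val files = sc.wholeTextFiles("hdfs://namenode/a-hdfs-path")
    files.collect().foreach { case (path, content) =>
      println(s"$path -> ${content.length} characters")
    }
    sc.stop()
  }
}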
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,8 @@

package org.apache.spark.api.java

import java.util.{Comparator, List => JList}
import java.util.{Comparator, Iterator => JIterator, List => JList}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -280,6 +281,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new java.util.ArrayList(arr)
}

/**
* Return an iterator that contains all of the elements in this RDD.
*
* The iterator will consume as much memory as the largest partition in this RDD.
*/
def toLocalIterator(): JIterator[T] = {
import scala.collection.JavaConversions._
rdd.toLocalIterator
}


/**
* Return an array that contains all of the elements in this RDD.
* @deprecated As of Spark 1.0.0, toArray() is deprecated, use {@link #collect()} instead
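The new Java-side toLocalIterator simply delegates to the Scala RDD method of the same name. A minimal Scala sketch of the behaviour it exposes (not from the commit):

import org.apache.spark.{SparkConf, SparkContext}

object ToLocalIteratorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("to-local-iterator"))
    val rdd = sc.parallelize(1 to 1000000, numSlices = 10)
    // Unlike collect(), partitions are brought to the driver one at a time, so memory
    // use is bounded by the largest partition rather than by the whole RDD.
    val it = rdd.toLocalIterator
    println(it.take(5).toList)
    sc.stop()
  }
}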
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -177,7 +177,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are perferred, large file is also allowable, but may cause bad performance.
* @note Small files are preferred, as each file will be loaded fully in memory.
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python

import java.io._
import java.net._
import java.nio.charset.Charset
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}

import scala.collection.JavaConversions._
@@ -206,6 +207,7 @@ private object SpecialLengths {
}

private[spark] object PythonRDD {
val UTF8 = Charset.forName("UTF-8")

def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
JavaRDD[Array[Byte]] = {
@@ -266,7 +268,7 @@
}

def writeUTF(str: String, dataOut: DataOutputStream) {
val bytes = str.getBytes("UTF-8")
val bytes = str.getBytes(UTF8)
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
@@ -286,7 +288,7 @@

private
class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
override def call(arr: Array[Byte]) : String = new String(arr, PythonRDD.UTF8)
}

/**
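The change above resolves the UTF-8 Charset once and reuses it, instead of looking it up by name on every getBytes call. A standalone sketch of the same pattern (names are illustrative, not the commit's code):

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.Charset

object WriteUtfSketch {
  // Shared constant, mirroring PythonRDD.UTF8: the charset lookup happens once.
  val UTF8: Charset = Charset.forName("UTF-8")

  def writeUTF(str: String, dataOut: DataOutputStream): Unit = {
    val bytes = str.getBytes(UTF8)
    dataOut.writeInt(bytes.length)   // 4-byte length prefix
    dataOut.write(bytes)             // UTF-8 encoded payload
  }

  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    writeUTF("héllo", new DataOutputStream(buffer))
    println(buffer.size())  // length prefix plus the multi-byte UTF-8 payload
  }
}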
core/src/main/scala/org/apache/spark/deploy/DeployMessages.scala
@@ -86,6 +86,10 @@ private[deploy] object DeployMessages {

case class KillDriver(driverId: String) extends DeployMessage

// Worker internal

case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription)
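The comment notes that WorkDirCleanup is sent to the Worker actor periodically. A heavily simplified, hypothetical Akka sketch of that kind of periodic message (the actual Worker code may schedule it differently):

import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical stand-in for the message added above.
case object WorkDirCleanup

class CleanupActor extends Actor {
  def receive = {
    case WorkDirCleanup =>
      // The real Worker would delete old application folders here.
      println("cleaning up work dir")
  }
}

object WorkDirCleanupSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    val cleaner = system.actorOf(Props[CleanupActor], "cleaner")
    import system.dispatcher
    // Deliver WorkDirCleanup to the actor every 30 minutes (interval chosen arbitrarily).
    system.scheduler.schedule(30.minutes, 30.minutes, cleaner, WorkDirCleanup)
    // (ActorSystem shutdown omitted for brevity.)
  }
}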
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -79,20 +79,23 @@ object SparkSubmit {
printErrorAndExit("master must start with yarn, mesos, spark, or local")
}

// Because "yarn-standalone" and "yarn-client" encapsulate both the master
// Because "yarn-cluster" and "yarn-client" encapsulate both the master
// and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
if (appArgs.deployMode == null && appArgs.master == "yarn-standalone") {
if (appArgs.deployMode == null &&
(appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
appArgs.deployMode = "cluster"
}
if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
}
if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible")
if (appArgs.deployMode == "client" &&
(appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
printErrorAndExit("Deploy mode \"client\" and master \"" + appArgs.master
+ "\" are not compatible")
}
if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
appArgs.master = "yarn-standalone"
appArgs.master = "yarn-cluster"
}
if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
appArgs.master = "yarn-client"
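In practical terms, yarn-cluster becomes the canonical spelling: passing --master yarn-cluster (or the older yarn-standalone) without --deploy-mode now implies cluster mode, combining --deploy-mode client with either of those masters is rejected, and --deploy-mode cluster with yarn-client remains an error.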
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -171,7 +171,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
outStream.println("Unknown/unsupported param " + unknownParam)
}
outStream.println(
"""Usage: spark-submit <primary binary> [options]
"""Usage: spark-submit <app jar> [options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
| --deploy-mode DEPLOY_MODE Mode to deploy the app in, either 'client' or 'cluster'.