Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
Conflicts:
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	project/SparkBuild.scala
MLnick committed Dec 15, 2013
2 parents c304cc8 + 7db9165 commit 4e7c9e3
Showing 224 changed files with 3,985 additions and 3,038 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -41,3 +41,4 @@ derby.log
dist/
spark-*-bin.tar.gz
unit-tests.log
lib/
13 changes: 7 additions & 6 deletions README.md
@@ -12,9 +12,8 @@ This README file only contains basic setup instructions.

## Building

Spark requires Scala 2.9.3 (Scala 2.10 is not yet supported). The project is
built using Simple Build Tool (SBT), which is packaged with it. To build
Spark and its example programs, run:
Spark requires Scala 2.10. The project is built using Simple Build Tool (SBT),
which is packaged with it. To build Spark and its example programs, run:

sbt/sbt assembly

@@ -55,7 +54,7 @@ versions without YARN, use:
# Cloudera CDH 4.2.0 with MapReduce v1
$ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly

For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
For Apache Hadoop 2.0.X, 2.1.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_YARN=true`:

# Apache Hadoop 2.0.5-alpha
@@ -64,8 +63,10 @@ with YARN, also set `SPARK_YARN=true`:
# Cloudera CDH 4.2.0 with MapReduce v2
$ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly

For convenience, these variables may also be set through the `conf/spark-env.sh` file
described below.
When building for Hadoop 2.2.X and newer, you'll need to include the additional `new-yarn` profile:

# Apache Hadoop 2.2.X and newer
$ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn

When developing a Spark application, specify the Hadoop version by adding the
"hadoop-client" artifact to your project's dependencies. For example, if you're
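The `hadoop-client` example in the README continues past the hunk shown above. As a rough sketch only (the Spark and Hadoop versions below are illustrative placeholders, not values taken from this commit), the sbt dependency block it describes would look roughly like:

    // build.sbt sketch -- versions are placeholders; match hadoop-client
    // to the Hadoop version running on your cluster.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "0.9.0-incubating",
      "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.2.0"
    )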
16 changes: 8 additions & 8 deletions assembly/pom.xml
@@ -26,7 +26,7 @@
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-assembly_2.9.3</artifactId>
<artifactId>spark-assembly_2.10</artifactId>
<name>Spark Project Assembly</name>
<url>http://spark.incubator.apache.org/</url>

@@ -41,27 +41,27 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.9.3</artifactId>
<artifactId>spark-core_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_2.9.3</artifactId>
<artifactId>spark-bagel_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.9.3</artifactId>
<artifactId>spark-mllib_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_2.9.3</artifactId>
<artifactId>spark-repl_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.9.3</artifactId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -79,7 +79,7 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.version}/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</outputFile>
<outputFile>${project.build.directory}/scala-2.10/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
@@ -128,7 +128,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.9.3</artifactId>
<artifactId>spark-yarn_2.10</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
12 changes: 6 additions & 6 deletions bagel/pom.xml
@@ -26,15 +26,15 @@
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_2.9.3</artifactId>
<artifactId>spark-bagel_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
<url>http://spark.incubator.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.9.3</artifactId>
<artifactId>spark-core_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -43,18 +43,18 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.9.3</artifactId>
<artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.9.3</artifactId>
<artifactId>scalacheck_2.10</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
<outputDirectory>target/scala-2.10/classes</outputDirectory>
<testOutputDirectory>target/scala-2.10/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
2 changes: 1 addition & 1 deletion bin/compute-classpath.cmd
@@ -20,7 +20,7 @@ rem
rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
rem script and the ExecutorRunner in standalone cluster mode.

set SCALA_VERSION=2.9.3
set SCALA_VERSION=2.10

rem Figure out where the Spark framework is installed
set FWDIR=%~dp0..\
2 changes: 1 addition & 1 deletion bin/compute-classpath.sh
@@ -20,7 +20,7 @@
# This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
# script and the ExecutorRunner in standalone cluster mode.

SCALA_VERSION=2.9.3
SCALA_VERSION=2.10

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
29 changes: 15 additions & 14 deletions core/pom.xml
@@ -26,7 +26,7 @@
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.9.3</artifactId>
<artifactId>spark-core_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Core</name>
<url>http://spark.incubator.apache.org/</url>
@@ -86,7 +86,7 @@
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_2.9.3</artifactId>
<artifactId>chill_2.10</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
@@ -96,27 +96,23 @@
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-actor</artifactId>
<artifactId>akka-actor_2.10</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-remote</artifactId>
<artifactId>akka-remote_2.10</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
<artifactId>akka-slf4j_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-json_2.9.2</artifactId>
<artifactId>lift-json_2.10</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
@@ -163,14 +159,19 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.9.3</artifactId>
<artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.9.3</artifactId>
<artifactId>scalacheck_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -190,8 +191,8 @@
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
<outputDirectory>target/scala-2.10/classes</outputDirectory>
<testOutputDirectory>target/scala-2.10/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -19,8 +19,6 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
@@ -20,7 +20,6 @@
import java.net.InetSocketAddress;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -99,7 +99,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
if (!atMost.isFinite()) {
awaitResult()
} else {
} else jobWaiter.synchronized {
val finishTime = System.currentTimeMillis() + atMost.toMillis
while (!isCompleted) {
val time = System.currentTimeMillis()
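The hunk above wraps the timed wait loop in `jobWaiter.synchronized`, presumably so the loop can block on the JobWaiter monitor without racing its completion notification. A minimal standalone sketch of that pattern (the `Waiter` class below is hypothetical, not Spark's JobWaiter):

    // Deadline-bounded wait: wait() must be called while holding the same
    // monitor that finish() notifies on, or the signal can be missed.
    class Waiter {
      private var done = false

      def finish(): Unit = synchronized { done = true; notifyAll() }

      // Returns true if finish() was called before the timeout elapsed.
      def awaitAtMost(millis: Long): Boolean = synchronized {
        val deadline = System.currentTimeMillis() + millis
        var remaining = deadline - System.currentTimeMillis()
        while (!done && remaining > 0) {
          wait(remaining)                 // releases the monitor while blocked
          remaining = deadline - System.currentTimeMillis()
        }
        done
      }
    }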
37 changes: 25 additions & 12 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,12 +21,11 @@ import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.HashSet
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor._
import akka.dispatch._
import akka.pattern.ask
import akka.util.Duration


import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
@@ -55,9 +54,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
private[spark] class MapOutputTracker extends Logging {

private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")

// Set to the MapOutputTrackerActor living on the driver
var trackerActor: ActorRef = _
var trackerActor: Either[ActorRef, ActorSelection] = _

protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]

@@ -73,8 +72,18 @@ private[spark] class MapOutputTracker extends Logging {
// throw a SparkException if this fails.
private def askTracker(message: Any): Any = {
try {
val future = trackerActor.ask(message)(timeout)
return Await.result(future, timeout)
/*
The difference between ActorRef and ActorSelection is well explained here:
http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor
In Spark, the tracker actor is created on the driver, where we hold an ActorRef to it;
on an executor it is looked up remotely, which yields an ActorSelection.
*/
val future = trackerActor match {
case Left(a: ActorRef) => a.ask(message)(timeout)
case Right(b: ActorSelection) => b.ask(message)(timeout)
}
Await.result(future, timeout)
} catch {
case e: Exception =>
throw new SparkException("Error communicating with MapOutputTracker", e)
@@ -117,7 +126,7 @@ private[spark] class MapOutputTracker extends Logging {
fetching += shuffleId
}
}

if (fetchedStatuses == null) {
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
@@ -144,7 +153,7 @@
else{
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing all output locations for shuffle " + shuffleId))
}
}
} else {
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
@@ -244,12 +253,12 @@ private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
case Some(bytes) =>
return bytes
case None =>
statuses = mapStatuses(shuffleId)
statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
epochGotten = epoch
}
}
// If we got here, we failed to find the serialized locations in the cache, so we pulled
// out a snapshot of the locations as "locs"; let's serialize and return that
// out a snapshot of the locations as "statuses"; let's serialize and return that
val bytes = MapOutputTracker.serializeMapStatuses(statuses)
logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
// Add them into the table only if the epoch hasn't changed while we were working
@@ -274,6 +283,10 @@ private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
override def updateEpoch(newEpoch: Long) {
// This might be called on the MapOutputTrackerMaster if we're running in local mode.
}

def has(shuffleId: Int): Boolean = {
cachedSerializedStatuses.get(shuffleId).isDefined || mapStatuses.contains(shuffleId)
}
}

private[spark] object MapOutputTracker {
@@ -308,7 +321,7 @@
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
assert (statuses != null)
statuses.map {
status =>
status =>
if (status == null) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
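The `Either[ActorRef, ActorSelection]` field above separates the driver, which creates the tracker actor in-process, from executors, which look it up over the network. A standalone sketch of how such a field might be populated (the actor class, actor path format, and parameter names are assumptions for illustration, not code from this commit):

    import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}

    // Trivial stand-in for the tracker actor; it just echoes messages.
    class TrackerActor extends Actor {
      def receive = { case msg => sender ! msg }
    }

    def locateTracker(isDriver: Boolean, system: ActorSystem,
                      driverHost: String, driverPort: Int): Either[ActorRef, ActorSelection] =
      if (isDriver)
        // Created locally on the driver: a concrete ActorRef.
        Left(system.actorOf(Props[TrackerActor], "MapOutputTracker"))
      else
        // Looked up by path from an executor: only an ActorSelection.
        Right(system.actorSelection(
          s"akka.tcp://spark@$driverHost:$driverPort/user/MapOutputTracker"))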
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -17,8 +17,10 @@

package org.apache.spark

import org.apache.spark.util.Utils
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
@@ -72,7 +74,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}

override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
@@ -85,7 +87,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
* Determines the ranges by sampling the RDD passed in.
*/
class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
class RangePartitioner[K <% Ordered[K]: ClassTag, V](
partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
private val ascending: Boolean = true)
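The `ClassManifest` context bound above becomes `ClassTag`, its Scala 2.10 replacement. A small standalone sketch of the same pattern (the helper below is hypothetical, not part of this commit):

    import scala.reflect.ClassTag

    // ClassTag carries the runtime class of T, so generic code can still
    // allocate an Array[T] after erasure -- the job ClassManifest did before 2.10.
    def fixedBuffer[T: ClassTag](size: Int): Array[T] = new Array[T](size)

    val names: Array[String] = fixedBuffer[String](8)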
