
Merge branch 'master' into pyspark-inputformats
Conflicts:
	project/SparkBuild.scala
MLnick committed Apr 19, 2014
2 parents f6aac55 + 28238c8 commit 951c117
Showing 22 changed files with 216 additions and 136 deletions.
35 changes: 24 additions & 11 deletions README.md
@@ -10,20 +10,33 @@ guide, on the project webpage at <http://spark.apache.org/documentation.html>.
This README file only contains basic setup instructions.


-## Building
+## Building Spark

-Spark requires Scala 2.10. The project is built using Simple Build Tool (SBT),
-which can be obtained [here](http://www.scala-sbt.org). If SBT is installed we
-will use the system version of sbt otherwise we will attempt to download it
-automatically. To build Spark and its example programs, run:
+Spark is built on Scala 2.10. To build Spark and its example programs, run:

./sbt/sbt assembly

-Once you've built Spark, the easiest way to start using it is the shell:
+## Interactive Scala Shell

+The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

-Or, for the Python API, the Python shell (`./bin/pyspark`).
+Try the following command, which should return 1000:

+scala> sc.parallelize(1 to 1000).count()

+## Interactive Python Shell

+Alternatively, if you prefer Python, you can use the Python shell:

+./bin/pyspark

+And run the following command, which should also return 1000:

+>>> sc.parallelize(range(1000)).count()

## Example Programs

Spark also comes with several sample programs in the `examples` directory.
To run one of them, use `./bin/run-example <class> <params>`. For example:
@@ -38,13 +51,13 @@ All of the Spark samples take a `<master>` parameter that is the cluster URL
to connect to. This can be a mesos:// or spark:// URL, or "local" to run
locally with one thread, or "local[N]" to run locally with N threads.
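
The same `<master>` string can also be passed when constructing a `SparkContext` programmatically. A minimal sketch (illustrative only, not part of this commit; the object and app names are made up):

    import org.apache.spark.SparkContext

    object MasterUrlDemo {
      def main(args: Array[String]) {
        // "local[2]" runs locally with two threads; a cluster would use
        // e.g. "spark://host:7077" or "mesos://host:5050" instead.
        val sc = new SparkContext("local[2]", "MasterUrlDemo")
        println(sc.parallelize(1 to 1000).count())  // prints 1000
        sc.stop()
      }
    }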

-## Running tests
+## Running Tests

-Testing first requires [Building](#building) Spark. Once Spark is built, tests
+Testing first requires [building Spark](#building-spark). Once Spark is built, tests
can be run using:

-`./sbt/sbt test`
+./sbt/sbt test

## A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -89,12 +89,14 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
*/
-class RangePartitioner[K <% Ordered[K]: ClassTag, V](
+class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
private val ascending: Boolean = true)
extends Partitioner {

+private val ordering = implicitly[Ordering[K]]

// An array of upper bounds for the first (partitions - 1) partitions
private val rangeBounds: Array[K] = {
if (partitions == 1) {
@@ -103,7 +105,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
val rddSize = rdd.count()
val maxSampleSize = partitions * 20.0
val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
-val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
+val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
if (rddSample.length == 0) {
Array()
} else {
@@ -126,7 +128,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
var partition = 0
if (rangeBounds.length < 1000) {
// If we have fewer than 1000 partitions, use a naive linear search
-while (partition < rangeBounds.length && k > rangeBounds(partition)) {
+while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
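
The change above swaps the deprecated view bound `K <% Ordered[K]` for a context bound `K : Ordering`, summoning the `Ordering` once and using it explicitly (`ordering.gt`, `.sorted`). A minimal sketch of that pattern outside Spark (illustrative; the class and method names are made up):

    // Mirrors RangePartitioner's small-array path: linear scan for the
    // first upper bound that the key does not exceed.
    class BoundsSearch[K : Ordering](bounds: Array[K]) {
      private val ordering = implicitly[Ordering[K]]

      def partitionFor(key: K): Int = {
        var partition = 0
        while (partition < bounds.length && ordering.gt(key, bounds(partition))) {
          partition += 1
        }
        partition
      }
    }

    // Works for any K with an implicit Ordering, e.g. Int or String:
    // new BoundsSearch(Array(10, 20, 30)).partitionFor(15)  // == 1

(The `ClassTag` bound in the real class is still needed because `RangePartitioner` materializes an `Array[K]` of sampled bounds.)
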
49 changes: 27 additions & 22 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -427,9 +427,9 @@ class SparkContext(config: SparkConf) extends Logging {
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
-def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
+def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-minSplits).map(pair => pair._2.toString)
+minPartitions).map(pair => pair._2.toString)
}
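
A hedged usage sketch of the renamed parameter (hypothetical `spark-shell` session; `minPartitions` is a lower bound, and Hadoop may still create more splits than requested):

    scala> // README.md here is just an example input path
    scala> val lines = sc.textFile("README.md", minPartitions = 4)
    scala> lines.partitions.length  // typically >= 4 for a non-empty file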

/**
@@ -457,9 +457,10 @@ class SparkContext(config: SparkConf) extends Logging {
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*
-* @param minSplits A suggestion value of the minimal splitting number for input data.
+* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
-def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
+RDD[(String, String)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
@@ -469,7 +470,7 @@ class SparkContext(config: SparkConf) extends Logging {
classOf[String],
classOf[String],
updateConf,
-minSplits)
+minPartitions)
}
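
Unlike `textFile`, which yields one record per line, `wholeTextFiles` yields one `(path, content)` pair per file. A sketch (hypothetical session; /tmp/docs is a made-up directory):

    scala> val files = sc.wholeTextFiles("/tmp/docs", minPartitions = 2)
    scala> files.map { case (path, content) => (path, content.length) }.collect()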

/**
@@ -481,7 +482,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
-* @param minSplits Minimum number of Hadoop Splits to generate.
+* @param minPartitions Minimum number of Hadoop Splits to generate.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
@@ -493,11 +494,11 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
-minSplits: Int = defaultMinSplits
+minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
-new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
}
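
The Writable-reuse note above is easy to trip over: caching the returned RDD directly would cache many references to the same mutated objects. A sketch of the copy-before-cache pattern it recommends (hypothetical session; /tmp/input is a made-up path):

    scala> import org.apache.hadoop.io.{LongWritable, Text}
    scala> import org.apache.hadoop.mapred.TextInputFormat
    scala> val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/tmp/input")
    scala> // copy primitive values out of the reused Writables first
    scala> val safe = raw.map { case (offset, line) => (offset.get, line.toString) }
    scala> safe.cache()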

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -512,7 +513,7 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
-minSplits: Int = defaultMinSplits
+minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -524,15 +525,15 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass,
keyClass,
valueClass,
-minSplits)
+minPartitions)
}

/**
* Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
-* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
+* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions)
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
@@ -541,13 +542,13 @@ class SparkContext(config: SparkConf) extends Logging {
* a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
-(path: String, minSplits: Int)
+(path: String, minPartitions: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
-minSplits)
+minPartitions)
}

/**
@@ -565,7 +566,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-hadoopFile[K, V, F](path, defaultMinSplits)
+hadoopFile[K, V, F](path, defaultMinPartitions)

/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
@@ -626,10 +627,10 @@ class SparkContext(config: SparkConf) extends Logging {
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
-minSplits: Int
+minPartitions: Int
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
@@ -641,7 +642,7 @@ class SparkContext(config: SparkConf) extends Logging {
* */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
): RDD[(K, V)] =
-sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+sequenceFile(path, keyClass, valueClass, defaultMinPartitions)

/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -665,7 +666,7 @@ class SparkContext(config: SparkConf) extends Logging {
* a `map` function.
*/
def sequenceFile[K, V]
-(path: String, minSplits: Int = defaultMinSplits)
+(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
@@ -674,7 +675,7 @@
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
-vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
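
With the WritableConverter machinery above, plain Scala types round-trip without naming any Writable classes. A sketch (hypothetical session; /tmp/pairs is a made-up path):

    scala> sc.parallelize(Seq(1 -> "a", 2 -> "b")).saveAsSequenceFile("/tmp/pairs")
    scala> sc.sequenceFile[Int, String]("/tmp/pairs", minPartitions = 2).collect()
    // Array((1,a), (2,b)), in no particular order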

@@ -688,9 +689,9 @@
*/
def objectFile[T: ClassTag](
path: String,
-minSplits: Int = defaultMinSplits
+minPartitions: Int = defaultMinPartitions
): RDD[T] = {
-sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
+sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
}
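
`objectFile` is the read side of `RDD.saveAsObjectFile`, round-tripping through the NullWritable/BytesWritable pairs shown above. A sketch (hypothetical session; /tmp/ints is a made-up path):

    scala> sc.parallelize(1 to 100).saveAsObjectFile("/tmp/ints")
    scala> sc.objectFile[Int]("/tmp/ints", minPartitions = 2).count()  // 100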

@@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging {
def defaultParallelism: Int = taskScheduler.defaultParallelism

/** Default min number of partitions for Hadoop RDDs when not given by user */
+@deprecated("use defaultMinPartitions", "1.0.0")
def defaultMinSplits: Int = math.min(defaultParallelism, 2)

+/** Default min number of partitions for Hadoop RDDs when not given by user */
+def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

private val nextShuffleId = new AtomicInteger(0)

private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
@@ -1268,7 +1273,7 @@ object SparkContext extends Logging {
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)

-implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
+implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)

10 changes: 2 additions & 8 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -626,10 +626,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* order of the keys).
*/
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
-class KeyOrdering(val a: K) extends Ordered[K] {
-override def compare(b: K) = comp.compare(a, b)
-}
-implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
}

@@ -640,10 +637,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* order of the keys).
*/
def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
-class KeyOrdering(val a: K) extends Ordered[K] {
-override def compare(b: K) = comp.compare(a, b)
-}
-implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
}

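
The one-line replacement works because `scala.math.Ordering`'s companion provides `comparatorToOrdering`, which derives an `Ordering[K]` from any `java.util.Comparator[K]` in implicit scope. A standalone sketch of the same trick (illustrative names only):

    import java.util.Comparator

    // Making the Comparator implicit lets Scala derive Ordering[K] on demand
    // via Ordering.comparatorToOrdering.
    def maxWith[K](keys: Seq[K], comp: Comparator[K]): K = {
      implicit val cmp: Comparator[K] = comp
      keys.max(implicitly[Ordering[K]])
    }

    // Scala 2.10-era usage (no Java 8 lambdas):
    val byLength = new Comparator[String] {
      def compare(a: String, b: String) = a.length - b.length
    }
    maxWith(Seq("bb", "a", "cccc"), byLength)  // returns "cccc"
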
[Diffs for the remaining 18 changed files are not shown.]

