Skip to content

Commit

Permalink
[SPARK-1259] Make RDD locally iterable
Browse files Browse the repository at this point in the history
Author: Egor Pakhomov <[email protected]>

Closes apache#156 from epahomov/SPARK-1259 and squashes the following commits:

8ec8f24 [Egor Pakhomov] Make to local iterator shorter
34aa300 [Egor Pakhomov] Fix toLocalIterator docs
08363ef [Egor Pakhomov] SPARK-1259 from toLocallyIterable to toLocalIterator
6a994eb [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
8be3dcf [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
33ecb17 [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
  • Loading branch information
epahomov authored and pwendell committed Apr 6, 2014
1 parent 7012ffa commit e258e50
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 1 deletion.
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.spark.api.java

import java.util.{Comparator, List => JList}
import java.util.{Comparator, Iterator => JIterator, List => JList}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -280,6 +281,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new java.util.ArrayList(arr)
}

/**
 * Expose every element of this RDD through a `java.util.Iterator`.
 *
 * Delegates to the wrapped RDD's `toLocalIterator` and adapts the resulting Scala iterator
 * for Java callers. The iterator will consume as much memory as the largest partition in
 * this RDD.
 */
def toLocalIterator(): JIterator[T] = {
  // Bring in only the one adapter that is needed and apply it explicitly.
  import scala.collection.JavaConversions.asJavaIterator
  val scalaIterator = rdd.toLocalIterator
  asJavaIterator(scalaIterator)
}


/**
* Return an array that contains all of the elements in this RDD.
* @deprecated As of Spark 1.0.0, toArray() is deprecated, use {@link #collect()} instead
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,18 @@ abstract class RDD[T: ClassTag](
Array.concat(results: _*)
}

/**
 * Return an iterator over all of the elements in this RDD.
 *
 * Partitions are visited in index order, and each one is fetched by its own single-partition
 * job that is only submitted once the iterator reaches that partition. The iterator will
 * consume as much memory as the largest partition in this RDD.
 */
def toLocalIterator: Iterator[T] = {
  // Runs a job restricted to one partition and materialises that partition as an array.
  // The job is given exactly one partition index, so the result holds exactly one array.
  val fetchPartition: Int => Array[T] = index =>
    sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(index), allowLocal = false).head

  // Mapping over an Iterator is lazy, so each partition's job runs on demand.
  partitions.indices.iterator.flatMap(index => fetchPartition(index))
}

/**
* Return an array that contains all of the elements in this RDD.
*/
Expand Down
9 changes: 9 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import scala.Tuple2;

import com.google.common.collect.Lists;
import com.google.common.base.Optional;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
Expand Down Expand Up @@ -179,6 +180,14 @@ public void call(String s) {
Assert.assertEquals(2, foreachCalls);
}

/**
 * Checks that draining {@code toLocalIterator()} yields the same elements, in the same
 * order, as the list the RDD was parallelized from.
 */
@Test
public void toLocalIterator() {
  List<Integer> expected = Arrays.asList(1, 2, 3, 4);
  JavaRDD<Integer> rdd = sc.parallelize(expected);
  List<Integer> actual = Lists.newArrayList(rdd.toLocalIterator());
  // assertEquals reports both lists on failure, unlike assertTrue(a.equals(b)).
  Assert.assertEquals(expected, actual);
}

@SuppressWarnings("unchecked")
@Test
public void lookup() {
Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("basic operations") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
assert(dups.distinct().count() === 4)
assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
Expand Down

0 comments on commit e258e50

Please sign in to comment.