From 33ecb17fb8d3189fd5c0461690b08ac593936f18 Mon Sep 17 00:00:00 2001
From: Egor Pakhomov
Date: Sun, 16 Mar 2014 16:10:53 +0400
Subject: [PATCH] SPARK-1259 Make RDD locally iterable

---
 .../main/scala/org/apache/spark/rdd/RDD.scala | 16 ++++++++++++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala |  1 +
 2 files changed, 17 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3fe56963e0008..1aa494f6a4125 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -663,6 +663,22 @@ abstract class RDD[T: ClassTag](
     Array.concat(results: _*)
   }
 
+  /**
+   * Return a Stream that contains all of the elements in this RDD.
+   *
+   * While iterating, this consumes as much memory as the largest partition in this RDD.
+   */
+  def toStream(): Stream[T] = {
+    def collectPartition(p: Int): Array[T] = sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+    var buffer = Stream.empty[T]
+    for (p <- 0 until this.partitions.length) {
+      buffer = buffer #::: {
+        collectPartition(p).toStream
+      }
+    }
+    buffer
+  }
+
   /**
    * Return an array that contains all of the elements in this RDD.
   */
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 60bcada55245b..ef88f2bc467e3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -33,6 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
+    assert(nums.toStream().toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
     assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?