From ce8ec5456169682f27f846e7b8d51e6c4bcf75e3 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 8 Apr 2014 18:15:52 -0700 Subject: [PATCH] Spark 1271: Co-Group and Group-By should pass Iterable[X] Author: Holden Karau Closes #242 from holdenk/spark-1320-cogroupandgroupshouldpassiterator and squashes the following commits: f289536 [Holden Karau] Fix bad merge, should have been Iterable rather than Iterator 77048f8 [Holden Karau] Fix merge up to master d3fe909 [Holden Karau] use toSeq instead 7a092a3 [Holden Karau] switch resultitr to resultiterable eb06216 [Holden Karau] maybe I should have had a coffee first. use correct import for guava iterables c5075aa [Holden Karau] If guava 14 had iterables 2d06e10 [Holden Karau] Fix Java 8 cogroup tests for the new API 11e730c [Holden Karau] Fix streaming tests 66b583d [Holden Karau] Fix the core test suite to compile 4ed579b [Holden Karau] Refactor from iterator to iterable d052c07 [Holden Karau] Python tests now pass with iterator pandas 3bcd81d [Holden Karau] Revert "Try and make pickling list iterators work" cd1e81c [Holden Karau] Try and make pickling list iterators work c60233a [Holden Karau] Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well 88a5cef [Holden Karau] Fix cogroup test in JavaAPISuite for streaming a5ee714 [Holden Karau] oops, was checking wrong iterator e687f21 [Holden Karau] Fix groupbykey test in JavaAPISuite of streaming ec8cc3e [Holden Karau] Fix test issues\! 4b0eeb9 [Holden Karau] Switch cast in PairDStreamFunctions fa395c9 [Holden Karau] Revert "Add a join based on the problem in SVD" ec99e32 [Holden Karau] Revert "Revert this but for now put things in list pandas" b692868 [Holden Karau] Revert 7e533f7 [Holden Karau] Fix the bug 8a5153a [Holden Karau] Revert me, but we have some stuff to debug b4e86a9 [Holden Karau] Add a join based on the problem in SVD c4510e2 [Holden Karau] Revert this but for now put things in list pandas b4e0b1d [Holden Karau] Fix style issues 71e8b9f [Holden Karau] I really need to stop calling size on iterators, it is the path of sadness. b1ae51a [Holden Karau] Fix some of the types in the streaming JavaAPI suite. Probably still needs more work 37888ec [Holden Karau] core/tests now pass 249abde [Holden Karau] org.apache.spark.rdd.PairRDDFunctionsSuite passes 6698186 [Holden Karau] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy" fe992fe [Holden Karau] hmmm try and fix up basic operation suite 172705c [Holden Karau] Fix Java API suite caafa63 [Holden Karau] I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy 88b3329 [Holden Karau] Fix groupbykey to actually give back an iterator 4991af6 [Holden Karau] Fix some tests be50246 [Holden Karau] Calling size on an iterator is not so good if we want to use it after 687ffbc [Holden Karau] This is the it compiles point of replacing Seq with Iterator and JList with JIterator in the groupby and cogroup signatures --- .../scala/org/apache/spark/bagel/Bagel.scala | 20 ++++--- .../apache/spark/api/java/JavaPairRDD.scala | 36 ++++++------ .../apache/spark/api/java/JavaRDDLike.scala | 6 +- .../apache/spark/rdd/PairRDDFunctions.scala | 39 +++++++------ .../main/scala/org/apache/spark/rdd/RDD.scala | 6 +- .../java/org/apache/spark/JavaAPISuite.java | 20 ++++--- .../scala/org/apache/spark/FailureSuite.scala | 4 +- .../org/apache/spark/PipedRDDSuite.scala | 2 +- .../spark/rdd/PairRDDFunctionsSuite.scala | 12 ++-- .../ExternalAppendOnlyMapSuite.scala | 4 +- .../apache/spark/examples/JavaPageRank.java | 21 ++++--- .../bagel/WikipediaPageRankStandalone.scala | 14 +++-- .../java/org/apache/spark/Java8APISuite.java | 11 ++-- .../org/apache/spark/mllib/linalg/SVD.scala | 6 +- .../spark/mllib/recommendation/ALS.scala | 4 +- .../org/apache/spark/mllib/util/LAUtils.scala | 6 +- python/pyspark/join.py | 5 +- python/pyspark/rdd.py | 10 ++-- python/pyspark/resultiterable.py | 33 +++++++++++ .../streaming/api/java/JavaPairDStream.scala | 42 +++++++------- .../dstream/PairDStreamFunctions.scala | 29 +++++----- .../streaming/dstream/StateDStream.scala | 13 +++-- .../apache/spark/streaming/JavaAPISuite.java | 58 ++++++++++++++++--- .../streaming/BasicOperationsSuite.scala | 4 +- 24 files changed, 252 insertions(+), 153 deletions(-) create mode 100644 python/pyspark/resultiterable.py diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index 70c7474a936dc..70a99b33d753c 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -220,20 +220,23 @@ object Bagel extends Logging { */ private def comp[K: Manifest, V <: Vertex, M <: Message[K], C]( sc: SparkContext, - grouped: RDD[(K, (Seq[C], Seq[V]))], + grouped: RDD[(K, (Iterable[C], Iterable[V]))], compute: (V, Option[C]) => (V, Array[M]), storageLevel: StorageLevel ): (RDD[(K, (V, Array[M]))], Int, Int) = { var numMsgs = sc.accumulator(0) var numActiveVerts = sc.accumulator(0) - val processed = grouped.flatMapValues { - case (_, vs) if vs.size == 0 => None - case (c, vs) => + val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator)) + .flatMapValues { + case (_, vs) if !vs.hasNext => None + case (c, vs) => { val (newVert, newMsgs) = - compute(vs(0), c match { - case Seq(comb) => Some(comb) - case Seq() => None - }) + compute(vs.next, + c.hasNext match { + case true => Some(c.next) + case false => None + } + ) numMsgs += newMsgs.size if (newVert.active) { @@ -241,6 +244,7 @@ object Bagel extends Logging { } Some((newVert, newMsgs)) + } }.persist(storageLevel) // Force evaluation of processed RDD for accurate performance measurements diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 9596dbaf75488..e6c5d85917678 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -18,6 +18,7 @@ package org.apache.spark.api.java import java.util.{Comparator, List => JList} +import java.lang.{Iterable => JIterable} import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -250,14 +251,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. */ - def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] = + def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] = fromRDD(groupByResultToJava(rdd.groupByKey(partitioner))) /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with into `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] = + def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] = fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions))) /** @@ -367,7 +368,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with the existing partitioner/parallelism level. */ - def groupByKey(): JavaPairRDD[K, JList[V]] = + def groupByKey(): JavaPairRDD[K, JIterable[V]] = fromRDD(groupByResultToJava(rdd.groupByKey())) /** @@ -462,7 +463,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner))) /** @@ -470,14 +471,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], - partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner))) /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other))) /** @@ -485,7 +486,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2))) /** @@ -493,7 +494,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) /** @@ -501,16 +502,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions))) /** Alias for cogroup. */ - def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.groupWith(other))) /** Alias for cogroup. */ def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2))) /** @@ -695,21 +696,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) object JavaPairRDD { private[spark] - def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = { - rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList) + def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = { + rddToPairRDDFunctions(rdd).mapValues(asJavaIterable) } private[spark] def cogroupResultToJava[K: ClassTag, V, W]( - rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = { - rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2))) + rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = { + rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2))) } private[spark] def cogroupResult2ToJava[K: ClassTag, V, W1, W2]( - rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = { + rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]) + : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = { rddToPairRDDFunctions(rdd) - .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3))) + .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3))) } def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 6e8ec8e0c7629..ae577b500ccb4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.java -import java.util.{Comparator, Iterator => JIterator, List => JList} +import java.util.{Comparator, List => JList, Iterator => JIterator} import java.lang.{Iterable => JIterable} import scala.collection.JavaConversions._ @@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = { + def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag))) @@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = { + def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K]))) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 14386ff5b9127..a92a84b5342d1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. */ - def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. @@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2 val bufs = combineByKey[ArrayBuffer[V]]( createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false) - bufs.asInstanceOf[RDD[(K, Seq[V])]] + bufs.mapValues(_.toIterable) } /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with into `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = { + def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = { groupByKey(new HashPartitioner(numPartitions)) } @@ -298,7 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => - for (v <- vs.iterator; w <- ws.iterator) yield (v, w) + for (v <- vs; w <- ws) yield (v, w) } } @@ -311,9 +311,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (ws.isEmpty) { - vs.iterator.map(v => (v, None)) + vs.map(v => (v, None)) } else { - for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w)) + for (v <- vs; w <- ws) yield (v, Some(w)) } } } @@ -328,9 +328,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (vs.isEmpty) { - ws.iterator.map(w => (None, w)) + ws.map(w => (None, w)) } else { - for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w) + for (v <- vs; w <- ws) yield (Some(v), w) } } } @@ -358,7 +358,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with the existing partitioner/parallelism level. */ - def groupByKey(): RDD[(K, Seq[V])] = { + def groupByKey(): RDD[(K, Iterable[V])] = { groupByKey(defaultPartitioner(self)) } @@ -453,7 +453,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) + : RDD[(K, (Iterable[V], Iterable[W]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -468,13 +469,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) cg.mapValues { case Seq(vs, w1s, w2s) => - (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) + (vs.asInstanceOf[Seq[V]], + w1s.asInstanceOf[Seq[W1]], + w2s.asInstanceOf[Seq[W2]]) } } @@ -482,7 +485,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(self, other)) } @@ -491,7 +494,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -499,7 +502,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, new HashPartitioner(numPartitions)) } @@ -508,18 +511,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, new HashPartitioner(numPartitions)) } /** Alias for cogroup. */ - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. */ def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index bf3c57ad41eb2..74fa2a4fcd401 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD of grouped items. */ - def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] = + def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] = groupBy[K](f, defaultPartitioner(this)) /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] = + def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] = groupBy(f, new HashPartitioner(numPartitions)) /** * Return an RDD of grouped items. */ - def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = { + def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = { val cleanF = sc.clean(f) this.map(t => (cleanF(t), t)).groupByKey(p) } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 762405be2a8f9..ab2fdac553349 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -18,10 +18,12 @@ package org.apache.spark; import java.io.*; +import java.lang.StringBuilder; import java.util.*; import scala.Tuple2; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.base.Optional; import com.google.common.base.Charsets; @@ -197,7 +199,7 @@ public void lookup() { new Tuple2("Oranges", "Citrus") )); Assert.assertEquals(2, categories.lookup("Oranges").size()); - Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size()); + Assert.assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0))); } @Test @@ -209,15 +211,15 @@ public Boolean call(Integer x) { return x % 2 == 0; } }; - JavaPairRDD> oddsAndEvens = rdd.groupBy(isOdd); + JavaPairRDD> oddsAndEvens = rdd.groupBy(isOdd); Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens - Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds oddsAndEvens = rdd.groupBy(isOdd, 1); Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens - Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds } @SuppressWarnings("unchecked") @@ -232,9 +234,9 @@ public void cogroup() { new Tuple2("Oranges", 2), new Tuple2("Apples", 3) )); - JavaPairRDD, List>> cogrouped = categories.cogroup(prices); - Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString()); - Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString()); + JavaPairRDD, Iterable>> cogrouped = categories.cogroup(prices); + Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); + Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); cogrouped.collect(); } diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index f3fb64d87a2fd..12dbebcb28644 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -72,7 +72,7 @@ class FailureSuite extends FunSuite with LocalSparkContext { throw new Exception("Intentional task failure") } } - (k, v(0) * v(0)) + (k, v.head * v.head) }.collect() FailureSuiteState.synchronized { assert(FailureSuiteState.tasksRun === 4) @@ -137,5 +137,3 @@ class FailureSuite extends FunSuite with LocalSparkContext { // TODO: Need to add tests with shuffle fetch failures. } - - diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala index 627e9b5cd9060..867b28cc0d971 100644 --- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala @@ -85,7 +85,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext { (f: String => Unit) => { bl.value.map(f(_)); f("\u0001") }, - (i: Tuple2[String, Seq[String]], f: String => Unit) => { + (i: Tuple2[String, Iterable[String]], f: String => Unit) => { for (e <- i._2) { f(e + "_") } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index f9e994b13dfbc..8f3e6bd21b752 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -225,11 +225,12 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.groupWith(rdd2).collect() assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), - (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), - (3, (ArrayBuffer(1), ArrayBuffer())), - (4, (ArrayBuffer(), ArrayBuffer('w'))) + val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet + assert(joinedSet === Set( + (1, (List(1, 2), List('x'))), + (2, (List(1), List('y', 'z'))), + (3, (List(1), List())), + (4, (List(), List('w'))) )) } @@ -447,4 +448,3 @@ class ConfigTestFormat() extends FakeFormat() with Configurable { super.getRecordWriter(p1) } } - diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index fce1184d46364..cdebefb67510c 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -174,9 +174,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5))) // groupByKey - val result2 = rdd.groupByKey().collect() + val result2 = rdd.groupByKey().collect().map(x => (x._1, x._2.toList)).toSet assert(result2.toSet == Set[(Int, Seq[Int])] - ((0, ArrayBuffer[Int](1, 1, 1, 1, 1)), (1, ArrayBuffer[Int](1, 1, 1, 1, 1)))) + ((0, List[Int](1, 1, 1, 1, 1)), (1, List[Int](1, 1, 1, 1, 1)))) } test("simple cogroup") { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index eb70fb547564c..8513ba07e7705 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -17,7 +17,10 @@ package org.apache.spark.examples; + import scala.Tuple2; + +import com.google.common.collect.Iterables; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -26,8 +29,9 @@ import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; -import java.util.List; import java.util.ArrayList; +import java.util.List; +import java.util.Iterator; import java.util.regex.Pattern; /** @@ -66,7 +70,7 @@ public static void main(String[] args) throws Exception { JavaRDD lines = ctx.textFile(args[1], 1); // Loads all URLs from input file and initialize their neighbors. - JavaPairRDD> links = lines.mapToPair(new PairFunction() { + JavaPairRDD> links = lines.mapToPair(new PairFunction() { @Override public Tuple2 call(String s) { String[] parts = SPACES.split(s); @@ -75,9 +79,9 @@ public Tuple2 call(String s) { }).distinct().groupByKey().cache(); // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. - JavaPairRDD ranks = links.mapValues(new Function, Double>() { + JavaPairRDD ranks = links.mapValues(new Function, Double>() { @Override - public Double call(List rs) { + public Double call(Iterable rs) { return 1.0; } }); @@ -86,12 +90,13 @@ public Double call(List rs) { for (int current = 0; current < Integer.parseInt(args[2]); current++) { // Calculates URL contributions to the rank of other URLs. JavaPairRDD contribs = links.join(ranks).values() - .flatMapToPair(new PairFlatMapFunction, Double>, String, Double>() { + .flatMapToPair(new PairFlatMapFunction, Double>, String, Double>() { @Override - public Iterable> call(Tuple2, Double> s) { + public Iterable> call(Tuple2, Double> s) { + int urlCount = Iterables.size(s._1); List> results = new ArrayList>(); - for (String n : s._1()) { - results.add(new Tuple2(n, s._2() / s._1().size())); + for (String n : s._1) { + results.add(new Tuple2(n, s._2() / urlCount)); } return results; } diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala index 27afa6b642758..7aac6a13597e6 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala @@ -115,12 +115,16 @@ object WikipediaPageRankStandalone { var ranks = links.mapValues { edges => defaultRank } for (i <- 1 to numIterations) { val contribs = links.groupWith(ranks).flatMap { - case (id, (linksWrapper, rankWrapper)) => - if (linksWrapper.length > 0) { - if (rankWrapper.length > 0) { - linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size)) + case (id, (linksWrapperIterable, rankWrapperIterable)) => + val linksWrapper = linksWrapperIterable.iterator + val rankWrapper = rankWrapperIterable.iterator + if (linksWrapper.hasNext) { + val linksWrapperHead = linksWrapper.next + if (rankWrapper.hasNext) { + val rankWrapperHead = rankWrapper.next + linksWrapperHead.map(dest => (dest, rankWrapperHead / linksWrapperHead.size)) } else { - linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size)) + linksWrapperHead.map(dest => (dest, defaultRank / linksWrapperHead.size)) } } else { Array[(String, Double)]() diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index f67251217ed4a..7eb8b45fc3cf0 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -23,6 +23,7 @@ import scala.Tuple2; +import com.google.common.collections.Iterables; import com.google.common.base.Optional; import com.google.common.io.Files; import org.apache.hadoop.io.IntWritable; @@ -85,15 +86,15 @@ public void foreach() { public void groupBy() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); Function isOdd = x -> x % 2 == 0; - JavaPairRDD> oddsAndEvens = rdd.groupBy(isOdd); + JavaPairRDD> oddsAndEvens = rdd.groupBy(isOdd); Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens - Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds oddsAndEvens = rdd.groupBy(isOdd, 1); Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens - Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds } @Test diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 3e7cc648d1d37..0d97b7d92f155 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -69,11 +69,11 @@ class SVD { /** * Compute SVD using the current set parameters - * Returns (U, S, V) such that A = USV^T + * Returns (U, S, V) such that A = USV^T * U is a row-by-row dense matrix * S is a simple double array of singular values * V is a 2d array matrix - * See [[denseSVD]] for more documentation + * See [[denseSVD]] for more documentation */ def compute(matrix: RDD[Array[Double]]): (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = { @@ -393,5 +393,3 @@ object SVD { System.exit(0) } } - - diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 0cc9f48769f83..3124fac326d22 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -421,12 +421,12 @@ class ALS private ( * Compute the new feature vectors for a block of the users matrix given the list of factors * it received from each product and its InLinkBlock. */ - private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, + private def updateBlock(messages: Iterable[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]]) : Array[Array[Double]] = { // Sort the incoming block factor messages by block ID and make them an array - val blockFactors = messages.sortBy(_._1).map(_._2).toArray // Array[Array[Double]] + val blockFactors = messages.toSeq.sortBy(_._1).map(_._2).toArray // Array[Array[Double]] val numBlocks = blockFactors.length val numUsers = inLinkBlock.elementIds.length diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala index afe081295bfae..87aac347579c7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala @@ -38,8 +38,10 @@ object LAUtils { case (i, cols) => val rowArray = Array.ofDim[Double](n) var j = 0 - while (j < cols.size) { - rowArray(cols(j)._1) = cols(j)._2 + val colsItr = cols.iterator + while (colsItr.hasNext) { + val element = colsItr.next + rowArray(element._1) = element._2 j += 1 } MatrixRow(i, rowArray) diff --git a/python/pyspark/join.py b/python/pyspark/join.py index 5f4294fb1b777..6f94d26ef86a9 100644 --- a/python/pyspark/join.py +++ b/python/pyspark/join.py @@ -31,11 +31,12 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ +from pyspark.resultiterable import ResultIterable def _do_python_join(rdd, other, numPartitions, dispatch): vs = rdd.map(lambda (k, v): (k, (1, v))) ws = other.map(lambda (k, v): (k, (2, v))) - return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch) + return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__())) def python_join(rdd, other, numPartitions): @@ -88,5 +89,5 @@ def dispatch(seq): vbuf.append(v) elif n == 2: wbuf.append(v) - return (vbuf, wbuf) + return (ResultIterable(vbuf), ResultIterable(wbuf)) return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index fb27863e07f55..91fc7e637e2c6 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -38,6 +38,7 @@ from pyspark.statcounter import StatCounter from pyspark.rddsampler import RDDSampler from pyspark.storagelevel import StorageLevel +from pyspark.resultiterable import ResultIterable from py4j.java_collections import ListConverter, MapConverter @@ -1118,7 +1119,7 @@ def groupByKey(self, numPartitions=None): Hash-partitions the resulting RDD with into numPartitions partitions. >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) - >>> sorted(x.groupByKey().collect()) + >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect())) [('a', [1, 1]), ('b', [1])] """ @@ -1133,7 +1134,7 @@ def mergeCombiners(a, b): return a + b return self.combineByKey(createCombiner, mergeValue, mergeCombiners, - numPartitions) + numPartitions).mapValues(lambda x: ResultIterable(x)) # TODO: add tests def flatMapValues(self, f): @@ -1180,7 +1181,7 @@ def cogroup(self, other, numPartitions=None): >>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2)]) - >>> sorted(x.cogroup(y).collect()) + >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) [('a', ([1], [2])), ('b', ([4], []))] """ return python_cogroup(self, other, numPartitions) @@ -1217,7 +1218,7 @@ def keyBy(self, f): >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) >>> y = sc.parallelize(zip(range(0,5), range(0,5))) - >>> sorted(x.cogroup(y).collect()) + >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect())) [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] """ return self.map(lambda x: (f(x), x)) @@ -1317,7 +1318,6 @@ def getStorageLevel(self): # keys in the pairs. This could be an expensive operation, since those # hashes aren't retained. - class PipelinedRDD(RDD): """ Pipelined maps: diff --git a/python/pyspark/resultiterable.py b/python/pyspark/resultiterable.py new file mode 100644 index 0000000000000..7f418f8d2e29a --- /dev/null +++ b/python/pyspark/resultiterable.py @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +__all__ = ["ResultIterable"] + +import collections + +class ResultIterable(collections.Iterable): + """ + A special result iterable. This is used because the standard iterator can not be pickled + """ + def __init__(self, data): + self.data = data + self.index = 0 + self.maxindex = len(data) + def __iter__(self): + return iter(self.data) + def __len__(self): + return len(self.data) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index ac451d1913aaa..2ac943d7bf781 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.api.java -import java.lang.{Long => JLong} +import java.lang.{Long => JLong, Iterable => JIterable} import java.util.{List => JList} import scala.collection.JavaConversions._ @@ -115,15 +115,15 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. */ - def groupByKey(): JavaPairDStream[K, JList[V]] = - dstream.groupByKey().mapValues(seqAsJavaList _) + def groupByKey(): JavaPairDStream[K, JIterable[V]] = + dstream.groupByKey().mapValues(asJavaIterable _) /** * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = - dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _) + def groupByKey(numPartitions: Int): JavaPairDStream[K, JIterable[V]] = + dstream.groupByKey(numPartitions).mapValues(asJavaIterable _) /** * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream. @@ -131,8 +131,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner * is used to control the partitioning of each RDD. */ - def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = - dstream.groupByKey(partitioner).mapValues(seqAsJavaList _) + def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JIterable[V]] = + dstream.groupByKey(partitioner).mapValues(asJavaIterable _) /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are @@ -196,8 +196,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ - def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _) + def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JIterable[V]] = { + dstream.groupByKeyAndWindow(windowDuration).mapValues(asJavaIterable _) } /** @@ -211,8 +211,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * DStream's batching interval */ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) - : JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _) + : JavaPairDStream[K, JIterable[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(asJavaIterable _) } /** @@ -227,9 +227,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param numPartitions Number of partitions of each RDD in the new DStream. */ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) - :JavaPairDStream[K, JList[V]] = { + :JavaPairDStream[K, JIterable[V]] = { dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions) - .mapValues(seqAsJavaList _) + .mapValues(asJavaIterable _) } /** @@ -247,9 +247,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ):JavaPairDStream[K, JList[V]] = { + ):JavaPairDStream[K, JIterable[V]] = { dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner) - .mapValues(seqAsJavaList _) + .mapValues(asJavaIterable _) } /** @@ -518,9 +518,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ - def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { + def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JIterable[V], JIterable[W])] = { implicit val cm: ClassTag[W] = fakeClassTag - dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + dstream.cogroup(other.dstream).mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2)))) } /** @@ -530,10 +530,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( def cogroup[W]( other: JavaPairDStream[K, W], numPartitions: Int - ): JavaPairDStream[K, (JList[V], JList[W])] = { + ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = { implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream, numPartitions) - .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2)))) } /** @@ -543,10 +543,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( def cogroup[W]( other: JavaPairDStream[K, W], partitioner: Partitioner - ): JavaPairDStream[K, (JList[V], JList[W])] = { + ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = { implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream, partitioner) - .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2)))) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 2473496949360..354bc132dcdc0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -51,7 +51,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. */ - def groupByKey(): DStream[(K, Seq[V])] = { + def groupByKey(): DStream[(K, Iterable[V])] = { groupByKey(defaultPartitioner()) } @@ -59,7 +59,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { + def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = { groupByKey(defaultPartitioner(numPartitions)) } @@ -67,12 +67,12 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` on each RDD. The supplied * org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ - def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { + def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = { val createCombiner = (v: V) => ArrayBuffer[V](v) val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Seq[V])]] + .asInstanceOf[DStream[(K, Iterable[V])]] } /** @@ -126,7 +126,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ - def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = { + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = { groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) } @@ -140,7 +140,8 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ - def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) + : DStream[(K, Iterable[V])] = { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } @@ -161,7 +162,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, numPartitions: Int - ): DStream[(K, Seq[V])] = { + ): DStream[(K, Iterable[V])] = { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) } @@ -180,14 +181,14 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ): DStream[(K, Seq[V])] = { - val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v - val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v + ): DStream[(K, Iterable[V])] = { + val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v + val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 self.groupByKey(partitioner) .window(windowDuration, slideDuration) .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Seq[V])]] + .asInstanceOf[DStream[(K, Iterable[V])]] } /** @@ -438,7 +439,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ - def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner()) } @@ -447,7 +448,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int) - : DStream[(K, (Seq[V], Seq[W]))] = { + : DStream[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(numPartitions)) } @@ -458,7 +459,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) def cogroup[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Seq[V], Seq[W]))] = { + ): DStream[(K, (Iterable[V], Iterable[W]))] = { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index 5f7d3ba26c656..7e22268767de7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -56,9 +56,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( // first map the cogrouped tuple to tuples of required type, // and then apply the update function val updateFuncLocal = updateFunc - val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { + val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => { val i = iterator.map(t => { - (t._1, t._2._1, t._2._2.headOption) + val itr = t._2._2.iterator + val headOption = itr.hasNext match { + case true => Some(itr.next()) + case false => None + } + (t._1, t._2._1.toSeq, headOption) }) updateFuncLocal(i) } @@ -90,8 +95,8 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( // first map the grouped tuple to tuples of required type, // and then apply the update function val updateFuncLocal = updateFunc - val finalFunc = (iterator: Iterator[(K, Seq[V])]) => { - updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) + val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => { + updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None))) } val groupedRDD = parentRDD.groupByKey(partitioner) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index e93bf18b6d0b9..13fa64894b773 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -23,6 +23,7 @@ import org.junit.Test; import java.io.*; import java.util.*; +import java.lang.Iterable; import com.google.common.base.Optional; import com.google.common.collect.Lists; @@ -45,6 +46,18 @@ // see http://stackoverflow.com/questions/758570/. public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { + public void equalIterator(Iterator a, Iterator b) { + while (a.hasNext() && b.hasNext()) { + Assert.assertEquals(a.next(), b.next()); + } + Assert.assertEquals(a.hasNext(), b.hasNext()); + } + + public void equalIterable(Iterable a, Iterable b) { + equalIterator(a.iterator(), b.iterator()); + } + + @SuppressWarnings("unchecked") @Test public void testCount() { @@ -1016,11 +1029,24 @@ public void testPairGroupByKey() { JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream> grouped = pairStream.groupByKey(); + JavaPairDStream> grouped = pairStream.groupByKey(); JavaTestUtils.attachTestOutputStream(grouped); - List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); + List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected.size(), result.size()); + Iterator>>> resultItr = result.iterator(); + Iterator>>> expectedItr = expected.iterator(); + while (resultItr.hasNext() && expectedItr.hasNext()) { + Iterator>> resultElements = resultItr.next().iterator(); + Iterator>> expectedElements = expectedItr.next().iterator(); + while (resultElements.hasNext() && expectedElements.hasNext()) { + Tuple2> resultElement = resultElements.next(); + Tuple2> expectedElement = expectedElements.next(); + Assert.assertEquals(expectedElement._1(), resultElement._1()); + equalIterable(expectedElement._2(), resultElement._2()); + } + Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext()); + } } @SuppressWarnings("unchecked") @@ -1128,7 +1154,7 @@ public void testGroupByKeyAndWindow() { JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream> groupWindowed = + JavaPairDStream> groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); List>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1471,11 +1497,25 @@ public void testCoGroup() { ssc, stringStringKVStream2, 1); JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - JavaPairDStream, List>> grouped = pairStream1.cogroup(pairStream2); + JavaPairDStream, Iterable>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List, List>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); + List, Iterable>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected.size(), result.size()); + Iterator, Iterable>>>> resultItr = result.iterator(); + Iterator, List>>>> expectedItr = expected.iterator(); + while (resultItr.hasNext() && expectedItr.hasNext()) { + Iterator, Iterable>>> resultElements = resultItr.next().iterator(); + Iterator, List>>> expectedElements = expectedItr.next().iterator(); + while (resultElements.hasNext() && expectedElements.hasNext()) { + Tuple2, Iterable>> resultElement = resultElements.next(); + Tuple2, List>> expectedElement = expectedElements.next(); + Assert.assertEquals(expectedElement._1(), resultElement._1()); + equalIterable(expectedElement._2()._1(), resultElement._2()._1()); + equalIterable(expectedElement._2()._2(), resultElement._2()._2()); + } + Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext()); + } } @SuppressWarnings("unchecked") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index bb73dbf29b649..8aec27e39478a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -117,7 +117,7 @@ class BasicOperationsSuite extends TestSuiteBase { test("groupByKey") { testOperation( Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), - (s: DStream[String]) => s.map(x => (x, 1)).groupByKey(), + (s: DStream[String]) => s.map(x => (x, 1)).groupByKey().mapValues(_.toSeq), Seq( Seq(("a", Seq(1, 1)), ("b", Seq(1))), Seq(("", Seq(1, 1))), Seq() ), true ) @@ -251,7 +251,7 @@ class BasicOperationsSuite extends TestSuiteBase { Seq( ) ) val operation = (s1: DStream[String], s2: DStream[String]) => { - s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))) + s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq)) } testOperation(inputData1, inputData2, operation, outputData, true) }