Commit 687ffbc
This is the "it compiles" point of replacing Seq with Iterator and JList with JIterator in the groupBy and cogroup signatures.
holdenk committed Apr 8, 2014
1 parent 6dc5f58 commit 687ffbc
Showing 12 changed files with 116 additions and 102 deletions.
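In caller-facing terms, the change in this commit: groupByKey, groupBy, cogroup and groupWith now hand back single-pass Iterator / java.util.Iterator values instead of materialized Seq / java.util.List collections. A minimal sketch of the intended effect (not from the commit; assumes an existing SparkContext named sc and the usual SparkContext._ implicits):

    import org.apache.spark.SparkContext._

    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    // Before this commit: RDD[(String, Seq[Int])], traversable repeatedly.
    // After it: RDD[(String, Iterator[Int])], each group consumable exactly once.
    val grouped = pairs.groupByKey()
    grouped.mapValues(vs => vs.sum).collect()  // e.g. Array((a, 3), (b, 3))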
12 changes: 7 additions & 5 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -220,7 +220,7 @@ object Bagel extends Logging {
    */
   private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
       sc: SparkContext,
-      grouped: RDD[(K, (Seq[C], Seq[V]))],
+      grouped: RDD[(K, (Iterator[C], Iterator[V]))],
       compute: (V, Option[C]) => (V, Array[M]),
       storageLevel: StorageLevel
   ): (RDD[(K, (V, Array[M]))], Int, Int) = {
@@ -230,10 +230,12 @@ object Bagel extends Logging {
       case (_, vs) if vs.size == 0 => None
       case (c, vs) =>
         val (newVert, newMsgs) =
-          compute(vs(0), c match {
-            case Seq(comb) => Some(comb)
-            case Seq() => None
-          })
+          compute(vs.next,
+            c.size match {
+              case 1 => Some(c.next)
+              case _ => None
+            }
+          )
 
         numMsgs += newMsgs.size
         if (newVert.active) {
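One subtlety in the new Bagel code above: unlike Seq, a Scala Iterator is consumed by queries such as size, so calling c.size and then c.next touches an already-exhausted iterator. A standalone illustration of the pitfall in plain Scala (not Spark code):

    val c = Iterator("combiner")
    val n = c.size   // 1, but computing the size traverses and empties the iterator
    // c.next()      // would now throw NoSuchElementException

    // A single-pass way to distinguish the zero- and one-element cases:
    val c2 = Iterator("combiner")
    val first = if (c2.hasNext) Some(c2.next()) else None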
36 changes: 18 additions & 18 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, List => JList}
+import java.util.{Comparator, List => JList, Iterator => JIterator}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -250,14 +250,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
    */
-  def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
+  def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterator[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD into `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
+  def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterator[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
 
   /**
@@ -367,7 +367,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with the existing partitioner/parallelism level.
    */
-  def groupByKey(): JavaPairRDD[K, JList[V]] =
+  def groupByKey(): JavaPairRDD[K, JIterator[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey()))
 
   /**
@@ -462,55 +462,55 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * list of values for that key in `this` as well as `other`.
    */
   def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-  : JavaPairRDD[K, (JList[V], JList[W])] =
+  : JavaPairRDD[K, (JIterator[V], JIterator[W])] =
     fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
 
   /**
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
-      partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+      partitioner: Partitioner): JavaPairRDD[K, (JIterator[V], JIterator[W1], JIterator[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterator[V], JIterator[W])] =
     fromRDD(cogroupResultToJava(rdd.cogroup(other)))
 
   /**
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  : JavaPairRDD[K, (JIterator[V], JIterator[W1], JIterator[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
   def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
-  : JavaPairRDD[K, (JList[V], JList[W])] =
+  : JavaPairRDD[K, (JIterator[V], JIterator[W])] =
     fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
 
   /**
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  : JavaPairRDD[K, (JIterator[V], JIterator[W1], JIterator[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
 
   /** Alias for cogroup. */
-  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterator[V], JIterator[W])] =
     fromRDD(cogroupResultToJava(rdd.groupWith(other)))
 
   /** Alias for cogroup. */
   def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  : JavaPairRDD[K, (JIterator[V], JIterator[W1], JIterator[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))

   /**
@@ -695,21 +695,21 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

 object JavaPairRDD {
   private[spark]
-  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = {
-    rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList)
+  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterator[T])]): RDD[(K, JIterator[T])] = {
+    rddToPairRDDFunctions(rdd).mapValues(asJavaIterator)
   }
 
   private[spark]
   def cogroupResultToJava[K: ClassTag, V, W](
-      rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = {
-    rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
+      rdd: RDD[(K, (Iterator[V], Iterator[W]))]): RDD[(K, (JIterator[V], JIterator[W]))] = {
+    rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterator(x._1), asJavaIterator(x._2)))
   }
 
   private[spark]
   def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
-      rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = {
+      rdd: RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))]): RDD[(K, (JIterator[V], JIterator[W1], JIterator[W2]))] = {
     rddToPairRDDFunctions(rdd)
-      .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3)))
+      .mapValues(x => (asJavaIterator(x._1), asJavaIterator(x._2), asJavaIterator(x._3)))
   }
 
   def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
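The three converters above rely on asJavaIterator from scala.collection.JavaConversions (already imported wildcard-style at the top of this file) to expose a Scala Iterator to Java callers. A minimal sketch of that conversion in isolation:

    import java.util.{Iterator => JIterator}
    import scala.collection.JavaConversions.asJavaIterator

    val scalaIt: Iterator[Int] = Iterator(1, 2, 3)
    val javaIt: JIterator[Int] = asJavaIterator(scalaIt)  // a wrapper, not a copy
    while (javaIt.hasNext) {
      println(javaIt.next())  // advancing the Java view advances the Scala iterator
    }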
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
+  def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterator[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
     JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
+  def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterator[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
     JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
38 changes: 19 additions & 19 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
    */
-  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+  def groupByKey(partitioner: Partitioner): RDD[(K, Iterator[V])] = {
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
@@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
     val bufs = combineByKey[ArrayBuffer[V]](
       createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
-    bufs.asInstanceOf[RDD[(K, Seq[V])]]
+    bufs.asInstanceOf[RDD[(K, Iterator[V])]]
   }
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD into `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
+  def groupByKey(numPartitions: Int): RDD[(K, Iterator[V])] = {
     groupByKey(new HashPartitioner(numPartitions))
   }

@@ -298,7 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+      for (v <- vs; w <- ws) yield (v, w)
     }
   }

@@ -311,9 +311,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
       if (ws.isEmpty) {
-        vs.iterator.map(v => (v, None))
+        vs.map(v => (v, None))
       } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+        for (v <- vs; w <- ws) yield (v, Some(w))
       }
     }
   }
@@ -328,9 +328,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       : RDD[(K, (Option[V], W))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
       if (vs.isEmpty) {
-        ws.iterator.map(w => (None, w))
+        ws.map(w => (None, w))
       } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+        for (v <- vs; w <- ws) yield (Some(v), w)
       }
     }
   }
@@ -358,7 +358,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with the existing partitioner/parallelism level.
    */
-  def groupByKey(): RDD[(K, Seq[V])] = {
+  def groupByKey(): RDD[(K, Iterator[V])] = {
     groupByKey(defaultPartitioner(self))
   }

@@ -453,13 +453,13 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterator[V], Iterator[W]))] = {
     if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
     cg.mapValues { case Seq(vs, ws) =>
-      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+      (vs.asInstanceOf[Seq[V]].iterator, ws.asInstanceOf[Seq[W]].iterator)
     }
   }

@@ -468,21 +468,21 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))] = {
     if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+      (vs.asInstanceOf[Seq[V]].iterator, w1s.asInstanceOf[Seq[W1]].iterator, w2s.asInstanceOf[Seq[W2]].iterator)
     }
   }
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterator[V], Iterator[W]))] = {
     cogroup(other, defaultPartitioner(self, other))
   }

@@ -491,15 +491,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))] = {
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterator[V], Iterator[W]))] = {
     cogroup(other, new HashPartitioner(numPartitions))
   }

@@ -508,18 +508,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))] = {
     cogroup(other1, other2, new HashPartitioner(numPartitions))
   }
 
   /** Alias for cogroup. */
-  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterator[V], Iterator[W]))] = {
     cogroup(other, defaultPartitioner(self, other))
   }
 
   /** Alias for cogroup. */
   def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))] = {
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }

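A subtlety the rewritten join/leftOuterJoin/rightOuterJoin bodies above inherit: nesting two Iterators in a for-comprehension exhausts the inner one during the first outer pass, unlike the old Seq-based code. A standalone sketch in plain Scala (not Spark code):

    val vs = Iterator(1, 2)
    val ws = Iterator("a", "b")
    // Desugars to vs.flatMap(v => ws.map(w => (v, w))); ws is spent after v = 1.
    val partial = (for (v <- vs; w <- ws) yield (v, w)).toList
    // partial == List((1,"a"), (1,"b"))  -- nothing is paired with 2

    val vs2 = Iterator(1, 2)
    val ws2 = List("a", "b")  // materialized, so it can be re-traversed per outer element
    val full = (for (v <- vs2; w <- ws2) yield (v, w)).toList
    // full == List((1,"a"), (1,"b"), (2,"a"), (2,"b"))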
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an RDD of grouped items.
    */
-  def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] =
+  def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterator[T])] =
     groupBy[K](f, defaultPartitioner(this))
 
   /**
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
+  def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterator[T])] =
     groupBy(f, new HashPartitioner(numPartitions))
 
   /**
    * Return an RDD of grouped items.
    */
-  def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+  def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterator[T])] = {
     val cleanF = sc.clean(f)
     this.map(t => (cleanF(t), t)).groupByKey(p)
   }
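A quick sketch of the updated groupBy from a caller's perspective (not from the commit; assumes an existing SparkContext named sc and the SparkContext._ implicits):

    val nums = sc.parallelize(1 to 10)
    val byParity = nums.groupBy(n => n % 2)  // now RDD[(Int, Iterator[Int])]
    byParity.mapValues(_.sum).collect()      // e.g. Array((0, 30), (1, 25))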
21 changes: 13 additions & 8 deletions examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -26,8 +26,9 @@
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
 import java.util.regex.Pattern;

 /**
@@ -66,7 +67,7 @@ public static void main(String[] args) throws Exception {
     JavaRDD<String> lines = ctx.textFile(args[1], 1);
 
     // Loads all URLs from input file and initialize their neighbors.
-    JavaPairRDD<String, List<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
+    JavaPairRDD<String, Iterator<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
       @Override
       public Tuple2<String, String> call(String s) {
         String[] parts = SPACES.split(s);
@@ -75,9 +76,9 @@ public Tuple2<String, String> call(String s) {
     }).distinct().groupByKey().cache();
 
     // Loads all URLs with other URLs linking to them from the input file, and initializes their ranks to one.
-    JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
+    JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterator<String>, Double>() {
       @Override
-      public Double call(List<String> rs) {
+      public Double call(Iterator<String> rs) {
         return 1.0;
       }
     });
@@ -86,12 +87,16 @@ public Double call(List<String> rs) {
     for (int current = 0; current < Integer.parseInt(args[2]); current++) {
       // Calculates URL contributions to the rank of other URLs.
       JavaPairRDD<String, Double> contribs = links.join(ranks).values()
-        .flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
+        .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterator<String>, Double>, String, Double>() {
           @Override
-          public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
+          public Iterable<Tuple2<String, Double>> call(Tuple2<Iterator<String>, Double> s) {
+            List<String> urls = new ArrayList<String>();
+            while (s._1.hasNext()) {
+              urls.add(s._1.next());
+            }
             List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
-            for (String n : s._1()) {
-              results.add(new Tuple2<String, Double>(n, s._2() / s._1().size()));
+            for (String n : urls) {
+              results.add(new Tuple2<String, Double>(n, s._2() / urls.size()));
             }
             return results;
           }
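The JavaPageRank change above shows the pattern user code needs once a group arrives as a one-shot Iterator: when both the element count and the elements themselves are required, copy the iterator into a concrete collection first and reuse that. The same idea as a standalone Scala sketch:

    // Materialize once, then count and traverse freely.
    val urls = Iterator("u1", "u2", "u3").toList
    val rank = 1.0
    val contribs = urls.map(u => (u, rank / urls.size))
    // contribs == List((u1, 0.333...), (u2, 0.333...), (u3, 0.333...))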
(Diffs for the 6 remaining changed files are not shown here.)