Skip to content

Commit

Permalink
org.apache.spark.rdd.PairRDDFunctionsSuite passes
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Apr 8, 2014
1 parent 6698186 commit 249abde
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
for (v <- vs; w <- ws) yield (v, w)
val wlist = ws.toList
for (v <- vs; w <- wlist.iterator) yield (v, w)
}
}

Expand All @@ -313,7 +314,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
if (ws.isEmpty) {
vs.map(v => (v, None))
} else {
for (v <- vs; w <- ws) yield (v, Some(w))
val wlist = ws.toList
for (v <- vs; w <- wlist.iterator) yield (v, Some(w))
}
}
}
Expand All @@ -330,7 +332,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
if (vs.isEmpty) {
ws.map(w => (None, w))
} else {
for (v <- vs; w <- ws) yield (Some(v), w)
val wlist = ws.toList
for (v <- vs; w <- wlist) yield (Some(v), w)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,12 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.groupWith(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
(2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
(3, (ArrayBuffer(1), ArrayBuffer())),
(4, (ArrayBuffer(), ArrayBuffer('w')))
val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet
assert(joinedSet === Set(
(1, (List(1, 2), List('x'))),
(2, (List(1), List('y', 'z'))),
(3, (List(1), List())),
(4, (List(), List('w')))
))
}

Expand Down Expand Up @@ -447,4 +448,3 @@ class ConfigTestFormat() extends FakeFormat() with Configurable {
super.getRecordWriter(p1)
}
}

0 comments on commit 249abde

Please sign in to comment.