Skip to content

Commit

Permalink
[SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
Browse files Browse the repository at this point in the history
VertexRDD.apply had a bug where it ignored the merge function for
duplicate vertices and instead used whichever vertex attribute occurred
first. This commit fixes the bug by passing the merge function through
to ShippableVertexPartition.apply, which merges any duplicates using the
merge function and then fills in missing vertices using the specified
default vertex attribute. This commit also adds a unit test for
VertexRDD.apply.

Author: Larry Xiao <[email protected]>
Author: Blie Arkansol <[email protected]>
Author: Ankur Dave <[email protected]>

Closes apache#1903 from larryxiao/2062 and squashes the following commits:

625aa9d [Blie Arkansol] Merge pull request #1 from ankurdave/SPARK-2062
476770b [Ankur Dave] ShippableVertexPartition.initFrom: Don't run mergeFunc on default values
614059f [Larry Xiao] doc update: note about the default null value vertices construction
dfdb3c9 [Larry Xiao] minor fix
1c70366 [Larry Xiao] scalastyle check: wrap line, parameter list indent 4 spaces
e4ca697 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
6a35ea8 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
4fbc29c [Blie Arkansol] undo unnecessary change
efae765 [Larry Xiao] fix mistakes: should be able to call with or without mergeFunc
b2422f9 [Larry Xiao] Merge branch '2062' of github.com:larryxiao/spark into 2062
52dc7f7 [Larry Xiao] pass mergeFunc to VertexPartitionBase, where merge is handled
581e9ee [Larry Xiao] TODO: VertexRDDSuite
20d80a3 [Larry Xiao] [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
  • Loading branch information
larryxiao authored and ankurdave committed Sep 19, 2014
1 parent e76ef5c commit 3bbbdd8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 7 deletions.
4 changes: 2 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ object VertexRDD {
*/
def apply[VD: ClassTag](
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
VertexRDD(vertices, edges, defaultVal, (a, b) => b)
VertexRDD(vertices, edges, defaultVal, (a, b) => a)
}

/**
Expand All @@ -419,7 +419,7 @@ object VertexRDD {
(vertexIter, routingTableIter) =>
val routingTable =
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc))
}
new VertexRDD(vertexPartitions)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,36 @@ private[graphx]
object ShippableVertexPartition {
/** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD], (a, b) => a)

/**
* Construct a `ShippableVertexPartition` from the given vertices with the specified routing
* table, filling in missing vertices mentioned in the routing table using `defaultVal`.
*/
def apply[VD: ClassTag](
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
: ShippableVertexPartition[VD] = {
val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
new ShippableVertexPartition(index, values, mask, routingTable)
: ShippableVertexPartition[VD] =
apply(iter, routingTable, defaultVal, (a, b) => a)

/**
* Construct a `ShippableVertexPartition` from the given vertices with the specified routing
* table, filling in missing vertices mentioned in the routing table using `defaultVal`,
* and merging duplicate vertex atrribute with mergeFunc.
*/
def apply[VD: ClassTag](
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
// Merge the given vertices using mergeFunc
iter.foreach { pair =>
map.setMerge(pair._1, pair._2, mergeFunc)
}
// Fill in missing vertices mentioned in the routing table
routingTable.iterator.foreach { vid =>
map.changeValue(vid, defaultVal, identity)
}

new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
}

import scala.language.implicitConversions
Expand Down
11 changes: 11 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,15 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
}
}

test("mergeFunc") {
// test to see if the mergeFunc is working correctly
withSpark { sc =>
val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
// test merge function
assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
}
}

}

0 comments on commit 3bbbdd8

Please sign in to comment.