Skip to content

Commit

Permalink
Rebuild routing table after Graph.reverse
Browse files Browse the repository at this point in the history
GraphImpl.reverse used to reverse edges in each partition of the edge
RDD but preserve the routing table and replicated vertex view, since
reversing should not affect partitioning.

However, the old routing table would then have incorrect information for
srcAttrOnly and dstAttrOnly. These RDDs should be switched.

A simple fix is for Graph.reverse to rebuild the routing table and
replicated vertex view.

Thanks to Bogdan Ghidireac for reporting this issue on the mailing list.
  • Loading branch information
ankurdave committed Apr 16, 2014
1 parent 725925c commit 75d63cb
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (

override def reverse: Graph[VD, ED] = {
val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
GraphImpl(vertices, newETable)
}

override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
Expand Down
10 changes: 10 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}

test("reverse with join elimination") {
withSpark { sc =>
val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(result.collect.toSet === Set((1L, 2)))
}
}

test("subgraph") {
withSpark { sc =>
// Create a star graph of 10 veritces.
Expand Down

0 comments on commit 75d63cb

Please sign in to comment.