From 20d80a38349d3fad0800763cb90d07b224927ad4 Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Tue, 12 Aug 2014 16:35:00 +0800
Subject: [PATCH] [SPARK-2062][GraphX] VertexRDD.apply does not use the
 mergeFunc

Create verticesDedup with reduceByKey, using mergeFunc to combine
duplicate vertex attributes; then proceed with verticesDedup.
---
 .../src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 4825d12fc27b3..61496ddfb0346 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -410,9 +410,10 @@ object VertexRDD {
   def apply[VD: ClassTag](
       vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
     ): VertexRDD[VD] = {
-    val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
-      case Some(p) => vertices
-      case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
+    val verticesDedup = vertices.reduceByKey((VD1, VD2) => mergeFunc(VD1, VD2))
+    val vPartitioned: RDD[(VertexId, VD)] = verticesDedup.partitioner match {
+      case Some(p) => verticesDedup
+      case None => verticesDedup.copartitionWithVertices(new HashPartitioner(verticesDedup.partitions.size))
     }
     val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
     val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {