From 20d80a38349d3fad0800763cb90d07b224927ad4 Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Tue, 12 Aug 2014 16:35:00 +0800
Subject: [PATCH 01/11] [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc

create verticesDedup with reduceByKey, using mergeFunc, then proceed with verticesDedup
---
 .../src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 4825d12fc27b3..61496ddfb0346 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -410,9 +410,10 @@ object VertexRDD {
   def apply[VD: ClassTag](
       vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
     ): VertexRDD[VD] = {
-    val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
-      case Some(p) => vertices
-      case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
+    val verticesDedup = vertices.reduceByKey((VD1, VD2) => mergeFunc(VD1, VD2))
+    val vPartitioned: RDD[(VertexId, VD)] = verticesDedup.partitioner match {
+      case Some(p) => verticesDedup
+      case None => verticesDedup.copartitionWithVertices(new HashPartitioner(verticesDedup.partitions.size))
     }
     val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
     val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {

From 581e9eef94cd906094f222d774fb61f5041fce84 Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Tue, 12 Aug 2014 16:37:59 +0800
Subject: [PATCH 02/11] TODO: VertexRDDSuite

---
 .../org/apache/spark/graphx/VertexRDDSuite.scala | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index cc86bafd2d644..39e62ea8edd9f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -29,6 +29,10 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5))
   }
 
+  def verticesDup(sc: SparkContext, n: Int) = {
+    VertexRDD(sc.parallelize((-n to n).map(x => (math.abs(x.toLong), x)), 5))
+  }
+
   test("filter") {
     withSpark { sc =>
       val n = 100
@@ -99,4 +103,14 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  // TODO:
+  // need edges in apply function
+  test("apply.mergeFunc") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n)
+      val evens = verts.filter(q => ((q._2 % 2) == 0))
+      assert(evens.count === (0 to n).filter(_ % 2 == 0).size)
+    }
+  }
 }
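
Aside (illustration, not part of the patch series): PATCH 01 deduplicates vertices by folding every duplicate attribute for a vertex ID through mergeFunc via reduceByKey. A minimal plain-Scala model of that semantics, with illustrative names and no Spark dependency:

    object DedupSketch extends App {
      type VertexId = Long
      // Same merge function the later test in this series uses: sum duplicates.
      val mergeFunc: (Int, Int) => Int = _ + _

      val vertices: Seq[(VertexId, Int)] = Seq((0L, 1), (0L, 2), (1L, 3), (1L, 3), (1L, 3))

      // reduceByKey semantics: group pairs by key, fold each group's values with mergeFunc.
      val deduped: Map[VertexId, Int] =
        vertices.groupBy(_._1).map { case (id, ps) => id -> ps.map(_._2).reduce(mergeFunc) }

      assert(deduped == Map(0L -> 3, 1L -> 9))
    }
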
From 52dc7f717d40b4836ac9a3efa31c1e9642cecc99 Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Tue, 19 Aug 2014 09:23:09 +0800
Subject: [PATCH 03/11] pass mergeFunc to VertexPartitionBase, where merge is handled

---
 .../src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 7 +++----
 .../spark/graphx/impl/ShippableVertexPartition.scala       | 7 ++++---
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 61496ddfb0346..4825d12fc27b3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -410,10 +410,9 @@ object VertexRDD {
   def apply[VD: ClassTag](
       vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
     ): VertexRDD[VD] = {
-    val verticesDedup = vertices.reduceByKey((VD1, VD2) => mergeFunc(VD1, VD2))
-    val vPartitioned: RDD[(VertexId, VD)] = verticesDedup.partitioner match {
-      case Some(p) => verticesDedup
-      case None => verticesDedup.copartitionWithVertices(new HashPartitioner(verticesDedup.partitions.size))
+    val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
+      case Some(p) => vertices
+      case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
     }
     val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
     val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index dca54b8a7da86..d638d578ee300 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -40,13 +40,14 @@ object ShippableVertexPartition {
 
   /**
    * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
-   * table, filling in missing vertices mentioned in the routing table using `defaultVal`.
+   * table, filling in missing vertices mentioned in the routing table using `defaultVal`,
+   * and merging duplicate vertex attributes with mergeFunc.
    */
   def apply[VD: ClassTag](
-      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
+      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD, mergeFunc: (VD, VD) => VD)
     : ShippableVertexPartition[VD] = {
     val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
-    val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
+    val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, mergeFunc)
     new ShippableVertexPartition(index, values, mask, routingTable)
   }
 

From efae765bfb67ee5eb34ced0d0e72d7005af70e72 Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Tue, 19 Aug 2014 10:39:53 +0800
Subject: [PATCH 04/11] fix mistakes: should be able to call with or without mergeFunc

---
 .../scala/org/apache/spark/graphx/VertexRDD.scala    |  2 +-
 .../spark/graphx/impl/ShippableVertexPartition.scala | 11 ++++++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 4825d12fc27b3..cc20fd5e085b2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -419,7 +419,7 @@ object VertexRDD {
       (vertexIter, routingTableIter) =>
         val routingTable =
           if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
-        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
+        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc))
     }
     new VertexRDD(vertexPartitions)
   }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index d638d578ee300..16e82b64c45bc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -36,7 +36,16 @@ private[graphx] object ShippableVertexPartition {
 
   /** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
   def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
-    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
+    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD], (a, b) => a)
+
+  /**
+   * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
+   * table, filling in missing vertices mentioned in the routing table using `defaultVal`.
+   */
+  def apply[VD: ClassTag](
+    iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
+    : ShippableVertexPartition[VD] =
+    apply(iter, routingTable, defaultVal, (a, b) => a)
 
   /**
    * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
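
Aside (illustration, not part of the patch series): PATCH 04 restores the three-argument call shape by delegating to the four-argument form with a keep-the-first merge function. A self-contained plain-Scala sketch of that overload pattern; initFrom here is an immutable-map stand-in for VertexPartitionBase.initFrom:

    object OverloadSketch extends App {
      // Model of initFrom: fold pairs into a map, combining duplicates with mergeFunc.
      def initFrom[VD](iter: Iterator[(Long, VD)], mergeFunc: (VD, VD) => VD): Map[Long, VD] =
        iter.foldLeft(Map.empty[Long, VD]) { case (m, (id, v)) =>
          m.updated(id, m.get(id).fold(v)(mergeFunc(_, v)))
        }

      // Without an explicit mergeFunc, delegate with "keep the first attribute",
      // mirroring the (a, b) => a default added in PATCH 04.
      def construct[VD](iter: Iterator[(Long, VD)]): Map[Long, VD] =
        construct(iter, (a: VD, b: VD) => a)

      def construct[VD](iter: Iterator[(Long, VD)], mergeFunc: (VD, VD) => VD): Map[Long, VD] =
        initFrom(iter, mergeFunc)

      assert(construct(Iterator((1L, 3), (1L, 4))) == Map(1L -> 3))
      assert(construct(Iterator((1L, 3), (1L, 4)), (a: Int, b: Int) => a + b) == Map(1L -> 7))
    }
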
From 4fbc29c98a2a62863102c1c2088937474baf2a65 Mon Sep 17 00:00:00 2001
From: Blie Arkansol
Date: Tue, 19 Aug 2014 10:52:46 +0800
Subject: [PATCH 05/11] undo unnecessary change

---
 .../org/apache/spark/graphx/VertexRDDSuite.scala | 14 --------------
 1 file changed, 14 deletions(-)

diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 39e62ea8edd9f..cc86bafd2d644 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -29,10 +29,6 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5))
   }
 
-  def verticesDup(sc: SparkContext, n: Int) = {
-    VertexRDD(sc.parallelize((-n to n).map(x => (math.abs(x.toLong), x)), 5))
-  }
-
   test("filter") {
     withSpark { sc =>
       val n = 100
@@ -103,14 +99,4 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  // TODO:
-  // need edges in apply function
-  test("apply.mergeFunc") {
-    withSpark { sc =>
-      val n = 100
-      val verts = vertices(sc, n)
-      val evens = verts.filter(q => ((q._2 % 2) == 0))
-      assert(evens.count === (0 to n).filter(_ % 2 == 0).size)
-    }
-  }
 }

From 6a35ea80b45e3d1c507fc1d82b0b44c4adef87f5 Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Mon, 25 Aug 2014 10:27:31 +0800
Subject: [PATCH 06/11] [TEST] VertexRDD.apply mergeFunc

---
 .../apache/spark/graphx/VertexRDDSuite.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index cc86bafd2d644..f8fc7ace09773 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -99,4 +99,21 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("mergeFunc") {
+    // test to see if the mergeFunc is working correctly
+    withSpark { sc =>
+      // VertexRDD default constructor: Duplicate entries are removed arbitrarily.
+      // val verts = VertexRDD(sc.parallelize(List((0L, 1), (0L, 2), (1L, 3), (1L, 3), (1L, 3))))
+      // ensure the constructor preserves duplicate vertices
+      // assert(verts.collect.toSet == Set((0L, 1), (0L, 2), (1L, 3), (1L, 3), (1L, 3)))
+      // won't pass
+
+      val verts = sc.parallelize(List((0L, 1), (0L, 2), (1L, 3), (1L, 3), (1L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
+      // test merge function
+      assert(rdd.collect.toSet == Set((0L, 3), (1L, 9)))
+    }
+  }
+
 }
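
Aside (illustration, not part of the patch series): a usage sketch of the API exercised by PATCH 06, assuming a GraphX build with this series applied and an already-running SparkContext sc; the max merge function here is illustrative:

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._

    def mergeExample(sc: SparkContext): Unit = {
      val verts = sc.parallelize(List((0L, 1), (0L, 2), (1L, 3), (1L, 3), (1L, 3)))
      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
      // Duplicate attributes for the same vertex ID are combined with the merge function.
      val dedup = VertexRDD(verts, edges, 0, (a: Int, b: Int) => math.max(a, b))
      assert(dedup.collect.toSet == Set((0L, 2), (1L, 3)))
    }
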
From e4ca697a4a5b4a6cad5cf6dfd19dd793fb9d41c5 Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Mon, 25 Aug 2014 10:27:31 +0800
Subject: [PATCH 07/11] [TEST] VertexRDD.apply mergeFunc

---
 .../scala/org/apache/spark/graphx/VertexRDDSuite.scala | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index f8fc7ace09773..42d3f21dbae98 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -102,17 +102,11 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
   test("mergeFunc") {
     // test to see if the mergeFunc is working correctly
     withSpark { sc =>
-      // VertexRDD default constructor: Duplicate entries are removed arbitrarily.
-      // val verts = VertexRDD(sc.parallelize(List((0L, 1), (0L, 2), (1L, 3), (1L, 3), (1L, 3))))
-      // ensure the constructor preserves duplicate vertices
-      // assert(verts.collect.toSet == Set((0L, 1), (0L, 2), (1L, 3), (1L, 3), (1L, 3)))
-      // won't pass
-
-      val verts = sc.parallelize(List((0L, 1), (0L, 2), (1L, 3), (1L, 3), (1L, 3)))
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
       val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
       val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
       // test merge function
-      assert(rdd.collect.toSet == Set((0L, 3), (1L, 9)))
+      assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
     }
   }
 

From 1c70366d2fdb8354cbb58eb465b137be1ccb562f Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Wed, 3 Sep 2014 10:03:21 +0800
Subject: [PATCH 08/11] scalastyle check: wrap line, parameter list indent 4 spaces

---
 .../apache/spark/graphx/impl/ShippableVertexPartition.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index 16e82b64c45bc..f0834c317fcdb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -43,7 +43,7 @@ object ShippableVertexPartition {
    * table, filling in missing vertices mentioned in the routing table using `defaultVal`.
    */
   def apply[VD: ClassTag](
-    iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
+      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
     : ShippableVertexPartition[VD] =
     apply(iter, routingTable, defaultVal, (a, b) => a)
 
@@ -53,7 +53,9 @@ object ShippableVertexPartition {
    * and merging duplicate vertex attributes with mergeFunc.
    */
   def apply[VD: ClassTag](
-      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD, mergeFunc: (VD, VD) => VD)
+      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
+      mergeFunc: (VD, VD) => VD
+    )
     : ShippableVertexPartition[VD] = {
     val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
     val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, mergeFunc)
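
Aside (illustration, not part of the patch series): a quick worked check of the expected set in PATCH 07's revised test, in plain Scala (vertex 0L keeps 0; 1L merges 1 + 2 = 3; 2L merges 3 + 3 + 3 = 9):

    object ExpectedMergeCheck extends App {
      val input = List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3))
      // Group the duplicate attributes per vertex ID and sum them, like the test's mergeFunc.
      val merged = input.groupBy(_._1).map { case (id, ps) => (id, ps.map(_._2).sum) }.toSet
      assert(merged == Set((0L, 0), (1L, 3), (2L, 9)))
    }
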
From dfdb3c91e791f66e0e3a58f32bb72c1f4855397b Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Tue, 16 Sep 2014 16:53:00 +0800
Subject: [PATCH 09/11] minor fix

a copy of the vertices with defaultVal is created beforehand, and it is the b
in (a, b) => b; see ShippableVertexPartition.scala:

  val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
---
 graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index cc20fd5e085b2..210217df55ec2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -392,7 +392,7 @@ object VertexRDD {
    */
   def apply[VD: ClassTag](
       vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
-    VertexRDD(vertices, edges, defaultVal, (a, b) => b)
+    VertexRDD(vertices, edges, defaultVal, (a, b) => a)
   }
 
   /**

From 614059fd711a5cd0351c133fc98a288f8c712ad1 Mon Sep 17 00:00:00 2001
From: Larry Xiao
Date: Wed, 17 Sep 2014 07:53:02 +0800
Subject: [PATCH 10/11] doc update: note about the default null value vertices construction

---
 graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 210217df55ec2..befa3b7398eb3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -406,6 +406,8 @@ object VertexRDD {
    * @param edges the [[EdgeRDD]] that these vertices may be joined with
    * @param defaultVal the vertex attribute to use when creating missing vertices
    * @param mergeFunc the commutative, associative duplicate vertex attribute merge function
+   * note that all vertices with the default value are created upon construction in VertexPartition,
+   * so they will appear as b in the (a, b) pair for mergeFunc.
    */
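
Aside (illustration, not part of the patch series): PATCH 09/10 rest on the ordering of fullIter: the defaults are appended after the real vertices, so in a left-to-right fold an existing attribute arrives as a and the later default as b. A plain-Scala model with illustrative names:

    object MergeOrderSketch extends App {
      // Model of initFrom: fold pairs into a map, combining duplicates with mergeFunc.
      def initFrom[VD](iter: Iterator[(Long, VD)], mergeFunc: (VD, VD) => VD): Map[Long, VD] =
        iter.foldLeft(Map.empty[Long, VD]) { case (m, (id, v)) =>
          m.updated(id, m.get(id).fold(v)(mergeFunc(_, v)))
        }

      val realVerts = Iterator((1L, 42))        // vertex with a real attribute
      val routingTableIds = Iterator(1L, 2L)    // vertices mentioned by edges
      val defaultVal = 0
      val fullIter = realVerts ++ routingTableIds.map(vid => (vid, defaultVal))

      // (a, b) => a keeps the real attribute; (a, b) => b would have kept the default.
      assert(initFrom(fullIter, (a: Int, b: Int) => a) == Map(1L -> 42, 2L -> 0))
    }
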
From 476770babc4aece60e08d9dab5bd46c739ec9e66 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Wed, 17 Sep 2014 11:19:23 -0700
Subject: [PATCH 11/11] ShippableVertexPartition.initFrom: Don't run mergeFunc on default values

---
 .../org/apache/spark/graphx/VertexRDD.scala    |  2 --
 .../impl/ShippableVertexPartition.scala        | 20 ++++++++++++-------
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index befa3b7398eb3..210217df55ec2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -406,8 +406,6 @@ object VertexRDD {
    * @param edges the [[EdgeRDD]] that these vertices may be joined with
    * @param defaultVal the vertex attribute to use when creating missing vertices
    * @param mergeFunc the commutative, associative duplicate vertex attribute merge function
-   * note that all vertices with the default value are created upon construction in VertexPartition,
-   * so they will appear as b in the (a, b) pair for mergeFunc.
    */
   def apply[VD: ClassTag](
       vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index f0834c317fcdb..5412d720475dc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -44,7 +44,7 @@ object ShippableVertexPartition {
    */
   def apply[VD: ClassTag](
       iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
-    : ShippableVertexPartition[VD] =
+  : ShippableVertexPartition[VD] =
     apply(iter, routingTable, defaultVal, (a, b) => a)
 
   /**
@@ -54,12 +54,18 @@ object ShippableVertexPartition {
    */
   def apply[VD: ClassTag](
       iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
-      mergeFunc: (VD, VD) => VD
-    )
-    : ShippableVertexPartition[VD] = {
-    val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
-    val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, mergeFunc)
-    new ShippableVertexPartition(index, values, mask, routingTable)
+      mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+    // Merge the given vertices using mergeFunc
+    iter.foreach { pair =>
+      map.setMerge(pair._1, pair._2, mergeFunc)
+    }
+    // Fill in missing vertices mentioned in the routing table
+    routingTable.iterator.foreach { vid =>
+      map.changeValue(vid, defaultVal, identity)
+    }
+
+    new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
   }
 
   import scala.language.implicitConversions
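
Aside (illustration, not part of the patch series): PATCH 11 separates merging from default-filling so mergeFunc never sees defaultVal. A plain-Scala model of the two phases, with mutable.Map standing in for GraphXPrimitiveKeyOpenHashMap and its setMerge/changeValue calls modeled by hand:

    import scala.collection.mutable

    object InitFromSketch extends App {
      def build[VD](iter: Iterator[(Long, VD)], routingIds: Iterator[Long],
          defaultVal: VD, mergeFunc: (VD, VD) => VD): mutable.Map[Long, VD] = {
        val map = mutable.Map.empty[Long, VD]
        // Phase 1, like setMerge: insert each vertex, merging duplicates with mergeFunc.
        iter.foreach { case (id, v) =>
          map(id) = map.get(id).fold(v)(mergeFunc(_, v))
        }
        // Phase 2, like changeValue(vid, defaultVal, identity): insert the default
        // only when the vertex is missing; existing attributes are left untouched.
        routingIds.foreach { vid => map.getOrElseUpdate(vid, defaultVal) }
        map
      }

      val m = build(Iterator((1L, 3), (1L, 4)), Iterator(1L, 2L), 0, (a: Int, b: Int) => a + b)
      assert(m == mutable.Map(1L -> 7, 2L -> 0)) // mergeFunc never saw the default 0
    }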