Skip to content

Commit

Permalink
Merge pull request #1 from ankurdave/SPARK-2062
Browse files Browse the repository at this point in the history
ShippableVertexPartition.initFrom: Don't run mergeFunc on default values
  • Loading branch information
larryxiao committed Sep 18, 2014
2 parents 614059f + 476770b commit 625aa9d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
2 changes: 0 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,6 @@ object VertexRDD {
* @param edges the [[EdgeRDD]] that these vertices may be joined with
* @param defaultVal the vertex attribute to use when creating missing vertices
* @param mergeFunc the commutative, associative duplicate vertex attribute merge function
* Note: vertices with the default value are created upon construction in VertexPartition,
* so the default value will appear as `b` in the `(a, b)` pair passed to mergeFunc.
*/
def apply[VD: ClassTag](
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object ShippableVertexPartition {
*/
// Convenience overload: forwards to the four-argument `apply`, supplying `(a, b) => a` as the
// merge function, i.e. a function that returns its first argument.
def apply[VD: ClassTag](
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
// NOTE(review): the return-type line below appears twice. This text is a rendered diff whose
// +/- markers were lost; presumably one copy is the removed line and the other its
// whitespace-only replacement -- confirm against the repository before treating this as code.
: ShippableVertexPartition[VD] =
: ShippableVertexPartition[VD] =
apply(iter, routingTable, defaultVal, (a, b) => a)

/**
Expand All @@ -54,12 +54,18 @@ object ShippableVertexPartition {
*/
// Builds a ShippableVertexPartition from the vertices in `iter`, the ids listed by
// `routingTable`, the fallback attribute `defaultVal`, and the duplicate-merge function
// `mergeFunc`.
//
// NOTE(review): this span is a rendered diff whose +/- markers were lost, so two versions of
// the method are interleaved. Judging by the page's change counts, the first `mergeFunc`
// parameter line through the first `new ShippableVertexPartition(...)` looks like the removed
// implementation, and everything from the second `mergeFunc` line onward looks like the added
// one -- confirm against the repository.
def apply[VD: ClassTag](
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
// --- presumably the removed (pre-commit) version starts here ---
mergeFunc: (VD, VD) => VD
)
: ShippableVertexPartition[VD] = {
// Appends one (vid, defaultVal) pair per id in routingTable.iterator to the given vertices and
// hands the combined iterator, together with mergeFunc, to VertexPartitionBase.initFrom.
// Per the commit title, this path could run mergeFunc on default values.
val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, mergeFunc)
new ShippableVertexPartition(index, values, mask, routingTable)
// --- presumably the added (post-commit) version starts here ---
mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
// Merge the given vertices using mergeFunc
iter.foreach { pair =>
map.setMerge(pair._1, pair._2, mergeFunc)
}
// Fill in missing vertices mentioned in the routing table.
// Presumably changeValue stores defaultVal when vid is absent and otherwise applies `identity`,
// leaving an existing attribute unchanged, so mergeFunc is never called with defaultVal --
// TODO confirm against GraphXPrimitiveKeyOpenHashMap.changeValue.
routingTable.iterator.foreach { vid =>
map.changeValue(vid, defaultVal, identity)
}

// The partition is assembled directly from the map's key set, its value storage, and the key
// set's bit set, alongside the routing table.
new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
}

import scala.language.implicitConversions
Expand Down

0 comments on commit 625aa9d

Please sign in to comment.