Skip to content

Commit

Permalink
Code-style changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Yves Raimond committed Nov 24, 2015
1 parent 09d31c8 commit 8506353
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
29 changes: 14 additions & 15 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,14 @@ object PageRank extends Logging {
*/
def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
numIter: Int, resetProb: Double = 0.15,
sources : Array[VertexId]): Graph[BSV[Double], Double] =
{
sources: Array[VertexId]): Graph[BSV[Double], Double] = {
// TODO if one sources vertex id is outside of the int range
// we won't be able to store its activations in a sparse vector
val zero = new BSV[Double](Array(), Array(), sources.size)
val sourcesInitMap = sources.zipWithIndex.map{case (vid, i) => {
val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
val v = new BSV[Double](Array(i), Array(resetProb), sources.size)
(vid, v)
}}.toMap
}.toMap
val sc = graph.vertices.sparkContext
val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
// Initialize the PageRank graph with each edge attribute having
Expand All @@ -196,14 +195,14 @@ object PageRank extends Logging {
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
.mapVertices( (vid, attr) => {
if (sourcesInitMapBC.value contains vid) {
sourcesInitMapBC.value(vid)
} else {
zero
.mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
.mapVertices { (vid, attr) =>
if (sourcesInitMapBC.value contains vid) {
sourcesInitMapBC.value(vid)
} else {
zero
}
}
})

var i = 0
while (i < numIter) {
Expand All @@ -212,18 +211,18 @@ object PageRank extends Logging {
// and adding start nodes back in with activation resetProb
val rankUpdates = rankGraph.aggregateMessages[BSV[Double]](
ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
(a : BSV[Double], b : BSV[Double]) => a :+ b, TripletFields.Src)
(a: BSV[Double], b: BSV[Double]) => a :+ b, TripletFields.Src)

rankGraph = rankGraph.joinVertices(rankUpdates) {
(vid, oldRank, msgSum) => {
val popActivations : BSV[Double] = msgSum :* (1.0 - resetProb)
(vid, oldRank, msgSum) =>
val popActivations: BSV[Double] = msgSum :* (1.0 - resetProb)
val resetActivations = if (sourcesInitMapBC.value contains vid) {
sourcesInitMapBC.value(vid)
} else {
zero
}
popActivations :+ resetActivations
}}.cache()
}.cache()

rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
prevRankGraph.vertices.unpersist(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)

val parallelStaticRanks2 = starGraph
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
case (vertexId, vector) => vector(0)
}.vertices.cache()
assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)
Expand All @@ -135,7 +135,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
.vertices.cache()
val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
val otherParallelStaticRanks2 = starGraph
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
case (vertexId, vector) => vector(1)
}.vertices.cache()
assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
Expand Down Expand Up @@ -197,7 +197,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)

val parallelStaticRanks = chain
.staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices{
.staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
case (vertexId, vector) => vector(0)
}.vertices.cache()
assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
Expand Down

0 comments on commit 8506353

Please sign in to comment.