From f41975efdbe1794fe456babb7f79d0290ecfc70d Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Fri, 30 Oct 2015 16:05:20 -0700 Subject: [PATCH 01/20] Parallel personalized pagerank implementation --- graphx/pom.xml | 21 +++++ .../org/apache/spark/graphx/GraphOps.scala | 12 +++ .../apache/spark/graphx/lib/PageRank.scala | 79 +++++++++++++++++++ .../spark/graphx/lib/PageRankSuite.scala | 20 +++++ 4 files changed, 132 insertions(+) diff --git a/graphx/pom.xml b/graphx/pom.xml index 987b831021a54..cfc259d96cb7e 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -70,6 +70,27 @@ org.apache.spark spark-test-tags_${scala.binary.version} + + org.scalanlp + breeze_${scala.binary.version} + 0.11.2 + + + + junit + junit + + + org.apache.commons + commons-math3 + + + + + org.apache.commons + commons-math3 + target/scala-${scala.binary.version}/classes diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 9451ff1e5c0e2..318e5d47bbf56 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -26,6 +26,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.graphx.lib._ +import breeze.linalg.SparseVector + /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the * efficient GraphX API. This class is implicitly constructed for each Graph object. @@ -384,6 +386,16 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, Some(src)) } + /** + * Run parallel personalized PageRank for a given array of source vertices, such + * that all random walks are started relative to the source vertices + */ + def staticParallelPersonalizedPageRank(sources : Array[VertexId], numIter: Int, + resetProb: Double = 0.15) : Graph[SparseVector[Double], Double] = { + PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources) + } + + /** * Run Personalized PageRank for a fixed number of iterations with * with all iterations originating at the source node diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 52b237fc15093..ecf3f55d925d9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -23,6 +23,8 @@ import scala.language.postfixOps import org.apache.spark.Logging import org.apache.spark.graphx._ +import breeze.linalg.SparseVector + /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. * @@ -158,6 +160,83 @@ object PageRank extends Logging { rankGraph } + /** + * Run Personalized PageRank for a fixed number of iterations, for a + * set of starting nodes in parallel. Returns a graph with vertex attributes + * containing the pagerank relative to all starting nodes (as a sparse vector) and + * edge attributes the normalized edge weight + * + * @tparam VD The original vertex attribute (not used) + * @tparam ED The original edge attribute (not used) + * + * @param graph The graph on which to compute personalized pagerank + * @param numIter The number of iterations to run + * @param resetProb The random reset probability + * @param sources The list of sources to compute personalized pagerank from + * @return the graph with vertex attributes + * containing the pagerank relative to all starting nodes (as a sparse vector) and + * edge attributes the normalized edge weight + */ + def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], + numIter: Int, resetProb: Double = 0.15, + sources : Array[VertexId]): Graph[SparseVector[Double], Double] = + { + // TODO if one sources vertex id is outside of the int range + // we won't be able to store its activations in a sparse vector + val zero = new SparseVector[Double](Array(), Array(), sources.size) + val sourcesInitMap = sources.zipWithIndex.map{case (vid, i) => { + val v = new SparseVector[Double](Array(i), Array(resetProb), sources.size) + (vid, v) + }}.toMap + val sc = graph.vertices.sparkContext + val sourcesInitMapBC = sc.broadcast(sourcesInitMap) + // Initialize the PageRank graph with each edge attribute having + // weight 1/outDegree and each source vertex with attribute 1.0. + var rankGraph = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } + // Set the weight on the edges based on the degree + .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src ) + .mapVertices( (vid, attr) => { + if (sourcesInitMapBC.value contains vid) { + sourcesInitMapBC.value(vid) + } else { + zero + } + }) + + var i = 0 + while (i < numIter) { + val prevRankGraph = rankGraph + // Propagates the message along outbound edges + // and adding start nodes back in with activation resetProb + val rankUpdates = rankGraph.aggregateMessages[SparseVector[Double]]( + ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr), + (a : SparseVector[Double], b : SparseVector[Double]) => a :+ b, TripletFields.Src) + + rankGraph = rankGraph.joinVertices(rankUpdates) { + (vid, oldRank, msgSum) => { + val popActivations : SparseVector[Double] = msgSum :* (1.0 - resetProb) + val resetActivations = if (sourcesInitMapBC.value contains vid) { + sourcesInitMapBC.value(vid) + } else { + zero + } + popActivations :+ resetActivations + }}.cache() + + rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices + prevRankGraph.vertices.unpersist(false) + prevRankGraph.edges.unpersist(false) + + logInfo(s"Parallel Personalized PageRank finished iteration $i.") + + i += 1 + } + + rankGraph + } + /** * Run a dynamic version of PageRank returning a graph with vertex attributes containing the * PageRank and edge attributes containing the normalized edge weight. diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index bdff31446f8ee..588db51474db9 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -118,11 +118,26 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache() assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) + val parallelStaticRanks1 = starGraph.staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() + assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol) + + val parallelStaticRanks2 = starGraph.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() + assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol) + // We have one outbound edge from 1 to 0 val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb) .vertices.cache() val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache() + val otherParallelStaticRanks2 = starGraph.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + case (vertexId, vector) => vector(1) + }.vertices.cache() assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol) + assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol) + assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol) } } // end of test Star PersonalPageRank @@ -177,6 +192,11 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices assert(compareRanks(staticRanks, dynamicRanks) < errorTol) + + val parallelStaticRanks = chain.staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() + assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol) } } } From 3605e40e8e61c853c89637918c82c7e0bcec69e8 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Wed, 4 Nov 2015 09:39:05 -0800 Subject: [PATCH 02/20] Scala style tweaks --- .../spark/graphx/lib/PageRankSuite.scala | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 588db51474db9..5306891ec4e9d 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -118,23 +118,26 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache() assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) - val parallelStaticRanks1 = starGraph.staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices{ - case (vertexId, vector) => vector(0) - }.vertices.cache() + val parallelStaticRanks1 = starGraph + .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol) - val parallelStaticRanks2 = starGraph.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ - case (vertexId, vector) => vector(0) - }.vertices.cache() + val parallelStaticRanks2 = starGraph + .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol) // We have one outbound edge from 1 to 0 val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb) .vertices.cache() val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache() - val otherParallelStaticRanks2 = starGraph.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ - case (vertexId, vector) => vector(1) - }.vertices.cache() + val otherParallelStaticRanks2 = starGraph + .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + case (vertexId, vector) => vector(1) + }.vertices.cache() assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol) assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol) assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol) @@ -193,9 +196,10 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { assert(compareRanks(staticRanks, dynamicRanks) < errorTol) - val parallelStaticRanks = chain.staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices{ - case (vertexId, vector) => vector(0) - }.vertices.cache() + val parallelStaticRanks = chain + .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol) } } From 8b34e5ce0fd25423349c4ded45859ef3a02153cf Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 23 Nov 2015 17:26:43 -0800 Subject: [PATCH 03/20] Removing breeze dependency from mllib (available through graphx) --- mllib/pom.xml | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/mllib/pom.xml b/mllib/pom.xml index 70139121d8c78..712603ede5a19 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -68,23 +68,6 @@ ${jblas.version} test - - org.scalanlp - breeze_${scala.binary.version} - 0.11.2 - - - - junit - junit - - - org.apache.commons - commons-math3 - - - org.apache.commons commons-math3 From 508ba451eb636365388fc5bf77b531ce1d3d70d4 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 23 Nov 2015 17:29:02 -0800 Subject: [PATCH 04/20] Renaming SparseVector to BSV --- .../scala/org/apache/spark/graphx/GraphOps.scala | 4 ++-- .../org/apache/spark/graphx/lib/PageRank.scala | 14 +++++++------- .../apache/spark/graphx/lib/PageRankSuite.scala | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 318e5d47bbf56..5e1468d62e878 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.graphx.lib._ -import breeze.linalg.SparseVector +import breeze.linalg.{SparseVector => BSV} /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the @@ -391,7 +391,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * that all random walks are started relative to the source vertices */ def staticParallelPersonalizedPageRank(sources : Array[VertexId], numIter: Int, - resetProb: Double = 0.15) : Graph[SparseVector[Double], Double] = { + resetProb: Double = 0.15) : Graph[BSV[Double], Double] = { PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index ecf3f55d925d9..339c4c6a25d0b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -23,7 +23,7 @@ import scala.language.postfixOps import org.apache.spark.Logging import org.apache.spark.graphx._ -import breeze.linalg.SparseVector +import breeze.linalg.{SparseVector => BSV} /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. @@ -179,13 +179,13 @@ object PageRank extends Logging { */ def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, - sources : Array[VertexId]): Graph[SparseVector[Double], Double] = + sources : Array[VertexId]): Graph[BSV[Double], Double] = { // TODO if one sources vertex id is outside of the int range // we won't be able to store its activations in a sparse vector - val zero = new SparseVector[Double](Array(), Array(), sources.size) + val zero = new BSV[Double](Array(), Array(), sources.size) val sourcesInitMap = sources.zipWithIndex.map{case (vid, i) => { - val v = new SparseVector[Double](Array(i), Array(resetProb), sources.size) + val v = new BSV[Double](Array(i), Array(resetProb), sources.size) (vid, v) }}.toMap val sc = graph.vertices.sparkContext @@ -210,13 +210,13 @@ object PageRank extends Logging { val prevRankGraph = rankGraph // Propagates the message along outbound edges // and adding start nodes back in with activation resetProb - val rankUpdates = rankGraph.aggregateMessages[SparseVector[Double]]( + val rankUpdates = rankGraph.aggregateMessages[BSV[Double]]( ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr), - (a : SparseVector[Double], b : SparseVector[Double]) => a :+ b, TripletFields.Src) + (a : BSV[Double], b : BSV[Double]) => a :+ b, TripletFields.Src) rankGraph = rankGraph.joinVertices(rankUpdates) { (vid, oldRank, msgSum) => { - val popActivations : SparseVector[Double] = msgSum :* (1.0 - resetProb) + val popActivations : BSV[Double] = msgSum :* (1.0 - resetProb) val resetActivations = if (sourcesInitMapBC.value contains vid) { sourcesInitMapBC.value(vid) } else { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 5306891ec4e9d..a0e6d3fe129b6 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -119,7 +119,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) val parallelStaticRanks1 = starGraph - .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices{ + .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices { case (vertexId, vector) => vector(0) }.vertices.cache() assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol) From 09d31c852e8213a2923de68b6e3f2b3ec6f71166 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 23 Nov 2015 17:30:39 -0800 Subject: [PATCH 05/20] Removing extra space, extra line --- graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 5e1468d62e878..e97b1ceb29c08 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -390,12 +390,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * Run parallel personalized PageRank for a given array of source vertices, such * that all random walks are started relative to the source vertices */ - def staticParallelPersonalizedPageRank(sources : Array[VertexId], numIter: Int, + def staticParallelPersonalizedPageRank(sources: Array[VertexId], numIter: Int, resetProb: Double = 0.15) : Graph[BSV[Double], Double] = { PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources) } - /** * Run Personalized PageRank for a fixed number of iterations with * with all iterations originating at the source node From 85063535bd29936c260844f1fd815faccb474169 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 23 Nov 2015 17:39:41 -0800 Subject: [PATCH 06/20] Code-style changes --- .../apache/spark/graphx/lib/PageRank.scala | 29 +++++++++---------- .../spark/graphx/lib/PageRankSuite.scala | 6 ++-- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 339c4c6a25d0b..cd34061415ec9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -179,15 +179,14 @@ object PageRank extends Logging { */ def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, - sources : Array[VertexId]): Graph[BSV[Double], Double] = - { + sources: Array[VertexId]): Graph[BSV[Double], Double] = { // TODO if one sources vertex id is outside of the int range // we won't be able to store its activations in a sparse vector val zero = new BSV[Double](Array(), Array(), sources.size) - val sourcesInitMap = sources.zipWithIndex.map{case (vid, i) => { + val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) => val v = new BSV[Double](Array(i), Array(resetProb), sources.size) (vid, v) - }}.toMap + }.toMap val sc = graph.vertices.sparkContext val sourcesInitMapBC = sc.broadcast(sourcesInitMap) // Initialize the PageRank graph with each edge attribute having @@ -196,14 +195,14 @@ object PageRank extends Logging { // Associate the degree with each vertex .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } // Set the weight on the edges based on the degree - .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src ) - .mapVertices( (vid, attr) => { - if (sourcesInitMapBC.value contains vid) { - sourcesInitMapBC.value(vid) - } else { - zero + .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src) + .mapVertices { (vid, attr) => + if (sourcesInitMapBC.value contains vid) { + sourcesInitMapBC.value(vid) + } else { + zero + } } - }) var i = 0 while (i < numIter) { @@ -212,18 +211,18 @@ object PageRank extends Logging { // and adding start nodes back in with activation resetProb val rankUpdates = rankGraph.aggregateMessages[BSV[Double]]( ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr), - (a : BSV[Double], b : BSV[Double]) => a :+ b, TripletFields.Src) + (a: BSV[Double], b: BSV[Double]) => a :+ b, TripletFields.Src) rankGraph = rankGraph.joinVertices(rankUpdates) { - (vid, oldRank, msgSum) => { - val popActivations : BSV[Double] = msgSum :* (1.0 - resetProb) + (vid, oldRank, msgSum) => + val popActivations: BSV[Double] = msgSum :* (1.0 - resetProb) val resetActivations = if (sourcesInitMapBC.value contains vid) { sourcesInitMapBC.value(vid) } else { zero } popActivations :+ resetActivations - }}.cache() + }.cache() rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices prevRankGraph.vertices.unpersist(false) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index a0e6d3fe129b6..b6305c8d00aba 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -125,7 +125,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol) val parallelStaticRanks2 = starGraph - .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices { case (vertexId, vector) => vector(0) }.vertices.cache() assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol) @@ -135,7 +135,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { .vertices.cache() val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache() val otherParallelStaticRanks2 = starGraph - .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices { case (vertexId, vector) => vector(1) }.vertices.cache() assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol) @@ -197,7 +197,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { assert(compareRanks(staticRanks, dynamicRanks) < errorTol) val parallelStaticRanks = chain - .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices{ + .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices { case (vertexId, vector) => vector(0) }.vertices.cache() assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol) From 2d1dee77f16bb2540bb46c7169dfd9c7e8228d77 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Fri, 30 Oct 2015 16:05:20 -0700 Subject: [PATCH 07/20] Parallel personalized pagerank implementation --- graphx/pom.xml | 21 +++++ .../org/apache/spark/graphx/GraphOps.scala | 12 +++ .../apache/spark/graphx/lib/PageRank.scala | 79 +++++++++++++++++++ .../spark/graphx/lib/PageRankSuite.scala | 20 +++++ 4 files changed, 132 insertions(+) diff --git a/graphx/pom.xml b/graphx/pom.xml index bd4e53371b86d..811affb27c089 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -73,6 +73,27 @@ org.apache.spark spark-tags_${scala.binary.version} + + org.scalanlp + breeze_${scala.binary.version} + 0.11.2 + + + + junit + junit + + + org.apache.commons + commons-math3 + + + + + org.apache.commons + commons-math3 + target/scala-${scala.binary.version}/classes diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 868658dfe55e5..af3ead71577e9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -24,6 +24,8 @@ import org.apache.spark.SparkException import org.apache.spark.graphx.lib._ import org.apache.spark.rdd.RDD +import breeze.linalg.SparseVector + /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the * efficient GraphX API. This class is implicitly constructed for each Graph object. @@ -391,6 +393,16 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, Some(src)) } + /** + * Run parallel personalized PageRank for a given array of source vertices, such + * that all random walks are started relative to the source vertices + */ + def staticParallelPersonalizedPageRank(sources : Array[VertexId], numIter: Int, + resetProb: Double = 0.15) : Graph[SparseVector[Double], Double] = { + PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources) + } + + /** * Run Personalized PageRank for a fixed number of iterations with * with all iterations originating at the source node diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 2f5bd4ed4ff6b..220c4031d9560 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -22,6 +22,8 @@ import scala.reflect.ClassTag import org.apache.spark.graphx._ import org.apache.spark.internal.Logging +import breeze.linalg.SparseVector + /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. * @@ -162,6 +164,83 @@ object PageRank extends Logging { rankGraph } + /** + * Run Personalized PageRank for a fixed number of iterations, for a + * set of starting nodes in parallel. Returns a graph with vertex attributes + * containing the pagerank relative to all starting nodes (as a sparse vector) and + * edge attributes the normalized edge weight + * + * @tparam VD The original vertex attribute (not used) + * @tparam ED The original edge attribute (not used) + * + * @param graph The graph on which to compute personalized pagerank + * @param numIter The number of iterations to run + * @param resetProb The random reset probability + * @param sources The list of sources to compute personalized pagerank from + * @return the graph with vertex attributes + * containing the pagerank relative to all starting nodes (as a sparse vector) and + * edge attributes the normalized edge weight + */ + def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], + numIter: Int, resetProb: Double = 0.15, + sources : Array[VertexId]): Graph[SparseVector[Double], Double] = + { + // TODO if one sources vertex id is outside of the int range + // we won't be able to store its activations in a sparse vector + val zero = new SparseVector[Double](Array(), Array(), sources.size) + val sourcesInitMap = sources.zipWithIndex.map{case (vid, i) => { + val v = new SparseVector[Double](Array(i), Array(resetProb), sources.size) + (vid, v) + }}.toMap + val sc = graph.vertices.sparkContext + val sourcesInitMapBC = sc.broadcast(sourcesInitMap) + // Initialize the PageRank graph with each edge attribute having + // weight 1/outDegree and each source vertex with attribute 1.0. + var rankGraph = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } + // Set the weight on the edges based on the degree + .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src ) + .mapVertices( (vid, attr) => { + if (sourcesInitMapBC.value contains vid) { + sourcesInitMapBC.value(vid) + } else { + zero + } + }) + + var i = 0 + while (i < numIter) { + val prevRankGraph = rankGraph + // Propagates the message along outbound edges + // and adding start nodes back in with activation resetProb + val rankUpdates = rankGraph.aggregateMessages[SparseVector[Double]]( + ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr), + (a : SparseVector[Double], b : SparseVector[Double]) => a :+ b, TripletFields.Src) + + rankGraph = rankGraph.joinVertices(rankUpdates) { + (vid, oldRank, msgSum) => { + val popActivations : SparseVector[Double] = msgSum :* (1.0 - resetProb) + val resetActivations = if (sourcesInitMapBC.value contains vid) { + sourcesInitMapBC.value(vid) + } else { + zero + } + popActivations :+ resetActivations + }}.cache() + + rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices + prevRankGraph.vertices.unpersist(false) + prevRankGraph.edges.unpersist(false) + + logInfo(s"Parallel Personalized PageRank finished iteration $i.") + + i += 1 + } + + rankGraph + } + /** * Run a dynamic version of PageRank returning a graph with vertex attributes containing the * PageRank and edge attributes containing the normalized edge weight. diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index bdff31446f8ee..588db51474db9 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -118,11 +118,26 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache() assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) + val parallelStaticRanks1 = starGraph.staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() + assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol) + + val parallelStaticRanks2 = starGraph.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() + assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol) + // We have one outbound edge from 1 to 0 val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb) .vertices.cache() val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache() + val otherParallelStaticRanks2 = starGraph.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + case (vertexId, vector) => vector(1) + }.vertices.cache() assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol) + assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol) + assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol) } } // end of test Star PersonalPageRank @@ -177,6 +192,11 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices assert(compareRanks(staticRanks, dynamicRanks) < errorTol) + + val parallelStaticRanks = chain.staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() + assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol) } } } From 202acb2bb1f2451646e7aa69f1b0152cc5fb623e Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Wed, 4 Nov 2015 09:39:05 -0800 Subject: [PATCH 08/20] Scala style tweaks --- .../spark/graphx/lib/PageRankSuite.scala | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 588db51474db9..5306891ec4e9d 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -118,23 +118,26 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache() assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) - val parallelStaticRanks1 = starGraph.staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices{ - case (vertexId, vector) => vector(0) - }.vertices.cache() + val parallelStaticRanks1 = starGraph + .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol) - val parallelStaticRanks2 = starGraph.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ - case (vertexId, vector) => vector(0) - }.vertices.cache() + val parallelStaticRanks2 = starGraph + .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol) // We have one outbound edge from 1 to 0 val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb) .vertices.cache() val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache() - val otherParallelStaticRanks2 = starGraph.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ - case (vertexId, vector) => vector(1) - }.vertices.cache() + val otherParallelStaticRanks2 = starGraph + .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + case (vertexId, vector) => vector(1) + }.vertices.cache() assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol) assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol) assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol) @@ -193,9 +196,10 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { assert(compareRanks(staticRanks, dynamicRanks) < errorTol) - val parallelStaticRanks = chain.staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices{ - case (vertexId, vector) => vector(0) - }.vertices.cache() + val parallelStaticRanks = chain + .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices{ + case (vertexId, vector) => vector(0) + }.vertices.cache() assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol) } } From 69db3855e5485847f3951f87c1f14ed3059a7f6f Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 23 Nov 2015 17:29:02 -0800 Subject: [PATCH 09/20] Renaming SparseVector to BSV --- .../scala/org/apache/spark/graphx/GraphOps.scala | 4 ++-- .../org/apache/spark/graphx/lib/PageRank.scala | 14 +++++++------- .../apache/spark/graphx/lib/PageRankSuite.scala | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index af3ead71577e9..3aabe0afa3460 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkException import org.apache.spark.graphx.lib._ import org.apache.spark.rdd.RDD -import breeze.linalg.SparseVector +import breeze.linalg.{SparseVector => BSV} /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the @@ -398,7 +398,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * that all random walks are started relative to the source vertices */ def staticParallelPersonalizedPageRank(sources : Array[VertexId], numIter: Int, - resetProb: Double = 0.15) : Graph[SparseVector[Double], Double] = { + resetProb: Double = 0.15) : Graph[BSV[Double], Double] = { PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 220c4031d9560..313a21b3b69f2 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.graphx._ import org.apache.spark.internal.Logging -import breeze.linalg.SparseVector +import breeze.linalg.{SparseVector => BSV} /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. @@ -183,13 +183,13 @@ object PageRank extends Logging { */ def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, - sources : Array[VertexId]): Graph[SparseVector[Double], Double] = + sources : Array[VertexId]): Graph[BSV[Double], Double] = { // TODO if one sources vertex id is outside of the int range // we won't be able to store its activations in a sparse vector - val zero = new SparseVector[Double](Array(), Array(), sources.size) + val zero = new BSV[Double](Array(), Array(), sources.size) val sourcesInitMap = sources.zipWithIndex.map{case (vid, i) => { - val v = new SparseVector[Double](Array(i), Array(resetProb), sources.size) + val v = new BSV[Double](Array(i), Array(resetProb), sources.size) (vid, v) }}.toMap val sc = graph.vertices.sparkContext @@ -214,13 +214,13 @@ object PageRank extends Logging { val prevRankGraph = rankGraph // Propagates the message along outbound edges // and adding start nodes back in with activation resetProb - val rankUpdates = rankGraph.aggregateMessages[SparseVector[Double]]( + val rankUpdates = rankGraph.aggregateMessages[BSV[Double]]( ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr), - (a : SparseVector[Double], b : SparseVector[Double]) => a :+ b, TripletFields.Src) + (a : BSV[Double], b : BSV[Double]) => a :+ b, TripletFields.Src) rankGraph = rankGraph.joinVertices(rankUpdates) { (vid, oldRank, msgSum) => { - val popActivations : SparseVector[Double] = msgSum :* (1.0 - resetProb) + val popActivations : BSV[Double] = msgSum :* (1.0 - resetProb) val resetActivations = if (sourcesInitMapBC.value contains vid) { sourcesInitMapBC.value(vid) } else { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 5306891ec4e9d..a0e6d3fe129b6 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -119,7 +119,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) val parallelStaticRanks1 = starGraph - .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices{ + .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices { case (vertexId, vector) => vector(0) }.vertices.cache() assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol) From a42d272c21955d838ac50eeba53ca1b74bf2c4d2 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 23 Nov 2015 17:30:39 -0800 Subject: [PATCH 10/20] Removing extra space, extra line --- graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 3aabe0afa3460..137394161e3c7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -397,12 +397,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * Run parallel personalized PageRank for a given array of source vertices, such * that all random walks are started relative to the source vertices */ - def staticParallelPersonalizedPageRank(sources : Array[VertexId], numIter: Int, + def staticParallelPersonalizedPageRank(sources: Array[VertexId], numIter: Int, resetProb: Double = 0.15) : Graph[BSV[Double], Double] = { PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources) } - /** * Run Personalized PageRank for a fixed number of iterations with * with all iterations originating at the source node From 53ab670cd48375732535a95368743fc57fc4e942 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 23 Nov 2015 17:39:41 -0800 Subject: [PATCH 11/20] Code-style changes --- .../apache/spark/graphx/lib/PageRank.scala | 29 +++++++++---------- .../spark/graphx/lib/PageRankSuite.scala | 6 ++-- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 313a21b3b69f2..336a6bc5a9d96 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -183,15 +183,14 @@ object PageRank extends Logging { */ def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, - sources : Array[VertexId]): Graph[BSV[Double], Double] = - { + sources: Array[VertexId]): Graph[BSV[Double], Double] = { // TODO if one sources vertex id is outside of the int range // we won't be able to store its activations in a sparse vector val zero = new BSV[Double](Array(), Array(), sources.size) - val sourcesInitMap = sources.zipWithIndex.map{case (vid, i) => { + val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) => val v = new BSV[Double](Array(i), Array(resetProb), sources.size) (vid, v) - }}.toMap + }.toMap val sc = graph.vertices.sparkContext val sourcesInitMapBC = sc.broadcast(sourcesInitMap) // Initialize the PageRank graph with each edge attribute having @@ -200,14 +199,14 @@ object PageRank extends Logging { // Associate the degree with each vertex .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } // Set the weight on the edges based on the degree - .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src ) - .mapVertices( (vid, attr) => { - if (sourcesInitMapBC.value contains vid) { - sourcesInitMapBC.value(vid) - } else { - zero + .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src) + .mapVertices { (vid, attr) => + if (sourcesInitMapBC.value contains vid) { + sourcesInitMapBC.value(vid) + } else { + zero + } } - }) var i = 0 while (i < numIter) { @@ -216,18 +215,18 @@ object PageRank extends Logging { // and adding start nodes back in with activation resetProb val rankUpdates = rankGraph.aggregateMessages[BSV[Double]]( ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr), - (a : BSV[Double], b : BSV[Double]) => a :+ b, TripletFields.Src) + (a: BSV[Double], b: BSV[Double]) => a :+ b, TripletFields.Src) rankGraph = rankGraph.joinVertices(rankUpdates) { - (vid, oldRank, msgSum) => { - val popActivations : BSV[Double] = msgSum :* (1.0 - resetProb) + (vid, oldRank, msgSum) => + val popActivations: BSV[Double] = msgSum :* (1.0 - resetProb) val resetActivations = if (sourcesInitMapBC.value contains vid) { sourcesInitMapBC.value(vid) } else { zero } popActivations :+ resetActivations - }}.cache() + }.cache() rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices prevRankGraph.vertices.unpersist(false) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index a0e6d3fe129b6..b6305c8d00aba 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -125,7 +125,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol) val parallelStaticRanks2 = starGraph - .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices { case (vertexId, vector) => vector(0) }.vertices.cache() assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol) @@ -135,7 +135,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { .vertices.cache() val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache() val otherParallelStaticRanks2 = starGraph - .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices{ + .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices { case (vertexId, vector) => vector(1) }.vertices.cache() assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol) @@ -197,7 +197,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { assert(compareRanks(staticRanks, dynamicRanks) < errorTol) val parallelStaticRanks = chain - .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices{ + .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices { case (vertexId, vector) => vector(0) }.vertices.cache() assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol) From 31e2e98d8ea1e374141b7e87101966513eceb992 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 5 Sep 2016 10:28:55 -0700 Subject: [PATCH 12/20] Moving to mllib-local --- graphx/pom.xml | 5 +++++ .../org/apache/spark/graphx/GraphOps.scala | 4 ++-- .../apache/spark/graphx/lib/PageRank.scala | 19 ++++++++++++------- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/graphx/pom.xml b/graphx/pom.xml index 811affb27c089..7cf0ef6488974 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -46,6 +46,11 @@ test-jar test + + org.apache.spark + spark-mllib-local_${scala.binary.version} + ${project.version} + org.apache.xbean xbean-asm5-shaded diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 137394161e3c7..2097aa2fae9b3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkException import org.apache.spark.graphx.lib._ import org.apache.spark.rdd.RDD -import breeze.linalg.{SparseVector => BSV} +import org.apache.spark.ml.linalg.Vector /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the @@ -398,7 +398,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * that all random walks are started relative to the source vertices */ def staticParallelPersonalizedPageRank(sources: Array[VertexId], numIter: Int, - resetProb: Double = 0.15) : Graph[BSV[Double], Double] = { + resetProb: Double = 0.15) : Graph[Vector, Double] = { PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 336a6bc5a9d96..bb575c2a62783 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -22,7 +22,9 @@ import scala.reflect.ClassTag import org.apache.spark.graphx._ import org.apache.spark.internal.Logging -import breeze.linalg.{SparseVector => BSV} +import org.apache.spark.ml.linalg.{Vectors, Vector} + +import breeze.linalg.{Vector => BV} /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. @@ -183,12 +185,12 @@ object PageRank extends Logging { */ def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, - sources: Array[VertexId]): Graph[BSV[Double], Double] = { + sources: Array[VertexId]): Graph[Vector, Double] = { // TODO if one sources vertex id is outside of the int range // we won't be able to store its activations in a sparse vector - val zero = new BSV[Double](Array(), Array(), sources.size) + val zero = Vectors.sparse(sources.size, List()).asBreeze val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) => - val v = new BSV[Double](Array(i), Array(resetProb), sources.size) + val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze (vid, v) }.toMap val sc = graph.vertices.sparkContext @@ -213,13 +215,13 @@ object PageRank extends Logging { val prevRankGraph = rankGraph // Propagates the message along outbound edges // and adding start nodes back in with activation resetProb - val rankUpdates = rankGraph.aggregateMessages[BSV[Double]]( + val rankUpdates = rankGraph.aggregateMessages[BV[Double]]( ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr), - (a: BSV[Double], b: BSV[Double]) => a :+ b, TripletFields.Src) + (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src) rankGraph = rankGraph.joinVertices(rankUpdates) { (vid, oldRank, msgSum) => - val popActivations: BSV[Double] = msgSum :* (1.0 - resetProb) + val popActivations: BV[Double] = msgSum :* (1.0 - resetProb) val resetActivations = if (sourcesInitMapBC.value contains vid) { sourcesInitMapBC.value(vid) } else { @@ -238,6 +240,9 @@ object PageRank extends Logging { } rankGraph + .mapVertices { (vid, attr) => + Vectors.fromBreeze(attr) + } } /** From c7ca220423054137480ce197014d898afc83c745 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 5 Sep 2016 10:38:04 -0700 Subject: [PATCH 13/20] Cleaning up pom dependencies --- graphx/pom.xml | 42 ------------------------------------------ mllib/pom.xml | 4 ++++ 2 files changed, 4 insertions(+), 42 deletions(-) diff --git a/graphx/pom.xml b/graphx/pom.xml index 8de217369d9fa..10d5ba93ebb88 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -78,48 +78,6 @@ org.apache.spark spark-tags_${scala.binary.version} - - org.scalanlp - breeze_${scala.binary.version} - 0.11.2 - - - - junit - junit - - - org.apache.commons - commons-math3 - - - - - org.apache.commons - commons-math3 - - - org.scalanlp - breeze_${scala.binary.version} - 0.11.2 - - - - junit - junit - - - org.apache.commons - commons-math3 - - - - - org.apache.commons - commons-math3 - target/scala-${scala.binary.version}/classes diff --git a/mllib/pom.xml b/mllib/pom.xml index 41339f4486e6f..4484998a49c8f 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -73,6 +73,10 @@ test-jar test + + org.scalanlp + breeze_${scala.binary.version} + org.apache.commons commons-math3 From 1ec345f2168b4b4f54847aed0dccf9d8c1c06873 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Mon, 5 Sep 2016 14:53:15 -0700 Subject: [PATCH 14/20] Removing unused import --- graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 07bd8eb557a55..2097aa2fae9b3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -26,8 +26,6 @@ import org.apache.spark.rdd.RDD import org.apache.spark.ml.linalg.Vector -import breeze.linalg.{SparseVector => BSV} - /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the * efficient GraphX API. This class is implicitly constructed for each Graph object. From 2d00fc0575896bf04e4cece999f059028185fe46 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Wed, 7 Sep 2016 16:23:23 -0700 Subject: [PATCH 15/20] Import style --- .../main/scala/org/apache/spark/graphx/GraphOps.scala | 1 - .../scala/org/apache/spark/graphx/lib/PageRank.scala | 9 +++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 2097aa2fae9b3..21c3516c2f5ab 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -23,7 +23,6 @@ import scala.util.Random import org.apache.spark.SparkException import org.apache.spark.graphx.lib._ import org.apache.spark.rdd.RDD - import org.apache.spark.ml.linalg.Vector /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 15b02bc311dbf..5735cdce91dc3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -19,14 +19,11 @@ package org.apache.spark.graphx.lib import scala.reflect.ClassTag -import org.apache.spark.graphx._ -import org.apache.spark.internal.Logging - -import org.apache.spark.ml.linalg.{Vectors, Vector} - import breeze.linalg.{Vector => BV} -import breeze.linalg.{SparseVector => BSV} +import org.apache.spark.ml.linalg.{Vectors, Vector} +import org.apache.spark.graphx._ +import org.apache.spark.internal.Logging /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. From 46381c84d353065e3e4ba3b2fa1c8c9e58c90b36 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Wed, 7 Sep 2016 16:38:23 -0700 Subject: [PATCH 16/20] More import refactor --- .../src/main/scala/org/apache/spark/graphx/GraphOps.scala | 2 +- .../main/scala/org/apache/spark/graphx/lib/PageRank.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 21c3516c2f5ab..4f84a83c03979 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -20,10 +20,10 @@ package org.apache.spark.graphx import scala.reflect.ClassTag import scala.util.Random +import org.apache.spark.ml.linalg.Vector import org.apache.spark.SparkException import org.apache.spark.graphx.lib._ import org.apache.spark.rdd.RDD -import org.apache.spark.ml.linalg.Vector /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 5735cdce91dc3..75cd8dbf6235d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -21,9 +21,9 @@ import scala.reflect.ClassTag import breeze.linalg.{Vector => BV} -import org.apache.spark.ml.linalg.{Vectors, Vector} -import org.apache.spark.graphx._ +import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.internal.Logging +import org.apache.spark.graphx._ /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. @@ -239,7 +239,7 @@ object PageRank extends Logging { } rankGraph - .mapVertices { (vid, attr) => + .mapVertices { (vid, attr) => Vectors.fromBreeze(attr) } } From 4b2d564ac57d331cfdfe5d2a0cd67516661c1b04 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Wed, 7 Sep 2016 16:55:58 -0700 Subject: [PATCH 17/20] Alphabetical ordering of imports --- graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 4 ++-- .../src/main/scala/org/apache/spark/graphx/lib/PageRank.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 4f84a83c03979..90907300be975 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -20,10 +20,10 @@ package org.apache.spark.graphx import scala.reflect.ClassTag import scala.util.Random -import org.apache.spark.ml.linalg.Vector -import org.apache.spark.SparkException import org.apache.spark.graphx.lib._ +import org.apache.spark.ml.linalg.Vector import org.apache.spark.rdd.RDD +import org.apache.spark.SparkException /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 75cd8dbf6235d..57d9ea759cd3c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -21,9 +21,9 @@ import scala.reflect.ClassTag import breeze.linalg.{Vector => BV} -import org.apache.spark.ml.linalg.{Vector, Vectors} -import org.apache.spark.internal.Logging import org.apache.spark.graphx._ +import org.apache.spark.internal.Logging +import org.apache.spark.ml.linalg.{Vector, Vectors} /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. From 40f5780d5be8a3ccd31ee9002650eabcd1ca968c Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Fri, 9 Sep 2016 15:37:09 -0700 Subject: [PATCH 18/20] Destroying broadcast map --- graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 57d9ea759cd3c..84754a33c21ce 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -232,6 +232,7 @@ object PageRank extends Logging { rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices prevRankGraph.vertices.unpersist(false) prevRankGraph.edges.unpersist(false) + sourcesInitMapBC.destroy(false) logInfo(s"Parallel Personalized PageRank finished iteration $i.") From 7dc2c234696b1780ff105f0519f72b067b003559 Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Fri, 9 Sep 2016 16:11:29 -0700 Subject: [PATCH 19/20] Minor styling --- .../src/main/scala/org/apache/spark/graphx/lib/PageRank.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 84754a33c21ce..ecad47040ec1a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -239,8 +239,7 @@ object PageRank extends Logging { i += 1 } - rankGraph - .mapVertices { (vid, attr) => + rankGraph.mapVertices { (vid, attr) => Vectors.fromBreeze(attr) } } From adc5fc330bd7c467ee2c58953c4fa2eda741d92b Mon Sep 17 00:00:00 2001 From: Yves Raimond Date: Fri, 9 Sep 2016 17:24:27 -0700 Subject: [PATCH 20/20] Undoing destroy --- .../main/scala/org/apache/spark/graphx/lib/PageRank.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index ecad47040ec1a..f4b00757a8b54 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -232,7 +232,6 @@ object PageRank extends Logging { rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices prevRankGraph.vertices.unpersist(false) prevRankGraph.edges.unpersist(false) - sourcesInitMapBC.destroy(false) logInfo(s"Parallel Personalized PageRank finished iteration $i.") @@ -240,8 +239,8 @@ object PageRank extends Logging { } rankGraph.mapVertices { (vid, attr) => - Vectors.fromBreeze(attr) - } + Vectors.fromBreeze(attr) + } } /**