Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-11496][GRAPHX] Parallel implementation of personalized pagerank #9457

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions graphx/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-test-tags_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.binary.version}</artifactId>
<version>0.11.2</version>
<exclusions>
<!-- junit is included as a compile-scoped dependency by jtransforms, which is
a dependency of breeze, so it is excluded here. -->
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<!-- NOTE(review): commons-math3 is excluded from breeze and then declared as a
direct dependency immediately after this element; presumably this keeps its
version under this build's control. Confirm. -->
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since mllib depends on graphx, please remove the breeze dependencies in mllib.

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
11 changes: 11 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.apache.spark.rdd.RDD

import org.apache.spark.graphx.lib._

import breeze.linalg.{SparseVector => BSV}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import breeze.linalg.{SparseVector => BSV}

/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
* efficient GraphX API. This class is implicitly constructed for each Graph object.
Expand Down Expand Up @@ -384,6 +386,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, Some(src))
}

/**
 * Computes personalized PageRank for several source vertices at once, running a
 * fixed number of iterations. The work is delegated to
 * [[PageRank.runParallelPersonalizedPageRank]].
 *
 * @param sources the vertices that the random walks are started relative to
 * @param numIter the number of iterations to run
 * @param resetProb the random reset probability (defaults to 0.15)
 * @return a graph whose vertex attributes are sparse vectors of ranks and whose
 *         edge attributes are doubles, as produced by the delegate
 */
def staticParallelPersonalizedPageRank(
    sources: Array[VertexId],
    numIter: Int,
    resetProb: Double = 0.15): Graph[BSV[Double], Double] = {
  PageRank.runParallelPersonalizedPageRank(
    graph,
    numIter = numIter,
    resetProb = resetProb,
    sources = sources)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extra line.

/**
* Run Personalized PageRank for a fixed number of iterations
* with all iterations originating at the source node
Expand Down
78 changes: 78 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import scala.language.postfixOps
import org.apache.spark.Logging
import org.apache.spark.graphx._

import breeze.linalg.{SparseVector => BSV}

/**
* PageRank algorithm implementation. Two implementations of PageRank are provided.
*
Expand Down Expand Up @@ -158,6 +160,82 @@ object PageRank extends Logging {
rankGraph
}

/**
 * Run Personalized PageRank for a fixed number of iterations, for a
 * set of starting nodes in parallel. Returns a graph with vertex attributes
 * containing the pagerank relative to all starting nodes (as a sparse vector) and
 * edge attributes the normalized edge weight.
 *
 * Every vertex attribute is a sparse vector of length `sources.length`; slot `i`
 * holds the activation relative to `sources(i)`.
 *
 * @tparam VD The original vertex attribute (not used)
 * @tparam ED The original edge attribute (not used)
 *
 * @param graph The graph on which to compute personalized pagerank
 * @param numIter The number of iterations to run
 * @param resetProb The random reset probability
 * @param sources The list of sources to compute personalized pagerank from
 * @return the graph with vertex attributes
 *         containing the pagerank relative to all starting nodes (as a sparse vector) and
 *         edge attributes the normalized edge weight
 */
def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
  numIter: Int, resetProb: Double = 0.15,
  sources: Array[VertexId]): Graph[BSV[Double], Double] = {
  // TODO if one sources vertex id is outside of the int range
  // we won't be able to store its activations in a sparse vector
  // NOTE(review): the vectors below are indexed by a source's position in `sources`,
  // not by its vertex id, so the TODO above may be stale -- confirm.
  val numSources = sources.length
  // Activation of any vertex that is not a source: a vector with no stored entries.
  val zero = new BSV[Double](Array(), Array(), numSources)
  // Source vertex id -> vector holding resetProb in that source's own slot.
  val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
    val v = new BSV[Double](Array(i), Array(resetProb), numSources)
    (vid, v)
  }.toMap
  val sc = graph.vertices.sparkContext
  val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
  // Initialize the PageRank graph with each edge attribute having
  // weight 1/outDegree, each source vertex with resetProb in its own slot,
  // and every other vertex with the empty vector.
  var rankGraph = graph
    // Associate the degree with each vertex
    .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
    // Set the weight on the edges based on the degree
    .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
    // Single map lookup per vertex instead of `contains` followed by `apply`
    .mapVertices { (vid, attr) => sourcesInitMapBC.value.getOrElse(vid, zero) }

  var i = 0
  while (i < numIter) {
    val prevRankGraph = rankGraph
    // Propagates the message along outbound edges
    // and adding start nodes back in with activation resetProb
    val rankUpdates = rankGraph.aggregateMessages[BSV[Double]](
      ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
      (a: BSV[Double], b: BSV[Double]) => a :+ b, TripletFields.Src)

    // Only vertices that received a message are updated here; the others keep
    // their previous attribute.
    rankGraph = rankGraph.joinVertices(rankUpdates) {
      (vid, oldRank, msgSum) =>
        val popActivations: BSV[Double] = msgSum :* (1.0 - resetProb)
        val resetActivations = sourcesInitMapBC.value.getOrElse(vid, zero)
        popActivations :+ resetActivations
    }.cache()

    // Materialize the new graph before releasing the previous one.
    rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
    prevRankGraph.vertices.unpersist(false)
    prevRankGraph.edges.unpersist(false)

    logInfo(s"Parallel Personalized PageRank finished iteration $i.")

    i += 1
  }

  rankGraph
}

/**
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
* PageRank and edge attributes containing the normalized edge weight.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,29 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)

val parallelStaticRanks1 = starGraph
.staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices {
case (vertexId, vector) => vector(0)
}.vertices.cache()
assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)

val parallelStaticRanks2 = starGraph
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
case (vertexId, vector) => vector(0)
}.vertices.cache()
assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)

// We have one outbound edge from 1 to 0
val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
.vertices.cache()
val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
val otherParallelStaticRanks2 = starGraph
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
case (vertexId, vector) => vector(1)
}.vertices.cache()
assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol)
assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol)
}
} // end of test Star PersonalPageRank

Expand Down Expand Up @@ -177,6 +195,12 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices

assert(compareRanks(staticRanks, dynamicRanks) < errorTol)

val parallelStaticRanks = chain
.staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
case (vertexId, vector) => vector(0)
}.vertices.cache()
assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
}
}
}
17 changes: 0 additions & 17 deletions mllib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,6 @@
<version>${jblas.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.binary.version}</artifactId>
<version>0.11.2</version>
<exclusions>
<!-- This is included as a compile-scoped dependency by jtransforms, which is
a dependency of breeze. -->
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
Expand Down