[SPARK-14804][Graphx] Graph vertexRDD/EdgeRDD checkpoint results Clas… #15447

Closed · wants to merge 1 commit
2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala

@@ -76,7 +76,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
   }

   override def isCheckpointed: Boolean = {
-    firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
+    partitionsRDD != null && partitionsRDD.isCheckpointed
   }

   override def getCheckpointFile: Option[String] = {
2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

@@ -76,7 +76,7 @@ class VertexRDDImpl[VD] private[graphx] (
   }

   override def isCheckpointed: Boolean = {
-    firstParent[ShippableVertexPartition[VD]].isCheckpointed
+    partitionsRDD != null && partitionsRDD.isCheckpointed
   }

   override def getCheckpointFile: Option[String] = {
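Both hunks above replace the firstParent-based check with a direct check on partitionsRDD, so isCheckpointed only reports true once the underlying partitions RDD has actually been checkpointed. The following is a minimal sketch, not part of this patch, of the caller-visible behavior the fix restores; the local master, checkpoint directory, and sample edges are illustrative assumptions.

// Hedged sketch, not part of this patch: exercises EdgeRDD.checkpoint() and the
// fixed isCheckpointed from user code. Master, app name, checkpoint directory
// and sample edges are assumptions for illustration.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeRDD}

object EdgeCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/graphx-checkpoint") // assumed scratch directory

    val edges = EdgeRDD.fromEdges(sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 2))))
    edges.checkpoint()             // marks the underlying partitionsRDD for checkpointing
    assert(!edges.isCheckpointed)  // nothing has been materialized or written yet

    edges.collect()                // an action materializes the RDD and writes the checkpoint
    assert(edges.isCheckpointed)   // with this patch, reflects partitionsRDD.isCheckpointed

    sc.stop()
  }
}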
26 changes: 26 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils

 class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {

@@ -33,4 +34,29 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

+  test("checkpointing") {
+    withSpark { sc =>
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      edges.checkpoint()
+
+      // EdgeRDD and partitionsRDD are not checkpointed yet
+      assert(!edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(!edges.partitionsRDD.isCheckpointed)
+      assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = edges.collect().toSeq // force checkpointing
+
+      // EdgeRDD and partitionsRDD are checkpointed now
+      assert(edges.isCheckpointed)
+      assert(edges.isCheckpointedAndMaterialized)
+      assert(edges.partitionsRDD.isCheckpointed)
+      assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(edges.collect().toSeq === data) // test checkpointed RDD
+    }
+  }
+
 }
25 changes: 25 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
 import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils

 class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {

@@ -197,4 +198,28 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

+  test("checkpoint") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n)
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      verts.checkpoint()
+
+      // VertexRDD and partitionsRDD are not checkpointed yet
+      assert(!verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(!verts.partitionsRDD.isCheckpointed)
+      assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = verts.collect().toSeq // force checkpointing
+
+      // VertexRDD and partitionsRDD are checkpointed now
+      assert(verts.isCheckpointed)
+      assert(verts.isCheckpointedAndMaterialized)
+      assert(verts.partitionsRDD.isCheckpointed)
+      assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(verts.collect().toSeq === data) // test checkpointed RDD
+    }
+  }
 }
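The two suites exercise VertexRDD and EdgeRDD directly. Below is a hedged sketch, not part of this patch, of the same behavior seen through the public Graph API, which delegates checkpointing to its vertex and edge RDDs; the sample edges, default vertex value, master, and checkpoint directory are illustrative assumptions.

// Hedged sketch, not part of this patch: the same fix observed through the
// Graph API. Sample data, master and checkpoint directory are assumptions.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object GraphCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/graphx-checkpoint") // assumed scratch directory

    val graph = Graph.fromEdges(
      sc.parallelize(Seq(Edge(1L, 2L, "a"), Edge(2L, 3L, "b"))), defaultValue = 0)
    graph.checkpoint()            // marks both the vertex and edge partition RDDs
    assert(!graph.isCheckpointed) // not materialized yet

    graph.vertices.count()        // actions materialize the RDDs and write the checkpoints
    graph.edges.count()
    assert(graph.isCheckpointed)  // with this patch, reports the partitionsRDDs' state

    sc.stop()
  }
}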