From c9e2e03651255657f5dc7ce65d207a5e1f6e9d3a Mon Sep 17 00:00:00 2001
From: Alexander Pivovarov
Date: Tue, 11 Oct 2016 22:48:37 -0700
Subject: [PATCH] [SPARK-14804][Graphx] Graph vertexRDD/EdgeRDD checkpoint results ClassCastException

---
 .../spark/graphx/impl/EdgeRDDImpl.scala      |  2 +-
 .../spark/graphx/impl/VertexRDDImpl.scala    |  2 +-
 .../apache/spark/graphx/EdgeRDDSuite.scala   | 26 +++++++++++++++++++
 .../apache/spark/graphx/VertexRDDSuite.scala | 25 ++++++++++++++++++
 4 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 98e082cc44e1a..ea74bf4e48dd3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -76,7 +76,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
   }
 
   override def isCheckpointed: Boolean = {
-    firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
+    partitionsRDD != null && partitionsRDD.isCheckpointed
   }
 
   override def getCheckpointFile: Option[String] = {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index d314522de9916..f46fb7eb038d6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -76,7 +76,7 @@ class VertexRDDImpl[VD] private[graphx] (
   }
 
   override def isCheckpointed: Boolean = {
-    firstParent[ShippableVertexPartition[VD]].isCheckpointed
+    partitionsRDD != null && partitionsRDD.isCheckpointed
   }
 
   override def getCheckpointFile: Option[String] = {
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index f1ecc9e2219d1..c68c44e83d951 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -33,4 +34,29 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpointing") {
+    withSpark { sc =>
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      edges.checkpoint()
+
+      // EdgeRDD and partitionsRDD are not checkpointed yet
+      assert(!edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(!edges.partitionsRDD.isCheckpointed)
+      assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = edges.collect().toSeq // force checkpointing
+
+      // EdgeRDD and partitionsRDD are checkpointed now
+      assert(edges.isCheckpointed)
+      assert(edges.isCheckpointedAndMaterialized)
+      assert(edges.partitionsRDD.isCheckpointed)
+      assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(edges.collect().toSeq === data) // test checkpointed RDD
+    }
+  }
+
 }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 0bb9e0a3ea180..1028688867232 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
 import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -197,4 +198,28 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpoint") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n)
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      verts.checkpoint()
+
+      // VertexRDD and partitionsRDD are not checkpointed yet
+      assert(!verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(!verts.partitionsRDD.isCheckpointed)
+      assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = verts.collect().toSeq // force checkpointing
+
+      // VertexRDD and partitionsRDD are checkpointed now
+      assert(verts.isCheckpointed)
+      assert(verts.isCheckpointedAndMaterialized)
+      assert(verts.partitionsRDD.isCheckpointed)
+      assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(verts.collect().toSeq === data) // test checkpointed RDD
+    }
+  }
 }
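
For reference, a minimal sketch of the user-facing scenario these tests guard against, assuming a live SparkContext `sc`; the graph data and checkpoint path below are illustrative, not taken from the JIRA verbatim:

  // Sketch only: assumes a running SparkContext `sc`; data and path are illustrative.
  import org.apache.spark.graphx.Graph

  sc.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical directory
  val graph = Graph.fromEdgeTuples(
    sc.parallelize(Seq((1L, 2L), (2L, 3L))), defaultValue = 1)
  graph.vertices.checkpoint()
  graph.vertices.count()                         // action materializes the checkpoint
  // Before this patch, reading the vertices back after checkpointing could throw a
  // ClassCastException; with the fix, isCheckpointed reflects partitionsRDD's state
  // and the data round-trips correctly.
  assert(graph.vertices.collect().nonEmpty)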