Skip to content

Commit

Permalink
[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

EdgeRDD/VertexRDD override checkpoint() and isCheckpointed() to forward these calls to the internal partitionsRDD. So when checkpoint() is called on them, it is the partitionsRDD that actually gets checkpointed. However, since isCheckpointed() is also overridden to call partitionsRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though the EdgeRDD/VertexRDD itself is not actually checkpointed.

This would have been fine, except that the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, Spark tries to read checkpoint data of the VertexRDD/EdgeRDD when computing it, even though it is not actually checkpointed. Through a crazy sequence of call forwarding, it reads the checkpoint data of the partitionsRDD and tries to cast it to the types in VertexRDD/EdgeRDD. This leads to a ClassCastException.

The minimal fix that does not change any public behavior is to modify the RDD internals so that they no longer use the public, overridable API for internal logic.
## How was this patch tested?

New unit tests.

Author: Tathagata Das <[email protected]>

Closes apache#15396 from tdas/SPARK-14804.
  • Loading branch information
tdas authored and uzadude committed Jan 27, 2017
1 parent d4e0ce7 commit 45f8da7
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1610,14 +1610,15 @@ abstract class RDD[T: ClassTag](
/**
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
*/
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
def isCheckpointed: Boolean = isCheckpointedAndMaterialized

/**
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
* This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
* return value. Exposed for testing.
*/
private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
private[spark] def isCheckpointedAndMaterialized: Boolean =
checkpointData.exists(_.isCheckpointed)

/**
* Return whether this RDD is marked for local checkpointing.
Expand Down
27 changes: 27 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.graphx

import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {

Expand All @@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}

// Regression test for SPARK-14804: checks the checkpoint state reported by an EdgeRDD and
// by its internal partitionsRDD before and after an action forces the checkpoint.
test("checkpointing") {
  withSpark { sc =>
    val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
    sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
    edges.checkpoint()

    // No action has run yet, so neither the EdgeRDD nor its partitionsRDD is checkpointed.
    assert(!edges.isCheckpointed)
    assert(!edges.isCheckpointedAndMaterialized)
    assert(!edges.partitionsRDD.isCheckpointed)
    assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)

    val data = edges.collect().toSeq // force checkpointing

    // The public isCheckpointed flag on the EdgeRDD now reports true, but the EdgeRDD itself
    // has no materialized checkpoint data; only the internal partitionsRDD is checkpointed.
    assert(edges.isCheckpointed)
    assert(!edges.isCheckpointedAndMaterialized)
    assert(edges.partitionsRDD.isCheckpointed)
    assert(edges.partitionsRDD.isCheckpointedAndMaterialized)

    // Collecting again after the checkpoint must return the same data as before.
    assert(edges.collect().toSeq === data)
  }
}

}
26 changes: 26 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {

Expand Down Expand Up @@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}

// Checks the checkpoint state reported by a VertexRDD and by its internal partitionsRDD,
// first right after checkpoint() is requested and then after an action has run.
test("checkpoint") {
  withSpark { sc =>
    val vertexRdd = vertices(sc, 100)
    val checkpointDir = Utils.createTempDir().getCanonicalPath
    sc.setCheckpointDir(checkpointDir)
    vertexRdd.checkpoint()

    // Before any action runs, nothing reports itself as checkpointed.
    assert(!vertexRdd.isCheckpointed)
    assert(!vertexRdd.isCheckpointedAndMaterialized)
    assert(!vertexRdd.partitionsRDD.isCheckpointed)
    assert(!vertexRdd.partitionsRDD.isCheckpointedAndMaterialized)

    // Running an action is what triggers the checkpoint.
    val collectedBefore = vertexRdd.collect().toSeq

    // The VertexRDD's public flag is now true, yet it holds no materialized checkpoint
    // of its own; the checkpoint belongs to the internal partitionsRDD.
    assert(vertexRdd.isCheckpointed)
    assert(!vertexRdd.isCheckpointedAndMaterialized)
    assert(vertexRdd.partitionsRDD.isCheckpointed)
    assert(vertexRdd.partitionsRDD.isCheckpointedAndMaterialized)

    // A second collect after the checkpoint must match the first.
    assert(vertexRdd.collect().toSeq === collectedBefore)
  }
}
}

0 comments on commit 45f8da7

Please sign in to comment.