-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-14804][Spark][Graphx] Fix checkpointing of VertexRDD/EdgeRDD #15396
Conversation
I'm no expert on checkpointing, but the tests look fine to me. You could eliminated duplicated code in the tests by putting the shared code in a helper method. Tested locally to confirm unit tests fail before the fix. |
Test build #66538 has finished for PR 15396 at commit
|
Can you reference the earlier pull requests here too? |
@@ -1589,7 +1589,8 @@ abstract class RDD[T: ClassTag]( | |||
* This is introduced as an alias for `isCheckpointed` to clarify the semantics of the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to fix the comment here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh wait I just realized this is exactly the same as isCheckpointed
. It's kind of confusing why we have two methods that do the same thing. Is it because VertexRDD or something overrides one but not the other?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andrewor14 This method left as is in this PR - #15447
@@ -1589,7 +1589,8 @@ abstract class RDD[T: ClassTag]( | |||
* This is introduced as an alias for `isCheckpointed` to clarify the semantics of the | |||
* return value. Exposed for testing. | |||
*/ | |||
private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed | |||
private[spark] def isCheckpointedAndMaterialized: Boolean = | |||
checkpointData.exists(_.isCheckpointed) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One way to fix the duplicate code is to make this one final and have isCheckpointed
call this one, then implementations of RDD can feel free to override isCheckpointed
without affecting our internal checkpointing logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you mean. if isCheckpointedAndMaterialized = isCheckpointed, and you override isCheckpointed, then that would change the behavior of isCheckpointedAndMaterialized as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I meant:
private[spark] final def isCheckpointedAndMaterialized: Boolean = {
checkpointData.exists(_.isCheckpointed)
}
def isCheckpointed: Boolean = isCheckpointedAndMaterialized
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed it. Can you check again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good, but you forgot to make this final. No big deal
Looks good. I left a suggestion that I think will make the code cleaner. |
Any update on this? Would love to see it in an upcoming release. |
Test build #72002 has finished for PR 15396 at commit
|
I am merging this to master and 2.1 |
## What changes were proposed in this pull request? EdgeRDD/VertexRDD overrides checkpoint() and isCheckpointed() to forward these to the internal partitionRDD. So when checkpoint() is called on them, its the partitionRDD that actually gets checkpointed. However since isCheckpointed() also overridden to call partitionRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually not checkpointed. This would have been fine except the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing Spark tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads checkpoint data of partitionsRDD and tries to cast it to types in Vertex/EdgeRDD. This leads to ClassCastException. The minimal fix that does not change any public behavior is to modify RDD internal to not use public override-able API for internal logic. ## How was this patch tested? New unit tests. Author: Tathagata Das <[email protected]> Closes #15396 from tdas/SPARK-14804. (cherry picked from commit 47d5d0d) Signed-off-by: Tathagata Das <[email protected]>
## What changes were proposed in this pull request? EdgeRDD/VertexRDD overrides checkpoint() and isCheckpointed() to forward these to the internal partitionRDD. So when checkpoint() is called on them, its the partitionRDD that actually gets checkpointed. However since isCheckpointed() also overridden to call partitionRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually not checkpointed. This would have been fine except the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing Spark tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads checkpoint data of partitionsRDD and tries to cast it to types in Vertex/EdgeRDD. This leads to ClassCastException. The minimal fix that does not change any public behavior is to modify RDD internal to not use public override-able API for internal logic. ## How was this patch tested? New unit tests. Author: Tathagata Das <[email protected]> Closes #15396 from tdas/SPARK-14804. (cherry picked from commit 47d5d0d) Signed-off-by: Tathagata Das <[email protected]>
## What changes were proposed in this pull request? EdgeRDD/VertexRDD overrides checkpoint() and isCheckpointed() to forward these to the internal partitionRDD. So when checkpoint() is called on them, its the partitionRDD that actually gets checkpointed. However since isCheckpointed() also overridden to call partitionRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually not checkpointed. This would have been fine except the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing Spark tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads checkpoint data of partitionsRDD and tries to cast it to types in Vertex/EdgeRDD. This leads to ClassCastException. The minimal fix that does not change any public behavior is to modify RDD internal to not use public override-able API for internal logic. ## How was this patch tested? New unit tests. Author: Tathagata Das <[email protected]> Closes apache#15396 from tdas/SPARK-14804.
## What changes were proposed in this pull request? EdgeRDD/VertexRDD overrides checkpoint() and isCheckpointed() to forward these to the internal partitionRDD. So when checkpoint() is called on them, its the partitionRDD that actually gets checkpointed. However since isCheckpointed() also overridden to call partitionRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually not checkpointed. This would have been fine except the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing Spark tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads checkpoint data of partitionsRDD and tries to cast it to types in Vertex/EdgeRDD. This leads to ClassCastException. The minimal fix that does not change any public behavior is to modify RDD internal to not use public override-able API for internal logic. ## How was this patch tested? New unit tests. Author: Tathagata Das <[email protected]> Closes apache#15396 from tdas/SPARK-14804.
What changes were proposed in this pull request?
EdgeRDD/VertexRDD overrides checkpoint() and isCheckpointed() to forward these to the internal partitionRDD. So when checkpoint() is called on them, its the partitionRDD that actually gets checkpointed. However since isCheckpointed() also overridden to call partitionRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually not checkpointed.
This would have been fine except the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing Spark tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads checkpoint data of partitionsRDD and tries to cast it to types in Vertex/EdgeRDD. This leads to ClassCastException.
The minimal fix that does not change any public behavior is to modify RDD internal to not use public override-able API for internal logic.
How was this patch tested?
New unit tests.