-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19525][CORE]Add RDD checkpoint compression support #17789
Conversation
Spark's performance improves greatly if we enable compression of checkpoints.
cc @mridulm since you reviewed the initial PR. |
@zsxwing Sorry for the delay! Thank you so much for your review and I saw a bit of your patch - it looks very nice. I have just one question - would it be a good idea to separate the codecs for compressing checkpoints and for network communication? I see that you reuse the same codec. If we wanted to isolate the effect of the RDD checkpointing, we would not be able to do that easily if these are coupled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for pushing on this !
I have a query regarding naming though in case codec's are enabled.
fs.create(tempOutputPath, false, bufferSize) | ||
val fileStream = fs.create(tempOutputPath, false, bufferSize) | ||
if (env.conf.get(CHECKPOINT_COMPRESS)) { | ||
CompressionCodec.createCodec(env.conf).compressedOutputStream(fileStream) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A question I had even with the earlier PR was - should we add the extension to either the directory or the file indicating compression type ?
Shuffle and cache files don't have an extension. I think it's better to be consistent in the whole code base.
Save as above. Shuffle and cache files use the same codec. |
In addition, I agree that having an extension and separating the codecs are good ideas. But they should be done in other PRs to not introduce multiple features in a large PR. |
Shuffle and cache files are not on hdfs :-) They do not potentially survive the application or be consumed OOB for recovery/inspection. |
Sounds good on doing it in separate PR - I am not too worried about shuffle/blockdata/etc btw - since they are private to application execution - checkpoint's tend to also be perused for other purposes (whether by design or not) since they are on hdfs; and with this PR it will become necessary to either guess the codec by iterating over supported codec's or pass it through other means for consumers. |
Streaming checkpoint files are on HDFS but don't have an extension :) |
@zsxwing They are compressed ? Interesting ... I never played with spark streaming unfortunately, so did not know ! |
Test build #76242 has finished for PR 17789 at commit
|
I thought the main reason @aramesh117 did this PR was for compression to be enabled for spark streaming usecase. |
To add, for non streaming usecases, this will definitely help - but was this a recent change for streaming ? (probably after @aramesh117 make the PR ?) |
Streaming checkpoint includes two parts:
Right now the first one is compressed. This PR is for RDD checkpoints. |
Ah interesting, thanks for clarifying ... weird that first was compressed and not second. But if there is an expectation some of the data is compressed already; perhaps we are being consistent now and there is no need to add extension (unless we uniformly do it everywhere). LGTM, thanks for the change @zsxwing and @aramesh117 ! |
Thanks, @mridulm @aramesh117 Merging to master and 2.2. |
## What changes were proposed in this pull request? This PR adds RDD checkpoint compression support and add a new config `spark.checkpoint.compress` to enable/disable it. Credit goes to aramesh117 Closes #17024 ## How was this patch tested? The new unit test. Author: Shixiong Zhu <[email protected]> Author: Aaditya Ramesh <[email protected]> Closes #17789 from zsxwing/pr17024. (cherry picked from commit 77bcd77) Signed-off-by: Shixiong Zhu <[email protected]>
This PR adds RDD checkpoint compression support and add a new config `spark.checkpoint.compress` to enable/disable it. Credit goes to aramesh117 Closes apache#17024 The new unit test. Author: Shixiong Zhu <[email protected]> Author: Aaditya Ramesh <[email protected]> Closes apache#17789 from zsxwing/pr17024. (cherry picked from commit 77bcd77) Signed-off-by: Shixiong Zhu <[email protected]>
What changes were proposed in this pull request?
This PR adds RDD checkpoint compression support and add a new config
spark.checkpoint.compress
to enable/disable it. Credit goes to @aramesh117Closes #17024
How was this patch tested?
The new unit test.