[SPARK-19525][CORE] Compressing checkpoints. #17024
Conversation
Spark's performance improves greatly if we enable compression of checkpoints.
Can one of the admins verify this patch?
This would be a great addition to checkpointing, thanks for contributing!
I have left some comments.
@@ -95,6 +95,7 @@ private[spark] object CompressionCodec {
  val FALLBACK_COMPRESSION_CODEC = "snappy"
  val DEFAULT_COMPRESSION_CODEC = "lz4"
  val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
  val ALL_COMPRESSION_CODECS_SHORT: Set[String] = shortCompressionCodecNames.keySet
Instead of exposing this and supporting only short codec names for checkpointing, the pattern should be the same as in the rest of the Spark code when dealing with codecs:

sparkConf.getOption("spark.checkpoint.compress.codec").map(c =>
  logInfo(s"Compressing checkpoint using $c.")
  CompressionCodec.createCodec(conf, c).compressedInputStream or compressedOutputStream
).getOrElse(fileStream)

This will ensure that support for checkpoint compression is in line with the rest of Spark (short and long class names, no need to introduce 'none').
Note: you will need to change fileStream to a lazy val, so that if codec creation throws an exception, we don't leave dangling streams around (keeping fileStream's visibility limited to the block).
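To make that concrete, here is a minimal sketch of the suggested pattern on the write path. It assumes it sits inside ReliableCheckpointRDD (so env, fs, logInfo, tempOutputPath and bufferSize from the surrounding code are in scope) and is only an illustration of the reviewer's idea, not the merged implementation:

  // Open the raw checkpoint file lazily so that a failure while creating the
  // codec does not leave a dangling open stream behind.
  lazy val fileStream = fs.create(tempOutputPath, false, bufferSize)

  // Wrap the raw stream in a codec only when one is configured; otherwise write uncompressed.
  val fileOutputStream = env.conf.getOption("spark.checkpoint.compress.codec").map { c =>
    logInfo(s"Compressing checkpoint using $c.")
    CompressionCodec.createCodec(env.conf, c).compressedOutputStream(fileStream)
  }.getOrElse(fileStream)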
@@ -133,9 +134,13 @@ private[spark] object ReliableCheckpointRDD extends Logging {
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  logInfo(s"The checkpoint compression codec is " +
This should only be logged if compression is enabled ('none' is not a supported compression codec).
It could also be rolled into the timing-info log message below.
} else {
  // This is mainly for testing purpose
  fs.create(tempOutputPath, false, bufferSize,
    fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
logInfo(s"Starting to write to checkpoint file $tempOutputPath.")
This will make the logs verbose.
If it does help with debugging, you could make it logTrace - or remove it entirely.
My thought was that since checkpointing shouldn't be done too frequently anyway, this won't make the logs too verbose on the executor, and it may be helpful for debugging after issues with checkpointing have already occurred. I'll make it logDebug for now; is that okay?
logDebug should be fine too.
@@ -197,6 +212,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
      }
    }
  }
  logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTimeMs} ms.")
Add codec (if used) here.
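For illustration, a sketch of how the codec could be folded into that timing message (compressionCodecName here is a hypothetical Option[String] holding the short codec name when compression is enabled):

  val codecSuffix = compressionCodecName.map(c => s" (codec: $c)").getOrElse("")
  logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTimeMs} ms$codecSuffix.")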
logInfo(s"Compressing using $checkpointCodec.") | ||
compressionCodec.compressedOutputStream(fileStream) | ||
} else { | ||
fileStream |
This repeated pattern can be rewritten as indicated above: https://github.com/apache/spark/pull/17024/files#r102418860
CompressionCodec.createCodec(env.conf, checkpointCodec).compressedInputStream(fileStream)
} else {
  fileStream
}
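The read path can follow the same shape. A sketch under the same assumptions, where fileStream lazily opens the raw checkpoint file for reading:

  lazy val fileStream = fs.open(path, bufferSize)

  // Decompress only when a codec is configured; otherwise read the raw bytes.
  val inputStream = env.conf.getOption("spark.checkpoint.compress.codec").map { c =>
    CompressionCodec.createCodec(env.conf, c).compressedInputStream(fileStream)
  }.getOrElse(fileStream)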
@mridulm Thank you so much! I will definitely update with your suggestions.
+CC @tdas
@mridulm I've added a new commit. Thank you for the review! :)
@aramesh117 looks good!
I wonder if adding an extension (to the file) based on the codec would help...
@mridulm Sure, I can add a file extension based on the codec being used. But is there a specific use case that adding an extension would solve?
It makes it possible to identify what the data within the file is (compressed or not) for the user's perusal (it does not change anything for the application, that is true).
@aramesh117 Unfortunately, since this heavily affects streaming, I cannot sign off on it unless someone more familiar with Spark Streaming reviews it as well.
Sorry for the delay. Made one pass.
import org.apache.spark.util.{SerializableConfiguration, Utils}
nit: please remove unnecessary space changes
sc.runJob(originalRDD,
  writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTime} ms.")
sc.conf.getOption("spark.checkpoint.compress.codec").foreach(codec => {
For consistency, I suggest we just add a new config `spark.checkpoint.compress`, which controls whether to enable checkpoint compression. See how broadcast handles this:

compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
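A sketch of that flag-based pattern, mirroring the handling of spark.broadcast.compress (the default of false shown here is an assumption for illustration):

  // Pick up the globally configured codec only when checkpoint compression is enabled.
  val compressionCodec: Option[CompressionCodec] =
    if (conf.getBoolean("spark.checkpoint.compress", false)) {
      Some(CompressionCodec.createCodec(conf))
    } else {
      None
    }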
@@ -133,9 +136,14 @@ private[spark] object ReliableCheckpointRDD extends Logging {
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  val startTime = System.currentTimeMillis()
nit: please use nanoTime to measure the duration. See https://github.com/databricks/scala-style-guide/tree/f6cce9ab32e7b288638f2f1615d20a3b6d16ef2e#misc_currentTimeMillis_vs_nanoTime
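The recommended pattern from that style guide looks roughly like this (the placement of the timed block is illustrative):

  import java.util.concurrent.TimeUnit

  val startTimeNs = System.nanoTime()
  // ... run the checkpoint write ...
  val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
  logInfo(s"Checkpointing took $durationMs ms.")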
} else {
  // This is mainly for testing purpose
  fs.create(tempOutputPath, false, bufferSize,
    fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
logTrace(s"Starting to write to checkpoint file $tempOutputPath.")
val startTimeMs = System.currentTimeMillis()
same as above
import org.apache.spark.rdd._
import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
import org.apache.spark.util.Utils
nit: please remove unnecessary changes.
sc = new SparkContext("local", "test")
sc.setCheckpointDir(checkpointDir.toString)
}
nit: please remove unnecessary changes.
testBasicCheckpoint(sc, reliableCheckpoint)
}

runTest("compression with snappy", skipLocalCheckpoint = true) { _: Boolean =>
After you change the config to `spark.checkpoint.compress`, you don't need to test all compression codecs. Just write one test for the default codec. Others should be covered in CompressionCodecSuite.
For the new test, I think we just need one simple test. And if we put it into a new suite (e.g., the example below), then we don't need to touch the existing code.
// Assuming this suite lives in package org.apache.spark, so that SparkFunSuite and
// LocalSparkContext resolve without extra imports.
import java.io.File

import com.google.common.io.ByteStreams
import org.apache.hadoop.fs.Path

import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.Utils

class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
  test("checkpoint compression") {
    val checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
    try {
      val conf = new SparkConf().set("spark.checkpoint.compress", "true")
      sc = new SparkContext("local", "test", conf)
      sc.setCheckpointDir(checkpointDir.toString)
      val rdd = sc.makeRDD(1 to 20, numSlices = 1)
      rdd.checkpoint()
      assert(rdd.collect().toSeq === (1 to 20))
      val checkpointPath = new Path(rdd.getCheckpointFile.get)
      val fs = checkpointPath.getFileSystem(sc.hadoopConfiguration)
      val checkpointFile =
        fs.listStatus(checkpointPath).map(_.getPath).find(_.getName.startsWith("part-")).get
      // Verify the checkpoint file can be decompressed
      val compressedInputStream = CompressionCodec.createCodec(conf)
        .compressedInputStream(fs.open(checkpointFile))
      ByteStreams.toByteArray(compressedInputStream)
      // Verify that the compressed content can be read back
      assert(rdd.collect().toSeq === (1 to 20))
    } finally {
      Utils.deleteRecursively(checkpointDir)
    }
  }
}
@@ -238,6 +241,42 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
  protected def generateFatPairRDD(): RDD[(Int, Int)] = {
    new FatPairRDD(sparkContext.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
  }

  protected def testBasicCheckpoint(sc: SparkContext, reliableCheckpoint: Boolean): Unit = {
nit: does this one test any special logic? If it's covered by other tests, there's no need to add it and increase the test time.
@aramesh117 do you have time to work on this PR soon? We need to merge this PR ASAP in order to get it into 2.2.0. Thanks!
@aramesh117 I just opened #17789 to finish the rest of the work. All credit will go to you when merging the new PR.
## What changes were proposed in this pull request?

This PR adds RDD checkpoint compression support and adds a new config `spark.checkpoint.compress` to enable/disable it. Credit goes to aramesh117.

Closes #17024

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <[email protected]>
Author: Aaditya Ramesh <[email protected]>

Closes #17789 from zsxwing/pr17024.

(cherry picked from commit 77bcd77)
Signed-off-by: Shixiong Zhu <[email protected]>
## What changes were proposed in this pull request?

Spark Streaming's latency for smaller batches improves greatly if we enable compression of checkpoints.

## How was this patch tested?

This was tested using existing unit tests for backwards compatibility and with new tests for this functionality. It has also been used in our production system for almost a year.

Please review http://spark.apache.org/contributing.html before opening a pull request.
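For reference, a minimal sketch of how an application would turn on the merged feature (the spark.checkpoint.compress key comes from this change; the master, app name, checkpoint directory and data are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointCompressionExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("checkpoint-compression-example")
      // Compress RDD checkpoint files using the codec configured via spark.io.compression.codec.
      .set("spark.checkpoint.compress", "true")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val rdd = sc.parallelize(1 to 1000, 4)
    rdd.checkpoint()          // mark the RDD for reliable checkpointing
    println(rdd.count())      // the first action materializes the (compressed) checkpoint files
    sc.stop()
  }
}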