Support encryption and compression in disk store #9454
Conversation
Signed-off-by: Ferdinand Xu <[email protected]>
build
    .stringConf
    .createWithDefault("lz4")

  val SHUFFLE_COMPRESSION_ENABLED = conf("spark.rapids.shuffle.io.compression.enabled")
Why do we have separate configs instead of using the existing configs Spark uses for shuffle and spill configuration? The problem with separate configs is that if a user has already set up shuffle compression and/or encryption, they will not get it in practice when using the RAPIDS Accelerator, which is surprising.
We should leverage the Spark configs. I'd be OK with using custom configs as an override for the Spark configs iff the custom configs are set. However, we should not add those configs unless we know we absolutely need them.
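As a rough illustration (not code from this PR), the existing Spark knobs being referred to can be read straight off the SparkConf; the keys below are standard Spark settings, while the helper itself is hypothetical:
import org.apache.spark.SparkConf

// Hypothetical helper: reads the Spark settings the plugin would respect
// instead of introducing RAPIDS-specific compression/encryption configs.
def existingSparkIoSettings(conf: SparkConf): (Boolean, Boolean, Boolean, String) = {
  val shuffleCompress = conf.getBoolean("spark.shuffle.compress", true)
  val spillCompress   = conf.getBoolean("spark.shuffle.spill.compress", true)
  val ioEncryption    = conf.getBoolean("spark.io.encryption.enabled", false)
  val codec           = conf.get("spark.io.compression.codec", "lz4")
  (shuffleCompress, spillCompress, ioEncryption, codec)
}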
@winningsix I would love to see some performance numbers for this too. I'm happy to help produce them if you want me to. For encryption I think we have to follow what Spark has set no matter what. I see no reason to have a separate config for that. It is a security problem if we don't follow the Spark config. But for compression I can see us having some leeway, and I would love some hard numbers to back up any decision we make about it.
The discussion is generally around two things: 1) whether we should have separate configurations; 2) what our expectation is on performance.
W.r.t. configuration, we're using the same encryption configuration as Spark, while introducing some extra configurations for fine-grained control over different compression use cases. In the current Spark implementation, compression is controlled by block ID type (see link) with several different configurations. We're not exactly mapping the RapidsBufferId type to Spark's BlockId type, so extra compression configurations were introduced here for RAPIDS-dedicated use cases. But we could still respect the existing Spark compression configuration and try to map our RapidsBufferId to Spark's BlockId. Personally, I lean more toward the extra configurations. @abellina, your thoughts here?
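For reference, the block-ID-based decision mentioned above looks roughly like the following in Spark's SerializerManager (a paraphrased sketch; see the Spark source for the authoritative mapping and defaults):
import org.apache.spark.SparkConf
import org.apache.spark.storage._

// Paraphrased sketch of Spark's per-block-type compression decision.
def shouldCompress(conf: SparkConf, blockId: BlockId): Boolean = blockId match {
  case _: ShuffleBlockId     => conf.getBoolean("spark.shuffle.compress", true)
  case _: BroadcastBlockId   => conf.getBoolean("spark.broadcast.compress", true)
  case _: RDDBlockId         => conf.getBoolean("spark.rdd.compress", false)
  case _: TempLocalBlockId   => conf.getBoolean("spark.shuffle.spill.compress", true)
  case _: TempShuffleBlockId => conf.getBoolean("spark.shuffle.compress", true)
  case _                     => false
}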
W.r.t. performance, just to clarify the testing further, there are three comparisons that come to mind: i. compression on CPU vs. GPU; ii. no compression vs. CPU compression; iii. performance across different compression codecs:
- CPU vs. GPU: that's something we can definitely consider later, but I'd like to have the CPU-based implementation as the default option. We can follow up with some perf numbers comparing against nvcomp.
- No compression vs. compression: some performance loss should be expected. The original motivation for this feature is to cover some CSP use cases where the local disk is not big enough, so it's not purely a performance consideration.
- Performance across various compression codecs: this decides the default codec. AFAIK, ZSTD would be a good option here.
I think on the performance side, the thought is that if we are compressing on the host we are writing less to disk, so we may come out even or faster, depending on the compression ratio. So the idea was to run a case that would spill terabytes with and without compression, to see what the effects are.
In terms of configs, yes, I agree the Spark configs seem a bit confusing. I do like that broadcasts can optionally not be compressed, so figuring out how to implement that could be follow-on work. Here are some ideas:
- For the codec, we should just use the default from Spark, specified by spark.io.compression.codec. You added spark.rapids.shuffle.compression.codec for shuffle, but I don't see a way to control the codec specifically for shuffle in Spark. In the name of fewer configs I propose we remove that too, and just use spark.io.compression.codec.
- spark.rapids.shuffle.io.compression.enabled doesn't make a lot of sense right now. We have nvcomp right now for UCX shuffle compression, and the buffers that you picked for compression are UCX buffers (ShuffleBufferId and ShuffleReceivedBufferId are exclusively for UCX).
- In order to enable/disable compression: because Spark has spark.broadcast.compress, spark.shuffle.compress, spark.rdd.compress, etc., I don't think there's a great mapping to our stuff. Instead of all of this, I think the only config we add is spark.rapids.spill.io.compression.enabled, and that just says whether we will use spark.io.compression.codec to compress and decompress spill blocks that are NOT UCX shuffle (sketched below).
We can revisit the UCX shuffle pieces at a later time, but I think that's out of scope here.
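To make the last idea concrete, a declaration of that single config in the builder style visible in the diff above might look like the following (hypothetical; as noted later in the thread, no new config ended up being added):
// Hypothetical config entry, mirroring the builder pattern in the diff above.
val SPILL_COMPRESSION_ENABLED = conf("spark.rapids.spill.io.compression.enabled")
  .doc("Whether to use spark.io.compression.codec to compress and decompress " +
    "spill blocks that are not UCX shuffle buffers")
  .booleanConf
  .createWithDefault(false)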
I think the only config we add is spark.rapids.spill.io.compression.enabled and that just says whether we will use spark.io.compression.codec to compress and decompress spill blocks that are NOT UCX shuffle.
I don't think we want to separate this. Spill is spill. If compression helps the CPU, it's very likely to help the GPU. I don't think we should add this config until we have performance numbers justifying it. Even if we do add it, we clearly have use cases that are failing today when it's not compressing, so arguably false is not the best default for it.
Discussed with @jlowe. Respecting Spark's existing configuration sounds good. I dropped the newly added configurations in the latest update.
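With the RAPIDS-specific configs dropped, the knobs a user would set are the existing Spark ones. An illustrative example follows; which Spark setting governs a given buffer depends on how its RapidsBufferId is mapped to a Spark BlockId:
import org.apache.spark.SparkConf

// Illustrative settings only; Spark already enables shuffle/spill compression by default.
val conf = new SparkConf()
  .set("spark.io.encryption.enabled", "true")   // Spark I/O encryption, reused as-is
  .set("spark.shuffle.spill.compress", "true")  // compression of spilled shuffle data
  .set("spark.io.compression.codec", "zstd")    // codec used when compression applies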
build
    meta: TableMeta,
    spillPriority: Long)
  extends RapidsBufferBase(
    id, meta, spillPriority) {
  private[this] var hostBuffer: Option[HostMemoryBuffer] = None

-  override val memoryUsedBytes: Long = size
+  override val memoryUsedBytes: Long = uncompressedSize
Should we make this compressedSize, because that's the size of the RapidsBuffer once it hits disk?
IMO, the return value is the original size of the spilled data in memory. Thoughts?
It all depends on what it is going to be used for. Technically there is no memory being used, so this API might as well return 0. The docs for it say:
The size of this buffer in bytes in its current store. As the buffer goes through contiguous split (either added as a contiguous table already, or spilled to host), its size changes because contiguous_split adds its own alignment padding.
NOTE: Do not use this size to allocate a target buffer to copy, always use getPackedSize.
The problem is that this API is used in all kinds of places for metrics and logging. The only places I can find where it is used beyond metrics are places where we know exactly what type is being passed in, by convention, so we end up with a lot of coupling.
To me we should file a follow-on issue and add new APIs around what we actually want to work with. I don't know exactly what those APIs are, but a few that come to mind are sketched below:
- how much (disk, host memory, device memory) freeing/spilling this buffer would make available. I can see two options for this:
  a. one API that says how much and another API that says where it is stored.
  b. one API that returns a case class with all three values in it.
- how much memory I need to hold the uncompressed, packed version of this
- what codec this is compressed with
- whether this is currently packed
For now I really don't care what we return so long as we have a follow-on issue to clean this up.
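A purely illustrative sketch of what such follow-on APIs could look like; none of these names exist in the plugin, and they are only meant to capture the list above:
// Hypothetical API sketch for the follow-on issue; all names are illustrative.
final case class StoreUsage(deviceBytes: Long, hostBytes: Long, diskBytes: Long)

trait SpillableBufferInfo {
  /** How much each store would get back if this buffer were freed or spilled. */
  def usageIfFreed: StoreUsage
  /** Bytes needed to hold the uncompressed, packed form of this buffer. */
  def uncompressedPackedSize: Long
  /** Codec this buffer is currently compressed with, if any. */
  def compressionCodec: Option[String]
  /** Whether the buffer is currently in contiguous (packed) form. */
  def isPacked: Boolean
}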
I think it would make sense to expose and clean up our metadata (we have it as BufferMeta) to include more specifics. When we start looking at UCX compression (nvcomp or CPU) we could revisit this, but I would like to keep it as a different issue/PR.
We have, I think, most of what Bobby wants in the BufferMeta; that said, we need to expose it better and carry it through the whole stack, not just in the shuffle cases. Right now we only mark nvcomp-compressed UCX buffers as compressed after partitioning (on shuffle write) and after the shuffle read during our coalesce step; nothing else in the stack actually knows how to use this. In the spill framework, we make sure we are compatible with these buffers, but we don't take action if the buffers are compressed. Here's the BufferMeta for reference:
enum CodecType : byte {
/// data simply copied, codec is only for testing
COPY = -1,
/// no compression codec was used on the data
UNCOMPRESSED = 0,
/// data compressed with the nvcomp LZ4 codec
NVCOMP_LZ4 = 1,
}
/// Descriptor for a compressed buffer
table CodecBufferDescriptor {
/// the compression codec used
codec: CodecType;
/// byte offset from the start of the enclosing compressed buffer
/// where the compressed data begins
compressed_offset: long;
/// size of the compressed data in bytes
compressed_size: long;
/// byte offset from the start of the enclosing uncompressed buffer
/// where the uncompressed data should be written
uncompressed_offset: long;
/// size of the uncompressed data in bytes
uncompressed_size: long;
}
table BufferMeta {
/// ID of this buffer
id: int;
/// size of the buffer data in bytes
size: long;
/// size of the uncompressed buffer data
uncompressed_size: long;
/// array of codec buffer descriptors if the data is compressed
codec_buffer_descrs: [CodecBufferDescriptor];
}
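Assuming the standard flatc-generated accessors for this schema (codecBufferDescrsLength, codecBufferDescrs, uncompressedSize, and so on), inspecting the compression state could look roughly like this; the helper names are illustrative:
import com.nvidia.spark.rapids.format.{BufferMeta, CodecType}

// Illustrative helpers assuming flatc-generated accessors for the schema above.
def isNvcompCompressed(meta: BufferMeta): Boolean =
  (0 until meta.codecBufferDescrsLength()).exists { i =>
    meta.codecBufferDescrs(i).codec() == CodecType.NVCOMP_LZ4
  }

def bytesNeededToUnpack(meta: BufferMeta): Long =
  if (meta.codecBufferDescrsLength() == 0) meta.size() else meta.uncompressedSize()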
FYI. I filed an issue to follow up.
    val mappedBuffer = HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE,
      fileOffset, size)
    hostBuffer = Some(mappedBuffer)
    val memBuffer = closeOnExcept(HostMemoryBuffer.allocate(uncompressedSize)) { decompressed =>
We need to keep two implementations here: one for when the block is a ShuffleBufferId or ShuffleReceivedBufferId, and the other for any other buffer id.
The Shuffle*BufferId buffers are specific to UCX shuffle. Let's just keep them mmap'ed and treat them separately for now.
The rest of the buffers are regular spill buffers we are trying to address in this change, so let's go ahead with the HostMemoryInputStream approach here.
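A rough sketch of the split being asked for, assuming the plugin's RapidsBufferId hierarchy is in scope; the actual copy from the wrapped stream is elided:
import java.io.File
import java.nio.channels.FileChannel.MapMode
import ai.rapids.cudf.HostMemoryBuffer

// Sketch only: UCX shuffle buffers keep the existing mmap path, everything
// else is read through a stream so it can be decrypted/decompressed.
def readSpilledBuffer(id: RapidsBufferId, path: File, fileOffset: Long,
    size: Long, uncompressedSize: Long): HostMemoryBuffer = id match {
  case _: ShuffleBufferId | _: ShuffleReceivedBufferId =>
    HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE, fileOffset, size)
  case _ =>
    val decompressed = HostMemoryBuffer.allocate(uncompressedSize)
    // ... copy from a HostMemoryInputStream wrapped by the serializer manager ...
    decompressed
}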
Hmm, why do we want to skip those two types? Also, we need to discuss whether to skip encryption as well. Currently, encryption is turned on once we find that encryption is enabled.
BTW, this logic is controlled by RapidsSerializerManager's shouldCompress, which determines compression on/off based on the buffer ID type.
Ok, so the UCX buffers are complicated because they could be compressed via nvcomp already; that is, ShuffleBufferId and ShuffleReceivedBufferId could represent a buffer that was compressed by the GPU. I didn't want to compress on both the GPU and the CPU for these. UCX hasn't been a priority for that, so I think we should skip them and keep the old way for those buffers.
OK, that makes sense for avoiding duplicated compression. I will leave ShuffleBufferId and ShuffleReceivedBufferId uncompressed on the CPU side (already updated).
In the end state, we should have unified compression behavior. For example:
(1) Compression on/off based on buffer ID type (respecting existing Spark compression configurations)
(2) CPU-based implementation vs. GPU-based implementation (depending on the compression codec configuration)
We can have further discussion when revisiting the UCX part, given its current priority.
@winningsix have you tested this with UCX?
As far as I can tell, we won't compress the shuffle blocks due to https://github.com/NVIDIA/spark-rapids/pull/9454/files#diff-e6ad0480a5a3d6227c1a0a3915fd4ca9908a500f08eeea90d3e5f4645d12dbbdR74, but it seems wrapStream always wraps for encryption. Was that intended? Either way, we need to test the whole thing to see if UCX shuffle breaks once we merge.
We need to keep two implementations here, one when the block is ShuffleBufferId or ShuffleReceivedBufferId and the other for any other buffer id.
^^ Also, I was really hoping to get the original mmap implementation back for UCX only, so we can tackle that as one unit later with the UCX issue.
Hmm, is there any reason we want to skip encryption for the UCX path? We skipped compression due to potential duplication with GPU compression.
For mmap vs. file stream, it seems we only need to keep one. What's the consideration behind maintaining the mmap path?
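The behavior being converged on can be summarized by a small illustrative wrapper (not the PR's code): encryption is always applied when enabled, while host compression is skipped for the UCX buffer types. The encrypt and compress parameters stand in for the serializer-manager and codec wrappers.
import java.io.OutputStream

// Sketch: always honor encryption; skip CPU compression for UCX shuffle
// buffers that may already be nvcomp-compressed on the GPU.
def wrapForSpill(
    id: RapidsBufferId,
    out: OutputStream,
    encrypt: OutputStream => OutputStream,
    compress: OutputStream => OutputStream): OutputStream = {
  val encrypted = encrypt(out)
  id match {
    case _: ShuffleBufferId | _: ShuffleReceivedBufferId => encrypted
    case _ => compress(encrypted)
  }
}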
build
On performance tests I think we need to see our regular NDS @ 3TB and also a spill-prone case (we have some test queries we can run internally that would help show several TBs of spill). We'd compare before/after your change.
OK, I will do it. I will check with you on Slack about the spill-prone case.
build
For the spill-prone case, with compression on, it's slightly better than with compression off.
It may be nice to add the spill sizes here, i.e. how much we spilled to host and disk.
I think we are still waiting for NDS tests to be run with this patch and UCX tests.
For NDS tests, I verified it on a Spark cluster. It shows slightly better performance (6%-7%) in the power run. For UCX tests, are there any particular tests we want to conduct, or should NDS be fine?
Taking a further look here: currently, data spilled from the RAPIDS disk store is not counted in Spark's disk spill metric. I will clean this up in issue 9496 as well.
Please take a look at RapidsHostMemoryStore.trySpillToMaximumSize (https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala#L86). In this function we update the disk bytes spilled metric. This is the trigger for the host->disk spill, and the metric is shown in the Spark UI per stage. If you are not seeing spill bytes here, that would be a bug. I'd expect that with compression your spill-prone test would show a difference in the amount of bytes spilled to disk, compressed vs. not compressed.
As discussed, let's document this fully (what configs) once you have the runs with some of our automation that produces the regression report. The setting to alter the amount of GPU memory we make available to spark-rapids is
build
@abellina can you take a further look at this? Thanks!
For the spill-heavy query, it shows some difference in both size and execution time. Note: stage 3 is the most spill-heavy stage for this query.
Thanks @winningsix. As discussed, we didn't think there was a performance change with this patch for NDS @ 3TB, since it doesn't spill. I will run some tests on my own today, including a UCX test.
@winningsix I ran a special case of NDS where every buffer was spilled to disk (documented here: #9596). I also ran this patch with UCX and verified that UCX buffers are not going through the new path, and the smoke tests pass. For the amount spilled in my NDS example: with compression it was 12TB, and without compression we spilled 25TB. This is great, as this is our benchmark data, and it shows we can reduce our disk usage by 50% with this patch. Nice!!! There is a big performance impact in the absolute worst case. I say that because this is our performance cluster and it has great IO bandwidth, so any work we do (compression) is overhead. There are other issues I documented in #9596 that we can tackle later where we could make our spill code smarter. As of today, with this patch, we could be 50% slower in a scenario where IO is really great. That means we should at least run tests in the cloud, perhaps with the same patch, to find out whether we do much better there (with worse IO and smaller host memory). These experiments would also inform our auto-tuner recommendations, since for some history files we may recommend disabling compression.
Great work, thanks for doing this @winningsix
build
This PR fixes #9398. It adds support for encryption and compression in the disk store.
For the compression part, it introduces two separate configurations to turn compression behavior on/off.
For the encryption part, it leverages Spark's existing I/O encryption from its SerializerManager.
The PR is tested with newly added unit tests locally.