Use "dask" serialization to move to/from host #256
Conversation
Force-pushed from 8e4bf0b to d7c2795
Codecov Report
```diff
@@            Coverage Diff             @@
##           branch-0.13     #256      +/-   ##
===============================================
+ Coverage       78.03%    78.09%   +0.06%
===============================================
  Files              14        14
  Lines             979       968      -11
===============================================
- Hits              764       756       -8
+ Misses            215       212       -3
```
Continue to review full report at Codecov.
There are also some conflicts due to the merging of #257.
```diff
@@ -157,9 +157,9 @@ def test_serialize_cupy_collection(collection, length, value):
     if length > 5:
         assert obj.header["serializer"] == "pickle"
     elif length > 0:
-        assert all([h["serializer"] == "cuda" for h in obj.header["sub-headers"]])
+        assert all([h["serializer"] == "dask" for h in obj.header["sub-headers"]])
```
It seems that there are two types of serialization for CUDA objects in Dask now:

- `"cuda"` to serialize CUDA objects and transfer them without necessarily copying them to/from host -- e.g., it can be used to transfer only the `__cuda_array_interface__`, which something like UCX would use to ensure the transfer goes over CUDA IPC;
- `"dask"` to serialize CUDA objects and transfer them to/from host -- useful for spilling, as is the case here.
Is my understanding correct?
Generally yes. `"dask"` serialization already existed before in Dask; we've just now implemented it for CUDA objects. Implementation details with references are in issue #242.
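For reference, here is a rough sketch of the two paths, assuming CuPy is installed and Distributed's CUDA-aware serializers are registered (e.g. by importing `distributed.protocol.cupy`); the variable names are just illustrative:

```python
import cupy
import distributed.protocol.cupy  # registers the CuPy handlers for both families
from distributed.protocol import serialize, deserialize

x = cupy.arange(10)

# "cuda": frames stay on the device (suitable for UCX / CUDA IPC transports).
cuda_header, cuda_frames = serialize(x, serializers=["cuda"])

# "dask": frames are copied into host memory (suitable for spilling).
dask_header, dask_frames = serialize(x, serializers=["dask"])

# Deserializing the "dask" form copies the data back onto the device.
y = deserialize(dask_header, dask_frames)
assert isinstance(y, cupy.ndarray)
```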
@pentschev, any more thoughts here? 🙂
Sorry @jakirkham, I was just testing this now, and I think we have an issue with this PR. The reason it didn't fail is that the tests are marked to xfail, as they would fail randomly in CI, but I could always (or almost always) have them pass on a DGX. I'm sorry that CI masked these errors; in fact, what hinted to me that there could be an issue was the decrease in coverage, as seen in https://codecov.io/gh/rapidsai/dask-cuda/pull/256/changes. Could you check why this is happening, @jakirkham?
@jakirkham I should also mention that you can see the errors on a DGX too if you run the tests there.
Sure, I can take a look. Should these tests not be marked as `xfail`?
Ah, I overlooked that. There may be more than that, though.
I would say this is the goal, but we haven't been able to make the tests deterministic. There's #79 for that, but it has been pretty hard to keep CI always working.
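For context, a minimal illustration (hypothetical test, not from this repo) of why `xfail`-marked tests can hide real failures in CI:

```python
import random

import pytest

@pytest.mark.xfail(reason="fails non-deterministically in CI", strict=False)
def test_flaky_example():
    # With strict=False, the suite stays green whether this passes or fails,
    # so a genuine regression introduced here would go unnoticed.
    assert random.random() < 0.5
```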
I don't think we can drop that; it is used to control when dask-cuda spills. IOW, it uses them to keep track of LRU cache sizes and when to move data to host and back.
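A rough sketch of that idea, using `zict.Buffer` (which also appears in the traceback below). This is not dask-cuda's actual code; the stores, byte budget, and keys are made up:

```python
from zict import Buffer

device_store = {}  # stand-in for device-backed storage
host_store = {}    # stand-in for host-backed storage

spill_buffer = Buffer(
    device_store,
    host_store,
    n=1024,                                # hypothetical byte budget before spilling
    weight=lambda key, value: len(value),  # per-object size tracked by the LRU
)

spill_buffer["a"] = b"x" * 800  # fits within the budget, stays in the fast store
spill_buffer["b"] = b"y" * 800  # exceeds the budget, so "a" is spilled to host
assert "a" in host_store and "b" in device_store

_ = spill_buffer["a"]           # reading "a" moves it back into the fast store,
assert "a" in device_store      # which in turn evicts "b" to the host store
assert "b" in host_store
```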
Force-pushed from 9c263a1 to a8045f3
Have improved this a bit, but there is still an issue. My guess is this has something to do with Dask splitting and compressing frames. Will look more into this next week.

```
Traceback (most recent call last):
  File "/conda/envs/gdf/lib/python3.7/site-packages/distributed/worker.py", line 2164, in release_key
    if key in self.data and key not in self.dep_state:
  File "/conda/envs/gdf/lib/python3.7/_collections_abc.py", line 666, in __contains__
    self[key]
  File "/var/lib/jenkins/workspace/rapidsai/gpuci/dask-cuda/prb/dask-cuda-gpu-build/dask_cuda/device_host_file.py", line 122, in __getitem__
    return self.device_buffer[key]
  File "/conda/envs/gdf/lib/python3.7/site-packages/zict/buffer.py", line 78, in __getitem__
    return self.slow_to_fast(key)
  File "/conda/envs/gdf/lib/python3.7/site-packages/zict/buffer.py", line 65, in slow_to_fast
    value = self.slow[key]
  File "/conda/envs/gdf/lib/python3.7/site-packages/zict/func.py", line 38, in __getitem__
    return self.load(self.d[key])
  File "/var/lib/jenkins/workspace/rapidsai/gpuci/dask-cuda/prb/dask-cuda-gpu-build/dask_cuda/device_host_file.py", line 57, in host_to_device
    return deserialize(s.header, s.frames)
  File "/conda/envs/gdf/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 269, in deserialize
    return loads(header, frames)
  File "/conda/envs/gdf/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 51, in dask_loads
    return loads(header, frames)
  File "/conda/envs/gdf/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 82, in dask_deserialize_cupy_ndarray
    frames = [dask_deserialize_cuda_buffer(header, frames)]
  File "/conda/envs/gdf/lib/python3.7/site-packages/distributed/protocol/rmm.py", line 37, in dask_deserialize_rmm_device_buffer
    (frame,) = frames
ValueError: too many values to unpack (expected 1)
```
Force-pushed from 7bbd1d2 to e34898e
I think I've narrowed it down. It appears there may be a bug in Distributed: it splits frames for compression, but it doesn't merge them back into their original structure afterwards. Have submitted PR dask/distributed#3639 to address this.
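A toy illustration of the failure mode (standalone Python, not Distributed's actual code): one logical frame gets split into shards for compression, and if the shards are never merged back, a deserializer that expects exactly one frame, like the `(frame,) = frames` line in the traceback above, blows up:

```python
def split_frame(frame: bytes, shard_size: int) -> list:
    """Split a single frame into fixed-size shards, as done before compression."""
    return [frame[i : i + shard_size] for i in range(0, len(frame), shard_size)]

def merge_shards(shards: list) -> bytes:
    """Join shards back into the original frame before deserialization."""
    return b"".join(shards)

original = bytes(10_000)
shards = split_frame(original, shard_size=4096)  # -> 3 shards
assert merge_shards(shards) == original          # original structure restored

(frame,) = [merge_shards(shards)]  # exactly one frame again, so unpacking works
```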
Force-pushed from f9d9a27 to 1699d0d
As these serialization function names are important and are meant to be kept around, use proper names and not `_` (which is intended for values not to be kept).
As all `"cuda"` serializable objects are now also `"dask"` serializable, switch to just using `"dask"` serialization to perform device-to-host transfers to spill to host memory. Though continue falling back to `"pickle"` if nothing better can be found (after all that will be on host too). Similarly "unspilling" from host-to-device will happen naturally as part of the deserialization step. Should simplify what Dask-CUDA needs to keep track of/do.
This should ensure the frames can go through `"dask"` serialization when `DeviceSerialized` is serialized.
Note these frames are `bytes`-like. IOW C-contiguous buffers that can be easily loaded back into device memory or spilled to disk. https://docs.python.org/3/glossary.html#term-bytes-like-object
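A quick illustrative check (plain Python, with a stand-in buffer) of what "bytes-like and C-contiguous" buys us here:

```python
frame = memoryview(bytearray(16))  # stand-in for one serialized frame

assert frame.c_contiguous            # contiguous, so it can be copied in one shot
assert bytes(frame) == b"\x00" * 16  # bytes-like, so file/device APIs accept it
```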
Force-pushed from 1699d0d to 3f1577c
```diff
-    frames = [copy_to_host(f) if ic else f for ic, f in zip(is_cuda, frames)]
-    return DeviceSerialized(header, frames, is_cuda)
+    header, frames = serialize(obj, serializers=["dask", "pickle"])
+    frames = [numpy.asarray(f) for f in frames]
```
Just a note here, the `frames` are `memoryview`s. Since we want to serialize `DeviceSerialized` later, which involves serializing these `frames`, and Dask doesn't know how to serialize a `memoryview`, it tries to pickle it or fails. To fix this, we construct a NumPy `ndarray` that merely views the underlying data in the `memoryview`. IOW this is quite fast and zero-copy. Since Dask knows how to serialize NumPy `ndarray`s (and we were using `ndarray`s here before), this should proceed without issues.
xref: dask/distributed#3640
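A small standalone illustration (not dask-cuda code; the buffer contents are made up) of that zero-copy wrapping:

```python
import numpy

buf = memoryview(bytearray(b"spilled-frame-bytes"))
arr = numpy.asarray(buf)

assert not arr.flags["OWNDATA"]        # the array does not own (i.e. copy) the data
assert arr.nbytes == len(buf)          # same underlying bytes
buf[0:1] = b"S"                        # a write to the buffer is visible through
assert arr.tobytes().startswith(b"S")  # the array, confirming it is just a view
```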
Got it, this makes sense. Thanks for the description of how it works; it was really helpful for understanding what's going on here.
Yeah, ideally we wouldn't need this, but for now it's a simple enough workaround 🙂
I've cut this back a bit to a state where it seems to work based on local testing. CI is a bit backed up, but should come through in a bit. Please let me know if anything else is needed.
Thanks @jakirkham, I ran the tests on a DGX again and everything passed now!
Thanks for double checking, Peter! 😄 I'll try adding back the other changes in another PR to see if we can simplify things further.
Fixes #242
As all `"cuda"` serializable objects are now also `"dask"` serializable, switch to just using `"dask"` serialization to perform device-to-host transfers to spill to host memory. Though continue falling back to `"pickle"` if nothing better can be found (after all, that will be on host too). Similarly, "unspilling" from host-to-device will happen naturally as part of the deserialization step. Should simplify what Dask-CUDA needs to keep track of/do.