Individual CUDA object spilling (#451)
This PR introduces a new _device host file_ that uses `ProxyObject` to implement spilling of individual CUDA objects, as opposed to the current host file, which spills entire keys.

- [x] Implement spilling of individual objects
- [x] Handle task level aliasing
- [x] Handle shared device buffers
- [x] Write docs

To use it, set `DASK_JIT_UNSPILL=True`.
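
A minimal sketch of enabling it programmatically (the `jit_unspill` keyword comes from the `LocalCUDACluster` changes in this PR; the environment variable must be set before Dask reads its configuration, and the memory limit value is only illustrative):
```python
import os

# Equivalent to exporting DASK_JIT_UNSPILL=True before starting the worker;
# must happen before Dask loads its configuration.
os.environ["DASK_JIT_UNSPILL"] = "True"

from dask_cuda import LocalCUDACluster
from distributed import Client

if __name__ == "__main__":
    # Alternatively, pass the flag explicitly (added to LocalCUDACluster in this PR).
    cluster = LocalCUDACluster(jit_unspill=True, device_memory_limit="1GB")
    client = Client(cluster)
```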

## Motivation  

### Aliases at the task level 

Consider the following two tasks:
```python

def task1():  # Create list of dataframes
    df1 = cudf.DataFrame({"a": range(10)})
    df2 = cudf.DataFrame({"a": range(10)})
    return [df1, df2]

def task2(dfs):  # Get the second item
    return dfs[1]    
```
Running the two tasks on a worker, we get something like:
```python

>>> data["k1"] = task1()
>>> data["k2"] = task2(data["k1"])
>>> data
{
    "k1": [df1, df2],
    "k2": df2,
}
```
Since the current implementation of spilling works on keys and handles each key separately, it overestimates the device memory used: `sizeof(df)*3`. Even worse, if it decides to spill `k2`, no device memory is freed, since `k1` still holds a reference to `df2`!

The new spilling implementation fixes this issue by wrapping identical CUDA objects in a shared `ProxyObject`; in this case, `df2` in both `k1` and `k2` will refer to the same `ProxyObject`.
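
A rough sketch of the idea (not the actual `ProxifyHostFile` bookkeeping, which tracks proxies automatically; calling `asproxy` without explicit serializers is assumed to leave the object unspilled):
```python
import cudf
from dask_cuda.proxy_object import asproxy  # ProxyObject factory used by this PR

df1 = cudf.DataFrame({"a": range(10)})
df2 = cudf.DataFrame({"a": range(10)})

shared = asproxy(df2)  # df2 wrapped in a ProxyObject
data = {
    "k1": [asproxy(df1), shared],
    "k2": shared,  # the very same proxy object as in "k1"
}

# Serializing (spilling) `shared` moves df2 off the device once, for both keys;
# accessing it through either key unspills it just in time.
assert data["k1"][1] is data["k2"]
```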


### Sharing device buffers

Consider the following code snippet:
```python

>>> data["df"] = cudf.DataFrame({"a": range(10)})
>>> data["grouped"] = shuffle_group(data["df"], "a", 0, 2, 2, False, 2)
>>> data["v1"] = data["grouped"][0]
>>> data["v2"] = data["grouped"][1]
```
In this case `v1` and `v2` are separate objects and are handled separately in both the current and the new spilling implementation. However, `shuffle_group()` in cudf actually returns a single device memory buffer, so `v1` and `v2` point to the same underlying memory. Thus the current implementation will again overestimate the memory use and may spill one of the dataframes without freeing any device memory.
The new implementation takes this into account when estimating memory usage and makes sure that either both dataframes are spilled or neither is.
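
A minimal sketch of the deduplicated accounting, built on the `get_device_memory_objects` helper this PR adds (the real logic lives in `ProxifyHostFile`; the sketch also assumes `dask.sizeof` can size the underlying buffer objects):
```python
from dask.sizeof import sizeof
from dask_cuda.get_device_memory_objects import get_device_memory_objects


def estimate_device_memory(values) -> int:
    """Estimate device memory of `values`, counting shared buffers only once."""
    buffers = set()
    for value in values:
        buffers |= get_device_memory_objects(value)
    return sum(sizeof(buf) for buf in buffers)


# For the snippet above, estimate_device_memory([data["v1"], data["v2"]])
# reports roughly the size of the single shared buffer, not twice that amount,
# so spilling only helps if both dataframes are spilled together.
```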

cc. @beckernick, @VibhuJawa
xref: dask/distributed#3756

Authors:
  - Mads R. B. Kristensen <[email protected]>

Approvers:
  - Peter Andreas Entschev

URL: #451
madsbk authored Jan 6, 2021
1 parent 32d9d33 commit f6ec9ef
Showing 11 changed files with 970 additions and 143 deletions.
6 changes: 4 additions & 2 deletions dask_cuda/cli/dask_cuda_worker.py
@@ -195,8 +195,10 @@
)
@click.option(
"--enable-jit-unspill/--disable-jit-unspill",
default=None, # If not specified, use Dask config
help="Enable just-in-time unspilling",
default=None,
help="Enable just-in-time unspilling. This is experimental and doesn't "
"support memory spilling to disk Please see proxy_object.ProxyObject "
"and proxify_host_file.ProxifyHostFile.",
)
def main(
scheduler,
37 changes: 24 additions & 13 deletions dask_cuda/cuda_worker.py
@@ -20,6 +20,7 @@

from .device_host_file import DeviceHostFile
from .initialize import initialize
from .proxify_host_file import ProxifyHostFile
from .utils import (
CPUAffinity,
RMMSetup,
@@ -126,7 +127,6 @@ def del_pid_file():

preload_argv = kwargs.get("preload_argv", [])
kwargs = {"worker_port": None, "listen_address": None}
t = Nanny

if (
not scheduler
@@ -186,8 +186,29 @@ def del_pid_file():
else:
self.jit_unspill = jit_unspill

if self.jit_unspill:
data = lambda i: (
ProxifyHostFile,
{
"device_memory_limit": parse_device_memory_limit(
device_memory_limit, device_index=i
),
},
)
else:
data = lambda i: (
DeviceHostFile,
{
"device_memory_limit": parse_device_memory_limit(
device_memory_limit, device_index=i
),
"memory_limit": memory_limit,
"local_directory": local_directory,
},
)

self.nannies = [
t(
Nanny(
scheduler,
scheduler_file=scheduler_file,
nthreads=nthreads,
@@ -217,17 +238,7 @@ def del_pid_file():
cuda_device_index=i,
)
},
data=(
DeviceHostFile,
{
"device_memory_limit": parse_device_memory_limit(
device_memory_limit, device_index=i
),
"memory_limit": memory_limit,
"local_directory": local_directory,
"jit_unspill": self.jit_unspill,
},
),
data=data(i),
**kwargs,
)
for i in range(nprocs)
25 changes: 6 additions & 19 deletions dask_cuda/device_host_file.py
@@ -22,7 +22,7 @@


class DeviceSerialized:
""" Store device object on the host
"""Store device object on the host
This stores a device-side object as
1. A msgpack encodable header
2. A list of `bytes`-like objects (like NumPy arrays)
@@ -56,7 +56,7 @@ def device_deserialize(header, frames):

@nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda")
def device_to_host(obj: object) -> DeviceSerialized:
header, frames = serialize(obj, serializers=["dask", "pickle"], on_error="raise")
header, frames = serialize(obj, serializers=("dask", "pickle"), on_error="raise")
return DeviceSerialized(header, frames)


@@ -76,7 +76,7 @@ def pxy_obj_device_to_host(obj: object) -> proxy_object.ProxyObject:

# Notice, both the "dask" and the "pickle" serializer will
# spill `obj` to main memory.
return proxy_object.asproxy(obj, serializers=["dask", "pickle"])
return proxy_object.asproxy(obj, serializers=("dask", "pickle"))


@nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda")
@@ -87,7 +87,7 @@ def pxy_obj_host_to_device(s: proxy_object.ProxyObject) -> object:


class DeviceHostFile(ZictBase):
""" Manages serialization/deserialization of objects.
"""Manages serialization/deserialization of objects.
Three LRU cache levels are controlled, for device, host and disk.
Each level takes care of serializing objects once its limit has been
@@ -106,16 +106,10 @@ class DeviceHostFile(ZictBase):
implies no spilling to disk.
local_directory: path
Path where to store serialized objects on disk
jit_unspill: bool
If True, enable just-in-time unspilling (see proxy_object.ProxyObject).
"""

def __init__(
self,
device_memory_limit=None,
memory_limit=None,
local_directory=None,
jit_unspill=False,
self, device_memory_limit=None, memory_limit=None, local_directory=None,
):
if local_directory is None:
local_directory = dask.config.get("temporary-directory") or os.getcwd()
@@ -141,14 +135,7 @@ def __init__(

self.device_keys = set()
self.device_func = dict()
if jit_unspill:
self.device_host_func = Func(
pxy_obj_device_to_host, pxy_obj_host_to_device, self.host_buffer
)
else:
self.device_host_func = Func(
device_to_host, host_to_device, self.host_buffer
)
self.device_host_func = Func(device_to_host, host_to_device, self.host_buffer)
self.device_buffer = Buffer(
self.device_func, self.device_host_func, device_memory_limit, weight=weight
)
119 changes: 119 additions & 0 deletions dask_cuda/get_device_memory_objects.py
@@ -0,0 +1,119 @@
from typing import Any, Set

from dask.sizeof import sizeof
from dask.utils import Dispatch

dispatch = Dispatch(name="get_device_memory_objects")


def get_device_memory_objects(obj: Any) -> Set:
""" Find all CUDA device objects in `obj`
Search through `obj` and find all CUDA device objects, which are objects
that either are known to `dispatch` or implement `__cuda_array_interface__`.
Notice, the CUDA device objects must be hashable.
Parameters
----------
obj: Any
Object to search through
Returns
-------
ret: set
Set of CUDA device memory objects
"""
return set(dispatch(obj))


@dispatch.register(object)
def get_device_memory_objects_default(obj):
if hasattr(obj, "_obj_pxy"):
if obj._obj_pxy["serializers"] is None:
return dispatch(obj._obj_pxy["obj"])
else:
return []
if hasattr(obj, "data"):
return dispatch(obj.data)
if hasattr(obj, "_owner") and obj._owner is not None:
return dispatch(obj._owner)
if hasattr(obj, "__cuda_array_interface__"):
return [obj]
return []


@dispatch.register(list)
@dispatch.register(tuple)
@dispatch.register(set)
@dispatch.register(frozenset)
def get_device_memory_objects_python_sequence(seq):
ret = []
for s in seq:
ret.extend(dispatch(s))
return ret


@dispatch.register(dict)
def get_device_memory_objects_python_dict(seq):
ret = []
for s in seq.values():
ret.extend(dispatch(s))
return ret


@dispatch.register_lazy("cupy")
def get_device_memory_objects_register_cupy():
from cupy.cuda.memory import MemoryPointer

@dispatch.register(MemoryPointer)
def get_device_memory_objects_cupy(obj):
return [obj.mem]


@dispatch.register_lazy("cudf")
def get_device_memory_objects_register_cudf():
import cudf.core.dataframe
import cudf.core.index
import cudf.core.multiindex
import cudf.core.series

@dispatch.register(cudf.core.dataframe.DataFrame)
def get_device_memory_objects_cudf_dataframe(obj):

ret = dispatch(obj._index)
for col in obj._data.columns:
ret += dispatch(col)
return ret

@dispatch.register(cudf.core.series.Series)
def get_device_memory_objects_cudf_series(obj):
return dispatch(obj._index) + dispatch(obj._column)

@dispatch.register(cudf.core.index.RangeIndex)
def get_device_memory_objects_cudf_range_index(obj):
# Avoid materializing RangeIndex. This introduces some inaccuracy
# in total device memory usage, but we accept that the memory use of
# RangeIndexes is limited.
return []

@dispatch.register(cudf.core.index.Index)
def get_device_memory_objects_cudf_index(obj):
return dispatch(obj._values)

@dispatch.register(cudf.core.multiindex.MultiIndex)
def get_device_memory_objects_cudf_multiindex(obj):
return dispatch(obj._columns)


@sizeof.register_lazy("cupy")
def register_cupy(): # NB: this overwrites dask.sizeof.register_cupy()
import cupy.cuda.memory

@sizeof.register(cupy.cuda.memory.BaseMemory)
def sizeof_cupy_base_memory(x):
return int(x.size)

@sizeof.register(cupy.ndarray)
def sizeof_cupy_ndarray(x):
return int(x.nbytes)
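
Since `dispatch` is an ordinary `dask.utils.Dispatch`, third-party containers can register their own handlers; a hypothetical sketch (the `DeviceBufferHolder` type is illustrative, not part of this PR):
```python
from dask_cuda.get_device_memory_objects import dispatch


class DeviceBufferHolder:
    """Hypothetical container holding objects exposing __cuda_array_interface__."""

    def __init__(self, buffers):
        self.buffers = buffers


@dispatch.register(DeviceBufferHolder)
def get_device_memory_objects_holder(obj):
    # Recurse into each held buffer so shared memory is deduplicated as usual.
    ret = []
    for buf in obj.buffers:
        ret.extend(dispatch(buf))
    return ret
```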
33 changes: 21 additions & 12 deletions dask_cuda/local_cuda_cluster.py
@@ -9,6 +9,7 @@

from .device_host_file import DeviceHostFile
from .initialize import initialize
from .proxify_host_file import ProxifyHostFile
from .utils import (
CPUAffinity,
RMMSetup,
@@ -95,7 +96,10 @@ class LocalCUDACluster(LocalCluster):
WARNING: managed memory is currently incompatible with NVLink, trying
to enable both will result in an exception.
jit_unspill: bool
If True, enable just-in-time unspilling (see proxy_object.ProxyObject).
If True, enable just-in-time unspilling. This is experimental and doesn't
support memory spilling to disk. Please see proxy_object.ProxyObject and
proxify_host_file.ProxifyHostFile.
Examples
--------
@@ -198,17 +202,22 @@ def __init__(
self.jit_unspill = jit_unspill

if data is None:
data = (
DeviceHostFile,
{
"device_memory_limit": self.device_memory_limit,
"memory_limit": self.host_memory_limit,
"local_directory": local_directory
or dask.config.get("temporary-directory")
or os.getcwd(),
"jit_unspill": self.jit_unspill,
},
)
if self.jit_unspill:
data = (
ProxifyHostFile,
{"device_memory_limit": self.device_memory_limit,},
)
else:
data = (
DeviceHostFile,
{
"device_memory_limit": self.device_memory_limit,
"memory_limit": self.host_memory_limit,
"local_directory": local_directory
or dask.config.get("temporary-directory")
or os.getcwd(),
},
)

if enable_tcp_over_ucx or enable_infiniband or enable_nvlink:
if protocol is None:
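
The `data=` plumbing above also means the new host file can be supplied explicitly; a hedged sketch that bypasses the `jit_unspill` flag (normally unnecessary, and the 1 GiB limit is only illustrative):
```python
from dask_cuda import LocalCUDACluster
from dask_cuda.proxify_host_file import ProxifyHostFile

if __name__ == "__main__":
    # Pass the (class, kwargs) tuple that LocalCUDACluster would otherwise
    # build itself when jit_unspill=True.
    cluster = LocalCUDACluster(
        data=(ProxifyHostFile, {"device_memory_limit": 2**30}),
    )
```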