Unify device_memory_limit parsing and set default to 0.8 #439

Merged
15 changes: 8 additions & 7 deletions dask_cuda/cli/dask_cuda_worker.py
@@ -84,13 +84,14 @@
)
@click.option(
"--device-memory-limit",
default="auto",
help="Bytes of memory per CUDA device that the worker can use. "
"This can be an integer (bytes), "
"float (fraction of total device memory), "
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management "
"(i.e., allow full device memory usage).",
default=0.8,
help="Specifies the size of the CUDA device LRU cache, which "
"is used to determine when the worker starts spilling to host "
"memory. This can be a float (fraction of total device "
"memory), an integer (bytes), a string (like 5GB or 5000M), "
"and 'auto' or 0 to disable spilling to host (i.e., allow "
"full device memory usage). Default is 0.8, 80% of the "
"worker's total device memory.",
)
@click.option(
"--rmm-pool-size",
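To illustrate the values now accepted by --device-memory-limit, here is a small sketch (not part of the diff) using the parse_device_memory_limit helper added below in dask_cuda/utils.py; the commented results assume a hypothetical 32 GB device:

    from dask_cuda.utils import parse_device_memory_limit

    parse_device_memory_limit(0.8)          # fraction -> int(total * 0.8), ~27.3 GB here
    parse_device_memory_limit("5GB")        # human-readable string -> 5000000000 bytes
    parse_device_memory_limit(5000000000)   # plain byte count is passed through unchanged
    parse_device_memory_limit("auto")       # "auto", 0 or None -> full device memory (spilling effectively off)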
11 changes: 4 additions & 7 deletions dask_cuda/cuda_worker.py
@@ -25,10 +25,10 @@
RMMSetup,
cuda_visible_devices,
get_cpu_affinity,
get_device_total_memory,
get_n_gpus,
get_ucx_config,
get_ucx_net_devices,
parse_device_memory_limit,
)


@@ -211,12 +211,9 @@ def del_pid_file():
data=(
DeviceHostFile,
{
"device_memory_limit": get_device_total_memory(index=i)
if (
device_memory_limit == "auto"
or device_memory_limit == int(0)
)
else parse_bytes(device_memory_limit),
"device_memory_limit": parse_device_memory_limit(
device_memory_limit, device_index=i
),
"memory_limit": memory_limit,
"local_directory": local_directory,
},
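As a quick sketch of what the per-worker call above resolves to (assuming, hypothetically, two visible CUDA devices): the limit is parsed against each worker's own device, so a fractional value scales with that GPU's total memory.

    from dask_cuda.utils import get_device_total_memory, parse_device_memory_limit

    for i in range(2):  # hypothetical: two visible CUDA devices
        limit = parse_device_memory_limit(0.8, device_index=i)
        assert limit == int(get_device_total_memory(index=i) * 0.8)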
33 changes: 16 additions & 17 deletions dask_cuda/local_cuda_cluster.py
@@ -14,10 +14,10 @@
RMMSetup,
cuda_visible_devices,
get_cpu_affinity,
get_device_total_memory,
get_ucx_config,
get_ucx_net_devices,
parse_cuda_visible_device,
parse_device_memory_limit,
)


@@ -38,18 +38,22 @@ class LocalCUDACluster(LocalCluster):

Parameters
----------
CUDA_VISIBLE_DEVICES: str
String like ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to
different GPUs
CUDA_VISIBLE_DEVICES: str or list
String or list ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to
different GPUs.
device_memory_limit: int, float or str
Specifies the size of the CUDA device LRU cache, which is used to
determine when the worker starts spilling to host memory. This can be
a float (fraction of total device memory), an integer (bytes), a string
(like 5GB or 5000M), and "auto", 0 or None to disable spilling to
host (i.e., allow full device memory usage). Default is 0.8, 80% of the
worker's total device memory.
interface: str
The external interface used to connect to the scheduler, usually
an ethernet interface is used for connection, and not an InfiniBand
interface (if one is available).
threads_per_worker: int
Number of threads to be used for each CUDA worker process.
CUDA_VISIBLE_DEVICES: str or list
String or list ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to
different GPUs.
protocol: str
Protocol to use for communication, e.g., "tcp" or "ucx".
enable_tcp_over_ucx: bool
@@ -117,7 +121,7 @@ def __init__(
threads_per_worker=1,
processes=True,
memory_limit="auto",
device_memory_limit=None,
device_memory_limit=0.8,
CUDA_VISIBLE_DEVICES=None,
data=None,
local_directory=None,
@@ -147,7 +151,9 @@ def __init__(
self.host_memory_limit = parse_memory_limit(
memory_limit, threads_per_worker, n_workers
)
self.device_memory_limit = device_memory_limit
self.device_memory_limit = parse_device_memory_limit(
device_memory_limit, device_index=0
)

self.rmm_pool_size = rmm_pool_size
self.rmm_managed_memory = rmm_managed_memory
@@ -176,11 +182,6 @@ def __init__(
"Processes are necessary in order to use multiple GPUs with Dask"
)

if self.device_memory_limit is None:
self.device_memory_limit = get_device_total_memory(0)
elif isinstance(self.device_memory_limit, str):
self.device_memory_limit = parse_bytes(self.device_memory_limit)

if data is None:
data = (
DeviceHostFile,
@@ -265,9 +266,7 @@ def new_worker_spec(self):
visible_devices = cuda_visible_devices(worker_count, self.cuda_visible_devices)
spec["options"].update(
{
"env": {
"CUDA_VISIBLE_DEVICES": visible_devices,
},
"env": {"CUDA_VISIBLE_DEVICES": visible_devices,},
"plugins": {
CPUAffinity(get_cpu_affinity(worker_count)),
RMMSetup(self.rmm_pool_size, self.rmm_managed_memory),
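A minimal usage sketch for the LocalCUDACluster changes above (it assumes a machine with local GPUs): with the new default, each worker starts spilling to host once roughly 80% of its device memory is in use, and any of the documented value types can still be passed explicitly.

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    cluster = LocalCUDACluster(device_memory_limit="4GB")  # or e.g. 0.8, "auto", 0, None
    client = Client(cluster)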
40 changes: 20 additions & 20 deletions dask_cuda/tests/test_spill.py
@@ -85,23 +85,23 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov
"params",
[
{
"device_memory_limit": 200e6,
"memory_limit": 800e6,
"device_memory_limit": int(200e6),
"memory_limit": int(800e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": False,
},
{
"device_memory_limit": 200e6,
"memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": int(200e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": True,
},
{
"device_memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": 0,
"host_target": 0.0,
"host_spill": 0.0,
@@ -163,23 +163,23 @@ def test_device_spill(client, scheduler, worker):
"params",
[
{
"device_memory_limit": 200e6,
"memory_limit": 800e6,
"device_memory_limit": int(200e6),
"memory_limit": int(800e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": False,
},
{
"device_memory_limit": 200e6,
"memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": int(200e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": True,
},
{
"device_memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": 0,
"host_target": 0.0,
"host_spill": 0.0,
@@ -241,23 +241,23 @@ async def test_cupy_cluster_device_spill(params):
"params",
[
{
"device_memory_limit": 200e6,
"memory_limit": 800e6,
"device_memory_limit": int(200e6),
"memory_limit": int(800e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": False,
},
{
"device_memory_limit": 200e6,
"memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": int(200e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": True,
},
{
"device_memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": 0,
"host_target": 0.0,
"host_spill": 0.0,
@@ -330,23 +330,23 @@ def test_device_spill(client, scheduler, worker):
"params",
[
{
"device_memory_limit": 200e6,
"memory_limit": 800e6,
"device_memory_limit": int(200e6),
"memory_limit": int(800e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": False,
},
{
"device_memory_limit": 200e6,
"memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": int(200e6),
"host_target": 0.0,
"host_spill": 0.0,
"host_pause": None,
"spills_to_disk": True,
},
{
"device_memory_limit": 200e6,
"device_memory_limit": int(200e6),
"memory_limit": 0,
"host_target": 0.0,
"host_spill": 0.0,
13 changes: 13 additions & 0 deletions dask_cuda/tests/test_utils.py
@@ -13,6 +13,7 @@
get_ucx_config,
get_ucx_net_devices,
parse_cuda_visible_device,
parse_device_memory_limit,
unpack_bitmask,
)

@@ -219,3 +220,15 @@ def test_parse_visible_devices():
with pytest.raises(TypeError):
parse_cuda_visible_device(None)
parse_cuda_visible_device([])


def test_parse_device_memory_limit():
total = get_device_total_memory(0)

assert parse_device_memory_limit(None) == total
assert parse_device_memory_limit(0) == total
assert parse_device_memory_limit("auto") == total

assert parse_device_memory_limit(0.8) == int(total * 0.8)
assert parse_device_memory_limit(1000000000) == 1000000000
assert parse_device_memory_limit("1GB") == 1000000000
49 changes: 48 additions & 1 deletion dask_cuda/utils.py
@@ -8,7 +8,8 @@
import pynvml
import toolz

from dask.distributed import wait
from distributed import wait
from distributed.utils import parse_bytes

try:
from nvtx import annotate as nvtx_annotate
@@ -442,3 +443,49 @@ def cuda_visible_devices(i, visible=None):

L = visible[i:] + visible[:i]
return ",".join(map(str, L))


def parse_device_memory_limit(device_memory_limit, device_index=0):
"""Parse memory limit to be used by a CUDA device.


Parameters
----------
device_memory_limit: float, int, str or None
This can be a float (fraction of total device memory), an integer (bytes),
a string (like 5GB or 5000M), or "auto", 0 or None to use the total device
memory.
device_index: int
The index of the device from which to obtain the total memory.

Examples
--------
>>> # On a 32GB CUDA device
>>> parse_device_memory_limit(None)
34089730048
>>> parse_device_memory_limit(0.8)
27271784038
>>> parse_device_memory_limit(1000000000)
1000000000
>>> parse_device_memory_limit("1GB")
1000000000
"""
if any(device_memory_limit == v for v in [0, None, "auto"]):
return get_device_total_memory(device_index)
elif isinstance(device_memory_limit, int):
return device_memory_limit
elif isinstance(device_memory_limit, float):
if device_memory_limit < 0.0 or device_memory_limit > 1.0:
raise ValueError(
"When `device_memory_limit` is float, it must meet the "
"`0.0 <= device_memory_limit <= 1.0` condition, where that is "
"fraction of the total GPU memory."
)
return int(get_device_total_memory(device_index) * device_memory_limit)
elif isinstance(device_memory_limit, str):
return parse_bytes(device_memory_limit)
else:
raise ValueError(
"Invalid value for `device_memory_limit`, valid values are "
"floats, integers, strings, 0, None or 'auto'."
)
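
And a quick illustration (not part of the PR's test suite) of the validation path above: fractional limits outside [0.0, 1.0] are rejected.

    import pytest

    from dask_cuda.utils import parse_device_memory_limit

    with pytest.raises(ValueError):
        parse_device_memory_limit(1.5)  # fractions must satisfy 0.0 <= x <= 1.0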