Add option to enable RMM logging (#542)

Continuation of #322; this PR adds:

- Keyword argument `rmm_log_directory` to `LocalCUDACluster` and `CUDAWorker` to enable per-worker RMM logging to a specified directory (usage sketched below)
- Option `--rmm-log-directory` to the `dask-cuda-worker` CLI to achieve the same as above
- Option `--rmm-log-directory` to the benchmarks to enable worker *and* scheduler RMM logging
- Tests added to `test_local_cuda_cluster.py` and `test_dask_cuda_worker.py` to check that logging is enabled

The log files use the following naming convention:

- `rmm_log_<N>.dev0.txt` for workers spawned through the construction of a cluster
- `rmm_log_<IP>:<PORT>.dev0.txt` for workers spawned using the CLI
- `rmm_log_scheduler.dev0.txt` for schedulers
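
For illustration, a minimal sketch of the Python entry point; the pool size and log directory below are arbitrary placeholders:

from dask.distributed import Client

from dask_cuda import LocalCUDACluster

# Logging requires RMM to be initialized, so a pool (or managed
# memory) must be requested alongside rmm_log_directory:
cluster = LocalCUDACluster(
    rmm_pool_size="2GB",
    rmm_log_directory="/tmp/rmm-logs",  # placeholder path
)
client = Client(cluster)
# each worker now writes /tmp/rmm-logs/rmm_log_<N>.dev0.txt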

Some questions:

- Should we leave `dev0` in the log file names? It seems redundant, but I haven't looked into whether we can stop RMM from appending it to the file names.
- Are the tests sufficient? Right now they only check that the memory resources are `rmm.mr.LoggingResourceAdaptor` instances when logging is enabled; a check that the log files were actually generated could be nice (sketched below).
- Should we consider adding logging to the scheduler/client?
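
On that second question, a hedged sketch of what a file-existence check could look like (not part of this PR; the helper name is hypothetical):

import glob
import os

def assert_rmm_logs_exist(log_directory, n_workers):
    # RMM appends a ".dev<N>" suffix per device, so match loosely:
    logs = glob.glob(os.path.join(log_directory, "rmm_log_*.txt"))
    assert len(logs) >= n_workers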

cc @jakirkham @pentschev

Authors:
  - Charles Blackmon-Luca (@charlesbluca)

Approvers:
  - Peter Andreas Entschev (@pentschev)
  - Benjamin Zaitlen (@quasiben)

URL: #542
charlesbluca authored Mar 2, 2021
1 parent c952d5e commit f99a037
Showing 11 changed files with 143 additions and 15 deletions.
6 changes: 5 additions & 1 deletion dask_cuda/benchmarks/local_cudf_merge.py
@@ -213,11 +213,15 @@ def main(args):
         setup_memory_pool,
         pool_size=args.rmm_pool_size,
         disable_pool=args.disable_rmm_pool,
+        log_directory=args.rmm_log_directory,
     )
     # Create an RMM pool on the scheduler due to occasional deserialization
     # of CUDA objects. May cause issues with InfiniBand otherwise.
     client.run_on_scheduler(
-        setup_memory_pool, 1e9, disable_pool=args.disable_rmm_pool
+        setup_memory_pool,
+        pool_size=1e9,
+        disable_pool=args.disable_rmm_pool,
+        log_directory=args.rmm_log_directory,
     )
 
     scheduler_workers = client.run_on_scheduler(get_scheduler_workers)

6 changes: 5 additions & 1 deletion dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -100,11 +100,15 @@ def main(args):
         setup_memory_pool,
         pool_size=args.rmm_pool_size,
         disable_pool=args.disable_rmm_pool,
+        log_directory=args.rmm_log_directory,
     )
     # Create an RMM pool on the scheduler due to occasional deserialization
     # of CUDA objects. May cause issues with InfiniBand otherwise.
     client.run_on_scheduler(
-        setup_memory_pool, 1e9, disable_pool=args.disable_rmm_pool
+        setup_memory_pool,
+        pool_size=1e9,
+        disable_pool=args.disable_rmm_pool,
+        log_directory=args.rmm_log_directory,
     )
 
     scheduler_workers = client.run_on_scheduler(get_scheduler_workers)

11 changes: 9 additions & 2 deletions dask_cuda/benchmarks/local_cupy.py
@@ -130,11 +130,18 @@ async def run(args):
         ) as client:
             scheduler_workers = await client.run_on_scheduler(get_scheduler_workers)
 
-            await client.run(setup_memory_pool, disable_pool=args.disable_rmm_pool)
+            await client.run(
+                setup_memory_pool,
+                disable_pool=args.disable_rmm_pool,
+                log_directory=args.rmm_log_directory,
+            )
             # Create an RMM pool on the scheduler due to occasional deserialization
             # of CUDA objects. May cause issues with InfiniBand otherwise.
             await client.run_on_scheduler(
-                setup_memory_pool, 1e9, disable_pool=args.disable_rmm_pool
+                setup_memory_pool,
+                pool_size=1e9,
+                disable_pool=args.disable_rmm_pool,
+                log_directory=args.rmm_log_directory,
             )
 
             took_list = []

11 changes: 9 additions & 2 deletions dask_cuda/benchmarks/local_cupy_map_overlap.py
@@ -80,11 +80,18 @@ async def run(args):
         ) as client:
             scheduler_workers = await client.run_on_scheduler(get_scheduler_workers)
 
-            await client.run(setup_memory_pool, disable_pool=args.disable_rmm_pool)
+            await client.run(
+                setup_memory_pool,
+                disable_pool=args.disable_rmm_pool,
+                log_directory=args.rmm_log_directory,
+            )
             # Create an RMM pool on the scheduler due to occasional deserialization
             # of CUDA objects. May cause issues with InfiniBand otherwise.
             await client.run_on_scheduler(
-                setup_memory_pool, 1e9, disable_pool=args.disable_rmm_pool
+                setup_memory_pool,
+                pool_size=1e9,
+                disable_pool=args.disable_rmm_pool,
+                log_directory=args.rmm_log_directory,
             )
 
             took_list = []

21 changes: 19 additions & 2 deletions dask_cuda/benchmarks/utils.py
@@ -43,6 +43,13 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]):
     parser.add_argument(
         "--disable-rmm-pool", action="store_true", help="Disable the RMM memory pool"
     )
+    parser.add_argument(
+        "--rmm-log-directory",
+        default=None,
+        type=str,
+        help="Directory to write worker and scheduler RMM log files to. "
+        "Logging is only enabled if RMM memory pool is enabled.",
+    )
     parser.add_argument(
         "--all-to-all", action="store_true", help="Run all-to-all before computation",
     )
@@ -222,14 +229,24 @@ def get_scheduler_workers(dask_scheduler=None):
     return dask_scheduler.workers
 
 
-def setup_memory_pool(pool_size=None, disable_pool=False):
+def setup_memory_pool(
+    dask_worker=None, pool_size=None, disable_pool=False, log_directory=None,
+):
     import cupy
 
     import rmm
 
+    from dask_cuda.utils import get_rmm_log_file_name
+
+    logging = log_directory is not None
+
     if not disable_pool:
         rmm.reinitialize(
-            pool_allocator=True, devices=0, initial_pool_size=pool_size,
+            pool_allocator=True,
+            devices=0,
+            initial_pool_size=pool_size,
+            logging=logging,
+            log_file_name=get_rmm_log_file_name(dask_worker, logging, log_directory),
         )
     cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)

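A subtlety worth noting here: `Client.run` injects the `dask_worker` keyword into functions that declare it, while `Client.run_on_scheduler` does not, so `dask_worker` stays `None` on the scheduler and `get_rmm_log_file_name` falls back to the scheduler log name. A minimal sketch of that behavior, assuming a running `client`:

def which_context(dask_worker=None):
    # On a worker, distributed injects the Worker instance (which has
    # a .name attribute); via run_on_scheduler it is left as None.
    return getattr(dask_worker, "name", "scheduler")

worker_names = client.run(which_context)                 # {address: name, ...}
scheduler_name = client.run_on_scheduler(which_context)  # "scheduler"
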
10 changes: 10 additions & 0 deletions dask_cuda/cli/dask_cuda_worker.py
@@ -112,6 +112,14 @@
     "WARNING: managed memory is currently incompatible with NVLink, "
     "trying to enable both will result in an exception.",
 )
+@click.option(
+    "--rmm-log-directory",
+    default=None,
+    help="Directory to write per-worker RMM log files to; the client "
+    "and scheduler are not logged here. "
+    "NOTE: Logging will only be enabled if --rmm-pool-size or "
+    "--rmm-managed-memory are specified.",
+)
 @click.option(
     "--reconnect/--no-reconnect",
     default=True,
@@ -209,6 +217,7 @@ def main(
     device_memory_limit,
     rmm_pool_size,
     rmm_managed_memory,
+    rmm_log_directory,
     pid_file,
     resources,
     dashboard,
@@ -253,6 +262,7 @@ def main(
         device_memory_limit,
         rmm_pool_size,
         rmm_managed_memory,
+        rmm_log_directory,
         pid_file,
         resources,
         dashboard,

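Tying the new flag back to the naming convention above, a hypothetical invocation (scheduler address and directory are placeholders):

$ dask-cuda-worker tcp://127.0.0.1:8786 --rmm-pool-size "2 GB" --rmm-log-directory /tmp/rmm-logs
# each worker then writes /tmp/rmm-logs/rmm_log_<IP>:<PORT>.dev0.txt
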
3 changes: 2 additions & 1 deletion dask_cuda/cuda_worker.py
@@ -56,6 +56,7 @@ def __init__(
         device_memory_limit="auto",
         rmm_pool_size=None,
         rmm_managed_memory=False,
+        rmm_log_directory=None,
         pid_file=None,
         resources=None,
         dashboard=True,
@@ -212,7 +213,7 @@ def del_pid_file():
                 env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
                 plugins={
                     CPUAffinity(get_cpu_affinity(i)),
-                    RMMSetup(rmm_pool_size, rmm_managed_memory),
+                    RMMSetup(rmm_pool_size, rmm_managed_memory, rmm_log_directory),
                 },
                 name=name if nprocs == 1 or not name else name + "-" + str(i),
                 local_directory=local_directory,

17 changes: 14 additions & 3 deletions dask_cuda/local_cuda_cluster.py
@@ -90,11 +90,15 @@ class LocalCUDACluster(LocalCluster):
         NOTE: The size is a per worker (i.e., per GPU) configuration, and
         not cluster-wide!
     rmm_managed_memory: bool
-        If True, initialize each worker with RMM and set it to use managed
-        memory. If False, RMM may still be used if `rmm_pool_size` is specified,
+        If ``True``, initialize each worker with RMM and set it to use managed
+        memory. If ``False``, RMM may still be used if `rmm_pool_size` is specified,
         but in that case with default (non-managed) memory type.
         WARNING: managed memory is currently incompatible with NVLink, trying
         to enable both will result in an exception.
+    rmm_log_directory: str
+        Directory to write per-worker RMM log files to; the client and scheduler
+        are not logged here. Logging will only be enabled if `rmm_pool_size` or
+        `rmm_managed_memory` are specified.
     jit_unspill: bool
         If True, enable just-in-time unspilling. This is experimental and doesn't
         support memory spilling to disk. Please see proxy_object.ProxyObject and
@@ -141,6 +145,7 @@ def __init__(
         ucx_net_devices=None,
         rmm_pool_size=None,
         rmm_managed_memory=False,
+        rmm_log_directory=None,
         jit_unspill=None,
         **kwargs,
     ):
@@ -191,6 +196,8 @@ def __init__(
                 "#important-notes for more details"
             )
 
+        self.rmm_log_directory = rmm_log_directory
+
         if not processes:
             raise ValueError(
                 "Processes are necessary in order to use multiple GPUs with Dask"
@@ -294,7 +301,11 @@ def new_worker_spec(self):
                 "env": {"CUDA_VISIBLE_DEVICES": visible_devices,},
                 "plugins": {
                     CPUAffinity(get_cpu_affinity(worker_count)),
-                    RMMSetup(self.rmm_pool_size, self.rmm_managed_memory),
+                    RMMSetup(
+                        self.rmm_pool_size,
+                        self.rmm_managed_memory,
+                        self.rmm_log_directory,
+                    ),
                 },
             }
         )

26 changes: 26 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -101,6 +101,32 @@ def test_rmm_managed(loop):  # noqa: F811
                     assert v is rmm.mr.ManagedMemoryResource
 
 
+def test_rmm_logging(loop):  # noqa: F811
+    rmm = pytest.importorskip("rmm")
+    with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]):
+        with popen(
+            [
+                "dask-cuda-worker",
+                "127.0.0.1:9369",
+                "--host",
+                "127.0.0.1",
+                "--rmm-pool-size",
+                "2 GB",
+                "--rmm-log-directory",
+                ".",
+                "--no-dashboard",
+            ]
+        ):
+            with Client("127.0.0.1:9369", loop=loop) as client:
+                assert wait_workers(client, n_gpus=get_n_gpus())
+
+                memory_resource_type = client.run(
+                    rmm.mr.get_current_device_resource_type
+                )
+                for v in memory_resource_type.values():
+                    assert v is rmm.mr.LoggingResourceAdaptor
+
+
 def test_dashboard_address(loop):  # noqa: F811
     os.environ["CUDA_VISIBLE_DEVICES"] = "0"
     with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]):

19 changes: 17 additions & 2 deletions dask_cuda/tests/test_local_cuda_cluster.py
@@ -132,7 +132,7 @@ async def test_all_to_all():
 async def test_rmm_pool():
     rmm = pytest.importorskip("rmm")
 
-    async with LocalCUDACluster(rmm_pool_size="2GB", asynchronous=True) as cluster:
+    async with LocalCUDACluster(rmm_pool_size="2GB", asynchronous=True,) as cluster:
         async with Client(cluster, asynchronous=True) as client:
             memory_resource_type = await client.run(
                 rmm.mr.get_current_device_resource_type
@@ -145,7 +145,7 @@ async def test_rmm_pool():
 async def test_rmm_managed():
     rmm = pytest.importorskip("rmm")
 
-    async with LocalCUDACluster(rmm_managed_memory=True, asynchronous=True) as cluster:
+    async with LocalCUDACluster(rmm_managed_memory=True, asynchronous=True,) as cluster:
         async with Client(cluster, asynchronous=True) as client:
             memory_resource_type = await client.run(
                 rmm.mr.get_current_device_resource_type
@@ -154,6 +154,21 @@ async def test_rmm_managed():
             for v in memory_resource_type.values():
                 assert v is rmm.mr.ManagedMemoryResource
 
 
+@gen_test(timeout=20)
+async def test_rmm_logging():
+    rmm = pytest.importorskip("rmm")
+
+    async with LocalCUDACluster(
+        rmm_pool_size="2GB", rmm_log_directory=".", asynchronous=True,
+    ) as cluster:
+        async with Client(cluster, asynchronous=True) as client:
+            memory_resource_type = await client.run(
+                rmm.mr.get_current_device_resource_type
+            )
+            for v in memory_resource_type.values():
+                assert v is rmm.mr.LoggingResourceAdaptor
+
+
 @gen_test(timeout=20)
 async def test_cluster_worker():
     async with LocalCUDACluster(

28 changes: 27 additions & 1 deletion dask_cuda/utils.py
@@ -32,9 +32,11 @@ def setup(self, worker=None):
 
 
 class RMMSetup:
-    def __init__(self, nbytes, managed_memory):
+    def __init__(self, nbytes, managed_memory, log_directory):
         self.nbytes = nbytes
         self.managed_memory = managed_memory
+        self.logging = log_directory is not None
+        self.log_directory = log_directory
 
     def setup(self, worker=None):
         if self.nbytes is not None or self.managed_memory is True:
@@ -46,6 +48,10 @@ def setup(self, worker=None):
                 pool_allocator=pool_allocator,
                 managed_memory=self.managed_memory,
                 initial_pool_size=self.nbytes,
+                logging=self.logging,
+                log_file_name=get_rmm_log_file_name(
+                    worker, self.logging, self.log_directory
+                ),
             )
 
 
@@ -319,6 +325,26 @@ def get_preload_options(
     return preload_options
 
 
+def get_rmm_log_file_name(dask_worker, logging=False, log_directory=None):
+    return (
+        os.path.join(
+            log_directory,
+            "rmm_log_%s.txt"
+            % (
+                (
+                    dask_worker.name.split("/")[-1]
+                    if isinstance(dask_worker.name, str)
+                    else dask_worker.name
+                )
+                if hasattr(dask_worker, "name")
+                else "scheduler"
+            ),
+        )
+        if logging
+        else None
+    )
+
+
 def wait_workers(
     client, min_timeout=10, seconds_per_gpu=2, n_gpus=None, timeout_callback=None
 ):
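
A small self-contained sketch of how `get_rmm_log_file_name` resolves the three naming cases from the PR description; `FakeWorker` is a stand-in for distributed's `Worker`, not a real API:

class FakeWorker:
    def __init__(self, name):
        self.name = name

def log_stem(dask_worker=None):
    # Condensed version of the naming logic in get_rmm_log_file_name:
    if not hasattr(dask_worker, "name"):
        return "scheduler"  # run_on_scheduler leaves dask_worker as None
    name = dask_worker.name
    return name.split("/")[-1] if isinstance(name, str) else str(name)

assert log_stem() == "scheduler"       # rmm_log_scheduler.dev0.txt
assert log_stem(FakeWorker(0)) == "0"  # rmm_log_<N>.dev0.txt
assert log_stem(FakeWorker("tcp://127.0.0.1:40277")) == "127.0.0.1:40277"  # rmm_log_<IP>:<PORT>.dev0.txt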
