diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 3cf75a23298..2aa291cfd4f 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -164,8 +164,13 @@ distributed:
   nanny:
     preload: []             # Run custom modules with Nanny
     preload-argv: []        # See https://docs.dask.org/en/latest/how-to/customize-initialization.html
+
+    # Override environment variables. Read important caveats at
+    # https://distributed.dask.org/en/stable/worker.html#nanny.
     environ:
+      # See https://distributed.dask.org/en/stable/worker-memory.html#automatically-trim-memory
       MALLOC_TRIM_THRESHOLD_: 65536
+      # Numpy configuration
       OMP_NUM_THREADS: 1
       MKL_NUM_THREADS: 1
 
diff --git a/distributed/nanny.py b/distributed/nanny.py
index 1e3304eaad9..5d5a12e6573 100644
--- a/distributed/nanny.py
+++ b/distributed/nanny.py
@@ -87,6 +87,19 @@ class Nanny(ServerNode):
     2. Existing environment variables
     3. Dask configuration
 
+    Note
+    ----
+    Some environment variables, like ``OMP_NUM_THREADS``, must be set before
+    importing numpy to have effect. Others, like ``MALLOC_TRIM_THRESHOLD_`` (see
+    :ref:`memtrim`), must be set before starting the Linux process. So we need to
+    set them before spawning the subprocess, even if this means poisoning the
+    process running the Nanny.
+
+    For the same reason, be warned that changing
+    ``distributed.worker.multiprocessing-method`` from ``spawn`` to ``fork`` or
+    ``forkserver`` may inhibit some environment variables; if you do, you should
+    set the variables yourself in the shell before you start ``dask-worker``.
+
     See Also
     --------
     Worker
@@ -666,6 +679,10 @@ async def start(self) -> Status:
         self.stopped = asyncio.Event()
         self.status = Status.starting
 
+        # Must set env variables before spawning the subprocess.
+        # See note in Nanny docstring.
+        os.environ.update(self.env)
+
         try:
             await self.process.start()
         except OSError:
@@ -811,6 +828,9 @@ def _run(
     Worker,
 ):  # pragma: no cover
     try:
+        # Set the environment variables again. This is to avoid race conditions
+        # where different nannies in the same process set different environment
+        # variables.
         os.environ.update(env)
         dask.config.set(config)
 
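The change above relies on the fact that a subprocess started with the "spawn"
method inherits a snapshot of the parent's environment taken at spawn time,
while glibc reads MALLOC_TRIM_THRESHOLD_ only once, at process startup. A
minimal standalone sketch of that behaviour (illustrative only, not part of
this diff):

    import multiprocessing
    import os

    def show_threshold():
        # glibc reads MALLOC_TRIM_THRESHOLD_ once, when this process starts;
        # setting it here, after startup, would not affect the allocator.
        print(os.environ.get("MALLOC_TRIM_THRESHOLD_"))

    if __name__ == "__main__":
        # Must happen in the parent *before* spawning; this is exactly what
        # Nanny.start() does above with os.environ.update(self.env).
        os.environ["MALLOC_TRIM_THRESHOLD_"] = "65536"
        ctx = multiprocessing.get_context("spawn")
        p = ctx.Process(target=show_threshold)
        p.start()
        p.join()  # the child prints "65536"
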
diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py
index b908236c4ab..f3d87cfbb1c 100644
--- a/distributed/tests/test_nanny.py
+++ b/distributed/tests/test_nanny.py
@@ -8,7 +8,6 @@
 import random
 import sys
 from contextlib import suppress
-from unittest import mock
 
 import psutil
 import pytest
@@ -275,42 +274,50 @@ async def test_wait_for_scheduler():
     assert "restart" not in log.lower(), log
 
 
-@gen_cluster(nthreads=[], client=True)
+@gen_cluster(client=True, nthreads=[])
 async def test_environment_variable(c, s):
-    a = Nanny(s.address, memory_limit=0, env={"FOO": "123"})
-    b = Nanny(s.address, memory_limit=0, env={"FOO": "456"})
-    await asyncio.gather(a, b)
-    results = await c.run(lambda: os.environ["FOO"])
-    assert results == {a.worker_address: "123", b.worker_address: "456"}
-    await asyncio.gather(a.close(), b.close())
-
-
-@gen_cluster(nthreads=[], client=True)
-async def test_environment_variable_by_config(c, s, monkeypatch):
     with dask.config.set({"distributed.nanny.environ": "456"}):
         with pytest.raises(TypeError, match="configuration must be of type dict"):
-            Nanny(s.address, memory_limit=0)
-
-    with dask.config.set({"distributed.nanny.environ": {"FOO": "456"}}):
-
-        # precedence
-        # kwargs > env var > config
-
-        with mock.patch.dict(os.environ, {"FOO": "BAR"}, clear=True):
-            a = Nanny(s.address, memory_limit=0, env={"FOO": "123"})
-            x = Nanny(s.address, memory_limit=0)
-
-            b = Nanny(s.address, memory_limit=0)
-            await asyncio.gather(a, b, x)
-
-        results = await c.run(lambda: os.environ["FOO"])
-        assert results == {
-            a.worker_address: "123",
-            b.worker_address: "456",
-            x.worker_address: "BAR",
-        }
-        await asyncio.gather(a.close(), b.close(), x.close())
+            Nanny(s.address)
+
+    try:
+        # precedence: kwargs > env var > config
+        with dask.config.set({"distributed.nanny.environ": {"FOO": "123"}}):
+            os.environ["FOO"] = "456"
+            async with Nanny(s.address, env={"FOO": "789"}) as n:
+                results = await c.run(lambda: os.environ["FOO"])
+                assert results == {n.worker_address: "789"}
+
+            # Nanny sets the env variables in the Nanny's own process.
+            # Need to manually clean up os.environ afterwards.
+            os.environ["FOO"] = "456"
+            async with Nanny(s.address) as n:
+                results = await c.run(lambda: os.environ["FOO"])
+                assert results == {n.worker_address: "456"}
+
+            del os.environ["FOO"]
+            async with Nanny(s.address) as n:
+                results = await c.run(lambda: os.environ["FOO"])
+                assert results == {n.worker_address: "123"}
+    finally:
+        del os.environ["FOO"]
+
+
+@gen_cluster(client=True, nthreads=[])
+async def test_environment_variable_race_condition(c, s):
+    """Nannies need to set the environment variables in their own process so that
+    special variables such as MALLOC_TRIM_THRESHOLD_ are picked up when the Worker's
+    interpreter is started. However, this creates a race condition when multiple
+    Nannies in the same process have different variables. Test that the environment
+    variables are set again inside each worker process."""
+    nannies = [Nanny(s.address, env={"FOO": str(i)}) for i in range(8)]
+    try:
+        await asyncio.gather(*nannies)
+        results = await c.run(lambda: os.environ["FOO"])
+        assert sorted(results.values()) == [str(i) for i in range(8)]
+        await asyncio.gather(*(n.close() for n in nannies))
+    finally:
+        del os.environ["FOO"]
 
 
 @gen_cluster(
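The merged test above asserts the precedence kwargs > pre-existing environment
variable > dask config. As a standalone illustration, the resolution logic can
be sketched as follows (resolve_env is a hypothetical helper, not
distributed's actual implementation):

    import os

    def resolve_env(config_env: dict, kwarg_env: dict) -> dict:
        """Merge the three sources listed in the Nanny docstring,
        lowest precedence first."""
        merged = dict(config_env)              # 3. Dask configuration
        for key in merged:
            if key in os.environ:              # 2. existing environment variables
                merged[key] = os.environ[key]
        merged.update(kwarg_env)               # 1. keyword arguments to Nanny
        return {k: str(v) for k, v in merged.items()}

    os.environ["FOO"] = "456"
    assert resolve_env({"FOO": "123"}, {"FOO": "789"}) == {"FOO": "789"}
    assert resolve_env({"FOO": "123"}, {}) == {"FOO": "456"}
    del os.environ["FOO"]
    assert resolve_env({"FOO": "123"}, {}) == {"FOO": "123"}
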
@@ -585,3 +592,53 @@ async def test_scheduler_crash_doesnt_restart(s, a):
     await a.finished()
     assert a.status == Status.closed
     assert a.process is None
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(not LINUX, reason="Requires GNU libc")
+@gen_cluster(
+    client=True,
+    Worker=Nanny,
+    nthreads=[("", 2)],
+    worker_kwargs={"memory_limit": "2GiB"},
+)
+async def test_malloc_trim_threshold(c, s, a):
+    """Test that the nanny sets the MALLOC_TRIM_THRESHOLD_ environment variable
+    before starting the worker process.
+
+    This test relies on these settings to work:
+
+        distributed.nanny.environ.MALLOC_TRIM_THRESHOLD_: 65536
+        distributed.worker.multiprocessing-method: spawn
+
+    We're deliberately not setting them explicitly in @gen_cluster above, as we want
+    this test to trip if somebody changes distributed.yaml.
+
+    Note
+    ----
+    This test may start failing in a future Python version if CPython switches to
+    using mimalloc by default. If it does, a thorough benchmarking exercise is needed.
+    """
+    da = pytest.importorskip("dask.array")
+
+    arr = da.random.random(
+        2**29 // 8,  # 0.5 GiB
+        chunks=160 * 2**10 // 8,  # 160 kiB
+    ).persist()
+    await wait(arr)
+    # Wait for heartbeat
+    while s.memory.process < 2**29:
+        await asyncio.sleep(0.01)
+    del arr
+
+    # This is the delicate bit, as it relies on
+    # 1. PyMem_Free() being quick to invoke glibc free() when memory becomes available
+    # 2. glibc free() being quick to invoke the kernel's sbrk() when the same happens
+    #
+    # At the moment of writing, the readings are:
+    # - 122 MiB after starting a new worker
+    # - 139 MiB after computing a trivial dask.array collection
+    # - 185 MiB at the end of this test, with MALLOC_TRIM_THRESHOLD_=65536
+    # - 698 MiB at the end of this test, without MALLOC_TRIM_THRESHOLD_
+    while s.memory.process > 250 * 2**20:
+        await asyncio.sleep(0.01)
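For workloads that cannot rely on the MALLOC_TRIM_THRESHOLD_ default exercised
by this test, the worker-memory documentation linked from distributed.yaml
above also describes trimming manually. A sketch of that approach, assuming a
glibc-based Linux worker and a connected distributed Client named client:

    import ctypes

    def trim_memory() -> int:
        # Ask glibc to return free arena memory to the OS right away, rather
        # than waiting for the trim threshold to be crossed.
        libc = ctypes.CDLL("libc.so.6")
        return libc.malloc_trim(0)

    client.run(trim_memory)  # run on every worker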