Set MALLOC_TRIM_THRESHOLD_ before interpreter start (#6681)
crusaderky authored Jul 7, 2022
1 parent f7f6501 commit 2fcd520
Showing 3 changed files with 112 additions and 30 deletions.
5 changes: 5 additions & 0 deletions distributed/distributed.yaml
@@ -164,8 +164,13 @@ distributed:
  nanny:
    preload: []       # Run custom modules with Nanny
    preload-argv: []  # See https://docs.dask.org/en/latest/how-to/customize-initialization.html

    # Override environment variables. Read important caveats at
    # https://distributed.dask.org/en/stable/worker.html#nanny.
    environ:
      # See https://distributed.dask.org/en/stable/worker-memory.html#automatically-trim-memory
      MALLOC_TRIM_THRESHOLD_: 65536
      # Numpy configuration
      OMP_NUM_THREADS: 1
      MKL_NUM_THREADS: 1

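The effect of the new default can be verified end to end from a client. A minimal sketch, assuming a scheduler with nanny-managed workers is already running (the address below is a placeholder):

import os

from distributed import Client

client = Client("tcp://127.0.0.1:8786")  # placeholder address

# Each worker process inherits the variable from its nanny at spawn time,
# so it is already set when the worker's interpreter starts.
print(client.run(lambda: os.environ.get("MALLOC_TRIM_THRESHOLD_")))
# e.g. {'tcp://127.0.0.1:39577': '65536'} with the default config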
20 changes: 20 additions & 0 deletions distributed/nanny.py
@@ -87,6 +87,19 @@ class Nanny(ServerNode):
    2. Existing environment variables
    3. Dask configuration

    Note
    ----
    Some environment variables, like ``OMP_NUM_THREADS``, must be set before
    importing numpy to take effect. Others, like ``MALLOC_TRIM_THRESHOLD_`` (see
    :ref:`memtrim`), must be set before starting the Linux process. So we need to
    set them before spawning the subprocess, even if this means poisoning the
    process running the Nanny.

    For the same reason, be warned that changing
    ``distributed.worker.multiprocessing-method`` from ``spawn`` to ``fork`` or
    ``forkserver`` may inhibit some environment variables; if you do, you should
    set the variables yourself in the shell before you start ``dask-worker``.

    See Also
    --------
    Worker
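The precedence order described in this docstring can be exercised directly. A minimal sketch, assuming a running scheduler at a placeholder address and an async context:

import os

import dask
from distributed import Nanny

async def show_precedence() -> None:
    # Dask configuration is the weakest source...
    with dask.config.set({"distributed.nanny.environ": {"FOO": "from-config"}}):
        # ...a pre-existing environment variable overrides it...
        os.environ["FOO"] = "from-environ"
        # ...and the ``env`` keyword argument overrides both.
        async with Nanny("tcp://127.0.0.1:8786", env={"FOO": "from-kwarg"}):
            pass  # the spawned worker sees FOO="from-kwarg"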
@@ -666,6 +679,10 @@ async def start(self) -> Status:
        self.stopped = asyncio.Event()
        self.status = Status.starting

        # Must set env variables before spawning the subprocess.
        # See note in Nanny docstring.
        os.environ.update(self.env)

        try:
            await self.process.start()
        except OSError:
@@ -811,6 +828,9 @@ def _run(
        Worker,
    ):  # pragma: no cover
        try:
            # Set the environment variables again. This is to avoid race conditions
            # where different nannies in the same process set different environment
            # variables.
            os.environ.update(env)
            dask.config.set(config)
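The reason ``start()`` must export the variables before spawning is that glibc reads ``MALLOC_TRIM_THRESHOLD_`` only once, when a process starts; setting it afterwards, from inside the worker, is too late. A minimal sketch of the underlying mechanism (plain Python, not distributed code):

import os
import subprocess
import sys

# The variable must already be in the parent's environment at spawn time;
# the child inherits it before its interpreter (and glibc) starts.
os.environ["MALLOC_TRIM_THRESHOLD_"] = "65536"
subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['MALLOC_TRIM_THRESHOLD_'])"]
)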
117 changes: 87 additions & 30 deletions distributed/tests/test_nanny.py
@@ -8,7 +8,6 @@
import random
import sys
from contextlib import suppress
from unittest import mock

import psutil
import pytest
@@ -275,42 +274,50 @@ async def test_wait_for_scheduler():
assert "restart" not in log.lower(), log


@gen_cluster(nthreads=[], client=True)
@gen_cluster(client=True, nthreads=[])
async def test_environment_variable(c, s):
a = Nanny(s.address, memory_limit=0, env={"FOO": "123"})
b = Nanny(s.address, memory_limit=0, env={"FOO": "456"})
await asyncio.gather(a, b)
results = await c.run(lambda: os.environ["FOO"])
assert results == {a.worker_address: "123", b.worker_address: "456"}
await asyncio.gather(a.close(), b.close())


@gen_cluster(nthreads=[], client=True)
async def test_environment_variable_by_config(c, s, monkeypatch):
    with dask.config.set({"distributed.nanny.environ": "456"}):
        with pytest.raises(TypeError, match="configuration must be of type dict"):
            Nanny(s.address, memory_limit=0)

    with dask.config.set({"distributed.nanny.environ": {"FOO": "456"}}):
        # precedence
        # kwargs > env var > config
        Nanny(s.address)

    try:
        # precedence: kwargs > env var > config
        with dask.config.set({"distributed.nanny.environ": {"FOO": "123"}}):
            os.environ["FOO"] = "456"
            async with Nanny(s.address, env={"FOO": "789"}) as n:
                results = await c.run(lambda: os.environ["FOO"])
                assert results == {n.worker_address: "789"}

            # Nanny sets the env variables in the Nanny's own process.
            # Need to manually clean up os.environ afterwards.
            os.environ["FOO"] = "456"
            async with Nanny(s.address) as n:
                results = await c.run(lambda: os.environ["FOO"])
                assert results == {n.worker_address: "456"}

            with mock.patch.dict(os.environ, {"FOO": "BAR"}, clear=True):
                a = Nanny(s.address, memory_limit=0, env={"FOO": "123"})
                x = Nanny(s.address, memory_limit=0)

            del os.environ["FOO"]
            async with Nanny(s.address) as n:
                results = await c.run(lambda: os.environ["FOO"])
                assert results == {n.worker_address: "123"}
    finally:
        del os.environ["FOO"]

        b = Nanny(s.address, memory_limit=0)

        await asyncio.gather(a, b, x)

@gen_cluster(client=True, nthreads=[])
async def test_environment_variable_race_condition(c, s):
    """Nannies need to set the environment variables in their own process so that
    special variables such as MALLOC_TRIM_THRESHOLD_ are picked up when the Worker's
    interpreter is started. However, this creates a race condition with multiple
    Nannies in the same process with different variables. Test that the environment
    variables are set again inside their own process."""
    nannies = [Nanny(s.address, env={"FOO": str(i)}) for i in range(8)]
    try:
        await asyncio.gather(*nannies)
        results = await c.run(lambda: os.environ["FOO"])
        assert results == {
            a.worker_address: "123",
            b.worker_address: "456",
            x.worker_address: "BAR",
        }
        await asyncio.gather(a.close(), b.close(), x.close())
        assert sorted(results.values()) == [str(i) for i in range(8)]
        await asyncio.gather(*(n.close() for n in nannies))
    finally:
        del os.environ["FOO"]
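
The race this test guards against can be pictured without distributed at all: several nannies sharing one parent process each mutate ``os.environ`` before spawning, so a child may be spawned while the parent holds a sibling's value. A hedged toy reproduction (hypothetical helper names, not distributed code):

import os
from multiprocessing import get_context

def report(expected: str) -> None:
    # Without re-applying `env` in the child, FOO may hold a sibling's value
    print(f"expected FOO={expected}, child saw FOO={os.environ['FOO']}")

def spawn_worker(value: str):
    os.environ["FOO"] = value  # poisons the shared parent environment
    proc = get_context("spawn").Process(target=report, args=(value,))
    proc.start()
    return proc

if __name__ == "__main__":
    # Concurrent starts (as with multiple Nannies) make this interleaving unsafe;
    # distributed therefore calls os.environ.update(env) again inside the child.
    procs = [spawn_worker(str(i)) for i in range(8)]
    for proc in procs:
        proc.join()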


@gen_cluster(
@@ -585,3 +592,53 @@ async def test_scheduler_crash_doesnt_restart(s, a):
    await a.finished()
    assert a.status == Status.closed
    assert a.process is None


@pytest.mark.slow
@pytest.mark.skipif(not LINUX, reason="Requires GNU libc")
@gen_cluster(
    client=True,
    Worker=Nanny,
    nthreads=[("", 2)],
    worker_kwargs={"memory_limit": "2GiB"},
)
async def test_malloc_trim_threshold(c, s, a):
    """Test that the nanny sets the MALLOC_TRIM_THRESHOLD_ environment variable before
    starting the worker process.

    This test relies on these settings to work:

        distributed.nanny.environ.MALLOC_TRIM_THRESHOLD_: 65536
        distributed.worker.multiprocessing-method: spawn

    We're deliberately not setting them explicitly in @gen_cluster above, as we want
    this test to trip if somebody changes distributed.yaml.

    Note
    ----
    This test may start failing in a future Python version if CPython switches to
    using mimalloc by default. If it does, a thorough benchmarking exercise is needed.
    """
    da = pytest.importorskip("dask.array")

    arr = da.random.random(
        2**29 // 8,  # 0.5 GiB
        chunks=160 * 2**10 // 8,  # 160 kiB
    ).persist()
    await wait(arr)
    # Wait for heartbeat
    while s.memory.process < 2**29:
        await asyncio.sleep(0.01)
    del arr

    # This is the delicate bit, as it relies on
    # 1. PyMem_Free() to be quick to invoke glibc free() when memory becomes available
    # 2. glibc free() to be quick to invoke the kernel's sbrk() when the same happens
    #
    # At the moment of writing, the readings are:
    # - 122 MiB after starting a new worker
    # - 139 MiB after computing a trivial dask.array collection
    # - 185 MiB at the end of this test, with MALLOC_TRIM_THRESHOLD_=65536
    # - 698 MiB at the end of this test, without MALLOC_TRIM_THRESHOLD_
    while s.memory.process > 250 * 2**20:
        await asyncio.sleep(0.01)