Mock process memory readings in test_worker.py (v2) #5878

Merged 20 commits on Mar 1, 2022
Changes from all commits
14 changes: 10 additions & 4 deletions distributed/nanny.py
@@ -13,7 +13,7 @@
from inspect import isawaitable
from queue import Empty
from time import sleep as sync_sleep
from typing import ClassVar
from typing import TYPE_CHECKING, ClassVar, Literal

import psutil
from tornado import gen
@@ -45,6 +45,9 @@
)
from .worker import Worker, parse_memory_limit, run

if TYPE_CHECKING:
from .diagnostics.plugin import NannyPlugin

logger = logging.getLogger(__name__)


@@ -94,6 +97,7 @@ def __init__(
services=None,
name=None,
memory_limit="auto",
memory_terminate_fraction: float | Literal[False] | None = None,
reconnect=True,
validate=False,
quiet=False,
@@ -203,8 +207,10 @@ def __init__(
self.worker_kwargs = worker_kwargs

self.contact_address = contact_address
self.memory_terminate_fraction = dask.config.get(
"distributed.worker.memory.terminate"
self.memory_terminate_fraction = (
memory_terminate_fraction
if memory_terminate_fraction is not None
else dask.config.get("distributed.worker.memory.terminate")
)

self.services = services
@@ -231,7 +237,7 @@ def __init__(
"plugin_remove": self.plugin_remove,
}

self.plugins = {}
self.plugins: dict[str, NannyPlugin] = {}

super().__init__(
handlers=handlers, io_loop=self.loop, connection_args=self.connection_args
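
The new keyword argument takes precedence over the distributed.worker.memory.terminate config value; passing False disables termination outright, while None (the default) preserves the config-driven behaviour. A minimal sketch of the fallback logic in isolation (the helper name is illustrative, not part of the PR):

import dask

def resolve_terminate_fraction(memory_terminate_fraction=None):
    # An explicit argument (including False) wins; None falls back to the config.
    if memory_terminate_fraction is not None:
        return memory_terminate_fraction
    return dask.config.get("distributed.worker.memory.terminate")

assert resolve_terminate_fraction(0.95) == 0.95
assert resolve_terminate_fraction(False) is False  # never terminate on high memory
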
10 changes: 9 additions & 1 deletion distributed/system_monitor.py
@@ -72,10 +72,18 @@ def recent(self):
except IndexError:
return {k: None for k, v in self.quantities.items()}

def get_process_memory(self) -> int:
"""Sample process memory, as reported by the OS.
This one-liner function exists so that it can be easily mocked in unit tests,
as the OS allocating and releasing memory is highly volatile and a constant
source of flakiness.
"""
return self.proc.memory_info().rss

def update(self):
with self.proc.oneshot():
cpu = self.proc.cpu_percent()
memory = self.proc.memory_info().rss
memory = self.get_process_memory()
now = time()

self.cpu.append(cpu)
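
The tests below use this hook by overriding it on a live worker's monitor, which makes every memory-based threshold deterministic. A condensed sketch with a standalone SystemMonitor, assuming (as in the version this PR targets) that recent() exposes the latest sample under the "memory" key; the 800 MB figure is arbitrary:

from distributed.system_monitor import SystemMonitor

monitor = SystemMonitor()
# Replace OS sampling with a fixed value; update() now records a reproducible
# RSS instead of whatever psutil happens to report at that instant.
monitor.get_process_memory = lambda: 800_000_000
monitor.update()
assert monitor.recent()["memory"] == 800_000_000
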
237 changes: 91 additions & 146 deletions distributed/tests/test_worker.py
@@ -27,6 +27,7 @@
import distributed
from distributed import (
Client,
Event,
Nanny,
Reschedule,
default_client,
@@ -1287,6 +1288,7 @@ async def test_spill_constrained(c, s, w):
nthreads=[("", 1)],
client=True,
worker_kwargs=dict(
memory_limit="1000 MB",
memory_monitor_interval="10ms",
memory_target_fraction=False,
memory_spill_fraction=0.7,
@@ -1298,193 +1300,136 @@ async def test_spill_spill_threshold(c, s, a):
Test that the spill threshold uses the process memory and not the managed memory
reported by sizeof(), which may be inaccurate.
"""
# Reach 'spill' threshold after 400MB of managed data. We need to be generous in
# order to avoid flakiness due to fluctuations in unmanaged memory.
# FIXME https://github.com/dask/distributed/issues/5367
# This works just by luck for the purpose of the spill and pause thresholds,
# and does NOT work for the target threshold.
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 300e6) / 0.7

class UnderReport:
"""100 MB process memory, 10 bytes reported managed memory"""

def __init__(self, *args):
self.data = "x" * int(100e6)

def __sizeof__(self):
return 10

def __reduce__(self):
"""Speed up test by writing very little to disk when spilling"""
return UnderReport, ()

futures = c.map(UnderReport, range(8))

a.monitor.get_process_memory = lambda: 800_000_000 if a.data.fast else 0
x = c.submit(inc, 0, key="x")
while not a.data.disk:
await asyncio.sleep(0.01)


async def assert_not_everything_is_spilled(w: Worker) -> None:
start = time()
while time() < start + 0.5:
assert w.data
if not w.data.memory: # type: ignore
# The hysteresis system fails on Windows and MacOSX because process memory
# is very slow to shrink down after calls to PyFree. As a result,
# Worker.memory_monitor will continue spilling until there's nothing left.
# Nothing we can do about this short of finding either a way to change this
# behaviour at OS level or a better measure of allocated memory.
assert not LINUX, "All data was spilled to disk"
raise pytest.xfail("https://github.com/dask/distributed/issues/5840")
await asyncio.sleep(0)
assert await x == 1
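
With the worker_kwargs above, the spill threshold sits at 0.7 * 1000 MB = 700 MB, while the mocked RSS reads 800 MB whenever anything is held in fast memory, so the monitor evicts until a.data.fast is empty and the key "x" ends up on disk; awaiting it verifies it can still be read back. The same numbers, restated under the test's own assumptions:

memory_limit = 1000e6          # worker_kwargs memory_limit="1000 MB"
spill_at = 0.7 * memory_limit  # 700 MB
mocked_rss = 800_000_000       # > spill_at while a.data.fast is non-empty,
                               # 0 once everything has been evicted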


@requires_zict
@gen_cluster(
nthreads=[("", 1)],
client=True,
worker_kwargs=dict(
# FIXME https://github.com/dask/distributed/issues/5367
# Can't reconfigure the absolute target threshold after the worker
# started, so we're setting it here to something extremely small and then
# increasing the memory_limit dynamically below in order to test the
# spill threshold.
memory_limit=1,
memory_monitor_interval="10ms",
memory_target_fraction=False,
memory_spill_fraction=0.7,
memory_pause_fraction=False,
),
@pytest.mark.parametrize(
"memory_target_fraction,managed,expect_spilled",
[
# no target -> no hysteresis
# Over-report managed memory to test that the automated LRU eviction based on
# target is never triggered
(False, int(10e9), 1),
# Under-report managed memory, so that we reach the spill threshold for process
# memory without first reaching the target threshold for managed memory
# target == spill -> no hysteresis
(0.7, 0, 1),
# target < spill -> hysteresis from spill to target
(0.4, 0, 7),
],
)
async def test_spill_no_target_threshold(c, s, a):
"""Test that you can enable the spill threshold while leaving the target threshold
to False
@gen_cluster(nthreads=[], client=True)
async def test_spill_hysteresis(c, s, memory_target_fraction, managed, expect_spilled):
"""
1. Test that you can enable the spill threshold while leaving the target threshold
set to False
2. Test the hysteresis system where, once you reach the spill threshold, the worker
won't stop spilling until the target threshold is reached
"""
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 300e6) / 0.7 # 300 MB before we start spilling

class OverReport:
"""Configurable process memory, 10 GB reported managed memory"""

def __init__(self, size):
self.data = "x" * size

class C:
def __sizeof__(self):
return int(10e9)

def __reduce__(self):
"""Speed up test by writing very little to disk when spilling"""
return OverReport, (len(self.data),)

f1 = c.submit(OverReport, 0, key="f1")
await wait(f1)
assert set(a.data.memory) == {"f1"}

# 800 MB. Use large chunks to stimulate timely release of process memory.
futures = c.map(OverReport, range(int(100e6), int(100e6) + 8))

while not a.data.disk:
await asyncio.sleep(0.01)
assert "f1" in a.data.disk

# Spilling normally starts at the spill threshold and stops at the target threshold.
# In this special case, it stops as soon as the process memory goes below the spill
# threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the
# whole data to disk (memory_limit * target = 0)
await assert_not_everything_is_spilled(a)
return managed


@pytest.mark.slow
@requires_zict
@gen_cluster(
nthreads=[("", 1)],
client=True,
worker_kwargs=dict(
memory_limit="1 GiB", # See FIXME note in previous test
async with Worker(
s.address,
memory_limit="1000 MB",
memory_monitor_interval="10ms",
memory_target_fraction=0.4,
memory_target_fraction=memory_target_fraction,
memory_spill_fraction=0.7,
memory_pause_fraction=False,
),
)
async def test_spill_hysteresis(c, s, a):
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB
) as a:
a.monitor.get_process_memory = lambda: 50_000_000 * len(a.data.fast)

# Under-report managed memory, so that we reach the spill threshold for process
# memory without first reaching the target threshold for managed memory
class UnderReport:
def __init__(self):
self.data = "x" * int(100e6) # 100 MB
# Add 500MB (reported) process memory. Spilling must not happen.
futures = [c.submit(C, pure=False) for _ in range(10)]
await wait(futures)
await asyncio.sleep(0.1)
assert not a.data.disk

def __sizeof__(self):
return 1
# Add another 250MB unmanaged memory. This must trigger the spilling.
futures += [c.submit(C, pure=False) for _ in range(5)]
await wait(futures)

def __reduce__(self):
"""Speed up test by writing very little to disk when spilling"""
return UnderReport, ()
# Wait until spilling starts. Then, wait until it stops.
prev_n = 0
while not a.data.disk or len(a.data.disk) > prev_n:
prev_n = len(a.data.disk)
await asyncio.sleep(0)
Comment on lines +1361 to +1363
Member
This is a bit fragile. As soon as the while loop in the memory monitor changes anything about how it awaits, e.g. another async call to evict data, this condition will break since we rely on skipping exactly one loop iteration with sleep(0).

I can't come up with a nicer way other than sleeping for a longer time so this is fine for me. I just wanted to raise this in case somebody else thinks of something

Member
Another ugly thing we could do, though I think I dislike it even more, is to wait for _memory_monitoring to toggle, since then we know for sure that the memory_monitor is done. I think I prefer sticking with sleep(0) and dealing with a test failure/test rewrite if that ever happens.
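
A rough sketch of that alternative, assuming the worker exposes a _memory_monitoring flag that is True only while a memory_monitor pass is running (as it does in the version this PR targets):

# Wait until spilling has started...
while not a.data.disk:
    await asyncio.sleep(0.01)
# ...then wait for the in-flight memory_monitor pass to finish, so that
# a.data.disk reflects a complete hysteresis cycle before asserting on it.
while a._memory_monitoring:
    await asyncio.sleep(0.01)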


max_in_memory = 0
futures = []
while not a.data.disk:
futures.append(c.submit(UnderReport, pure=False))
max_in_memory = max(max_in_memory, len(a.data.memory))
await wait(futures)
await asyncio.sleep(0.05)
max_in_memory = max(max_in_memory, len(a.data.memory))

# If there were no hysteresis, we would lose exactly 1 key.
# Note that, for this test to be meaningful, memory must shrink down readily when
# we deallocate Python objects. This is not always the case on Windows and MacOSX;
# on Linux we set MALLOC_TRIM to help in that regard.
# To verify that this test is useful, set target=spill and watch it fail.
while len(a.data.memory) > max_in_memory - 3:
await asyncio.sleep(0.01)
await assert_not_everything_is_spilled(a)
assert len(a.data.disk) == expect_spilled
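
Worked numbers for the third parametrize case (memory_target_fraction=0.4), given the 50 MB of mocked RSS per key held in fast memory:

memory_limit = 1000e6
spill_at = 0.7 * memory_limit   # 700 MB
target_at = 0.4 * memory_limit  # 400 MB
# 15 futures x 50 MB = 750 MB of mocked RSS, above spill_at, so eviction starts
# and keeps going until only 8 keys (400 MB) remain in fast memory:
# 15 - 8 = 7 keys spilled, matching expect_spilled.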


@pytest.mark.slow
@gen_cluster(
nthreads=[("", 1)],
client=True,
worker_kwargs=dict(
memory_limit="1000 MB",
memory_monitor_interval="10ms",
memory_target_fraction=False,
memory_spill_fraction=False,
memory_pause_fraction=0.8,
),
)
async def test_pause_executor(c, s, a):
# See notes in test_spill_spill_threshold
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 160e6) / 0.8 # Pause after 200 MB
mocked_rss = 0
a.monitor.get_process_memory = lambda: mocked_rss

# Note: it's crucial to have a very large single chunk of memory that gets descoped
# all at once in order to instigate release of process memory.
# Read: https://github.com/dask/distributed/issues/5840
def f():
# Add 400 MB unmanaged memory
x = "x" * int(400e6)
w = get_worker()
while w.status != Status.paused:
sleep(0.01)
# Task that is running when the worker pauses
ev_x = Event()

def f(ev):
ev.wait()
return 1

x = c.submit(f, ev_x, key="x")
while a.executing_count != 1:
await asyncio.sleep(0.01)

with captured_logger(logging.getLogger("distributed.worker")) as logger:
future = c.submit(f, key="x")
futures = c.map(slowinc, range(30), delay=0.1)
# Task that is queued on the worker when the worker pauses
y = c.submit(inc, 1, key="y")
while "y" not in a.tasks:
await asyncio.sleep(0.01)

while a.status != Status.paused:
# Hog the worker with 900MB unmanaged memory
mocked_rss = 900_000_000
while s.workers[a.address].status != Status.paused:
await asyncio.sleep(0.01)

assert "Pausing worker" in logger.getvalue()
assert sum(f.status == "finished" for f in futures) < 4

while a.status != Status.running:
# Task that is queued on the scheduler when the worker pauses.
# It is not sent to the worker.
z = c.submit(inc, 2, key="z")
while "z" not in s.tasks or s.tasks["z"].state != "no-worker":
await asyncio.sleep(0.01)

# Test that a task that already started when the worker paused can complete
# and its output can be retrieved. Also test that the now free slot won't be
# used by other tasks.
await ev_x.set()
assert await x == 1
await asyncio.sleep(0.05)

assert a.executing_count == 0
assert len(a.ready) == 1
assert a.tasks["y"].state == "ready"
assert "z" not in a.tasks

# Release the memory. Tasks that were queued on the worker are executed.
# Tasks that were stuck on the scheduler are sent to the worker and executed.
mocked_rss = 0
assert await y == 2
assert await z == 3

assert a.status == Status.running
assert "Resuming worker" in logger.getvalue()
await wait(futures)
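
For reference, the pause arithmetic exercised by the mocked readings above:

memory_limit = 1000e6
pause_at = 0.8 * memory_limit  # 800 MB: memory_pause_fraction threshold
# mocked_rss = 900_000_000 -> 0.9 > 0.8, the worker pauses
# mocked_rss = 0           -> 0.0 < 0.8, the worker resumes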


@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "50 ms"})