improve resilience of test_pause_executor
crusaderky committed Feb 23, 2022
1 parent ab561c2 commit ef0e44b
Showing 1 changed file with 25 additions and 22 deletions.
47 changes: 25 additions & 22 deletions distributed/tests/test_worker.py
@@ -1449,44 +1449,47 @@ async def nspilled() -> int:
 @pytest.mark.slow
 @gen_cluster(
     nthreads=[("", 1)],
+    # Run the test in a freshly spawned interpreter to ensure a clear memory situation,
+    # as opposed to the potentially heavily fragmented and unpredictable condition of
+    # the process used to run all the tests so far
+    Worker=Nanny,
     client=True,
     worker_kwargs=dict(
+        # pause after ~300 MiB, assuming ~100 MiB unmanaged memory
+        memory_limit="500 MiB",
         memory_monitor_interval="10ms",
         memory_target_fraction=False,
         memory_spill_fraction=False,
         memory_pause_fraction=0.8,
+        memory_terminate_fraction=False,
     ),
 )
-async def test_pause_executor(c, s, a):
-    # See notes in test_spill_spill_threshold
-    memory = psutil.Process().memory_info().rss
-    a.memory_limit = (memory + 160e6) / 0.8  # Pause after 200 MB
+async def test_pause_executor(c, s, nanny):
+    ws = s.workers[nanny.worker_address]
 
-    # Note: it's crucial to have a very large single chunk of memory that gets descoped
-    # all at once in order to instigate release of process memory.
-    # Read: https://github.com/dask/distributed/issues/5840
     def f():
-        # Add 400 MB unmanaged memory
-        x = "x" * int(400e6)
+        # Add 1 GiB unmanaged memory
+        # Note: it's crucial to have a very large single chunk of memory that gets
+        # descoped all at once in order to instigate release of process memory. Read:
+        # https://github.com/dask/distributed/issues/5840
+        x = "x" * 2**30
         w = get_worker()
         while w.status != Status.paused:
             sleep(0.01)
 
-    with captured_logger(logging.getLogger("distributed.worker")) as logger:
-        future = c.submit(f, key="x")
-        futures = c.map(slowinc, range(30), delay=0.1)
-
-        while a.status != Status.paused:
-            await asyncio.sleep(0.01)
-
-        assert "Pausing worker" in logger.getvalue()
-        assert sum(f.status == "finished" for f in futures) < 4
+    assert ws.status == Status.running
+    x = c.submit(f, key="x")
+    while "x" not in s.tasks:
+        await asyncio.sleep(0.01)
+    futures = c.map(slowinc, range(8), delay=0.1)
 
-        while a.status != Status.running:
-            await asyncio.sleep(0.01)
+    while ws.status != Status.paused:
+        await asyncio.sleep(0.01)
 
-        assert "Resuming worker" in logger.getvalue()
-        await wait(futures)
+    assert sum(f.status == "finished" for f in futures) < 4
+    # Wait for unpause
+    await wait(futures)
+    assert ws.status == Status.running
 
 
 @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "50 ms"})
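
The new worker_kwargs encode the arithmetic spelled out in the "# pause after ~300 MiB" comment. A quick sanity check of those numbers, assuming the usual semantics of memory_pause_fraction (the worker pauses once its process memory exceeds memory_limit * memory_pause_fraction); all names below are local to this sketch, not part of the test:

MiB = 2**20
memory_limit = 500 * MiB              # worker_kwargs: memory_limit="500 MiB"
pause_threshold = 0.8 * memory_limit  # memory_pause_fraction=0.8 -> 400 MiB of RSS
unmanaged = 100 * MiB                 # assumed interpreter + import overhead
print(f"pause after ~{(pause_threshold - unmanaged) / MiB:.0f} MiB of task allocations")
# pause after ~300 MiB of task allocations

The 1 GiB string built in f() overshoots the 400 MiB pause threshold by a wide margin, which is what makes the pause deterministic, while memory_terminate_fraction=False stops the nanny from killing the worker for exceeding the 500 MiB limit.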
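
The "Note:" comment and https://github.com/dask/distributed/issues/5840 concern when freed memory is actually returned to the OS. A standalone illustration of the effect the test relies on, not part of the commit: on a typical glibc-based Linux build, one huge allocation is served by mmap and unmapped as soon as the object is freed, whereas the same volume spread over many small objects can linger in the allocator's free lists and keep RSS inflated. Exact numbers vary by platform and allocator; psutil is already a dependency of this test module:

import psutil

def rss_mib() -> float:
    # Resident set size of the current process, in MiB
    return psutil.Process().memory_info().rss / 2**20

print(f"baseline:  {rss_mib():.0f} MiB")
x = "x" * 2**30  # one 1 GiB chunk, as in the test's f()
print(f"allocated: {rss_mib():.0f} MiB")
del x  # descoped all at once
print(f"freed:     {rss_mib():.0f} MiB")  # back near baseline on such builds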
