Skip to content

Commit

Permalink
Fix regressions in dask#4651 (dask#4719)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Apr 21, 2021
1 parent e4b534a commit f492aa7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cancel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Cancel

on:
workflow_run:
workflows: ["Tests"]
workflows: [Tests]
types:
- requested

Expand Down
11 changes: 9 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,9 @@ def metrics(self):
@property
def memory(self) -> MemoryState:
return MemoryState(
process=self._metrics["memory"],
# metrics["memory"] is None if the worker sent a heartbeat before its
# SystemMonitor ever had a chance to run
process=self._metrics["memory"] or 0,
managed=self._nbytes,
managed_spilled=self._metrics["spilled_nbytes"],
unmanaged_old=self._memory_unmanaged_old,
Expand Down Expand Up @@ -3833,7 +3835,12 @@ def heartbeat_worker(
if size == memory_unmanaged_old:
memory_unmanaged_old = 0 # recalculate min()

size = max(0, metrics["memory"] - ws._nbytes + ws._metrics["spilled_nbytes"])
# metrics["memory"] is None if the worker sent a heartbeat before its
# SystemMonitor ever had a chance to run.
# ws._nbytes is updated at a different time and sizeof() may not be accurate,
# so size may be (temporarily) negative; floor it to zero.
size = max(0, (metrics["memory"] or 0) - ws._nbytes + metrics["spilled_nbytes"])

ws._memory_other_history.append((local_now, size))
if not memory_unmanaged_old:
# The worker has just been started or the previous minimum has been expunged
Expand Down
31 changes: 23 additions & 8 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2330,6 +2330,10 @@ def assert_memory(scheduler_or_workerstate, attr: str, min_, max_, timeout=10):
sleep(0.1)


# This test is heavily influenced by hard-to-control factors such as memory management
# by the Python interpreter and the OS, so it occasionally glitches
@pytest.mark.flaky(reruns=3, reruns_delay=5)
# ~33s runtime, or distributed.memory.recent_to_old_time + 3s
@pytest.mark.slow
def test_memory():
pytest.importorskip("zict")
Expand Down Expand Up @@ -2379,34 +2383,45 @@ def test_memory():
]
sleep(2)
assert_memory(s, "managed_spilled", 1, 999)
# Wait for the spilling to finish. Note that this does not make the test take
# longer as we're waiting for recent_to_old_time anyway.
sleep(10)

# Delete spilled keys
prev = s.memory
del f1
del f2
assert_memory(s, "managed_spilled", 0, prev.managed_spilled / 2 ** 20 - 1)
assert s.memory.managed_in_memory == prev.managed_in_memory
assert_memory(s, "managed_spilled", 0, prev.managed_spilled / 2 ** 20 - 19)

# Empty the cluster, with the exception of leaked memory
del more_futs
assert_memory(s, "managed", 0, 0)

orig_unmanaged = s_m0.unmanaged / 2 ** 20
orig_old = s_m0.unmanaged_old / 2 ** 20

# Wait until 30s have passed since the spill to observe unmanaged_recent
# transition into unmanaged_old
c.run(gc.collect)
orig_unmanaged = s_m0.unmanaged / 2 ** 20
orig_old = s_m0.unmanaged_old / 2 ** 20
assert_memory(s, "unmanaged_old", orig_old + 90, orig_old + 190, timeout=40)
assert_memory(s, "unmanaged_recent", 0, 90, timeout=40)
assert_memory(
s,
"unmanaged_old",
orig_old + 90,
# On MacOS, the process memory of the Python interpreter does not shrink as
# fast as on Linux/Windows
9999 if MACOS else orig_old + 190,
timeout=40,
)

# When the leaked memory is cleared, unmanaged and unmanaged_old drop
# This doesn't happen on MacOS, where the process memory of the Python
# interpreter does not shrink (or takes much longer to shrink)
# On MacOS, the process memory of the Python interpreter does not shrink as fast
# as on Linux/Windows
if not MACOS:
c.run(clear_leak)
assert_memory(s, "unmanaged", 0, orig_unmanaged + 95)
assert_memory(s, "unmanaged_old", 0, orig_old + 95)
assert_memory(s, "unmanaged_recent", 0, 95)
assert_memory(s, "unmanaged_recent", 0, 90)


@gen_cluster(client=True, worker_kwargs={"memory_limit": 0})
Expand Down

0 comments on commit f492aa7

Please sign in to comment.