
Allow pausing and choke event loop while spilling #6189

Merged: 14 commits merged into dask:main from the pause_while_spilling branch on Apr 27, 2022

Conversation

@crusaderky self-assigned this Apr 24, 2022
@crusaderky changed the title from "Pause in the middle of spilling" to "Allow pause and choke event loop while spilling" Apr 24, 2022
@crusaderky mentioned this pull request Apr 24, 2022

github-actions bot commented Apr 24, 2022

Unit Test Results

15 files (-1)   15 suites (-1)   runtime 6h 40m 17s ⏱️ (-1h 6m 35s)
2 741 tests (+2):   2 659 passed ✔️ (+2)   80 skipped 💤 (-1)   2 failed (+1)
20 277 runs (-1 520):   19 332 passed ✔️ (-1 414)   939 skipped 💤 (-111)   6 failed (+5)

For more details on these failures, see this check.

Results for commit 4f7b8e9. ± Comparison against base commit 84cbb09.

♻️ This comment has been updated with latest results.

@crusaderky added the "stability" (Issue or feature related to cluster stability, e.g. deadlock) and "memory" labels Apr 25, 2022
@crusaderky (Collaborator Author):

The test failure has nothing to do with the test itself - it seems that the OS simply made the spill directory disappear halfway through!!!

2022-04-25 11:46:04,776 - distributed.spill - ERROR - Spill to disk failed; keeping data in memory
[...]
  File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/zict/file.py", line 99, in __setitem__
    with open(fn, "wb") as fh:
FileNotFoundError: [Errno 2] No such file or directory: '/Users/runner/work/distributed/distributed/dask-worker-space/worker-b9pix35w/storage/SlowSpill-69cfb69f-e595-4a37-9f59-c1cdd6e79f44'

@crusaderky changed the title from "Allow pause and choke event loop while spilling" to "Allow pausing and choke event loop while spilling" Apr 25, 2022
crusaderky added a commit to crusaderky/distributed that referenced this pull request Apr 25, 2022
# of small values. This artificially chokes the rest of the event loop -
# namely, the reception of new data from other workers.
# DO NOT tweak this without thorough stress testing.
# See https://github.com/dask/distributed/issues/6110.
Collaborator:

Add a note for future readers that this event loop blocking is a bit of a hack? I'm assuming in the future, we might properly pause during eviction once we have #5702?

Collaborator Author:

That's closed by #6195 :)

Collaborator Author:

Added note

await asyncio.sleep(0)
last_yielded = now = monotonic()

# Spilling may potentially take multiple seconds; we may pass the pause
Collaborator:

Why not check for pause before we potentially yield? If we've made it 0.5 s without yielding, we've already passed the default memory monitor interval (100 ms). So before we return control to the event loop, we should probably make sure it doesn't immediately try to execute new tasks, transfer data, etc., if memory is high enough that the worker should be paused.
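In sketch form, the ordering being suggested would look roughly like this (a minimal sketch only; evict_one(), process_memory(), and maybe_pause_or_unpause() are hypothetical stand-ins, not the actual Worker internals):

```python
import asyncio
from time import monotonic

MAX_BLOCKED = 0.5  # max seconds the spill loop may hog the event loop


async def spill_to_target(data, monitor, target: float) -> None:
    # Evict keys until process memory drops below `target`.
    last_yielded = monotonic()
    while monitor.process_memory() > target and data.fast:
        data.evict_one()  # synchronous pickle + write to disk; may be slow
        if monotonic() - last_yielded > MAX_BLOCKED:
            # Re-check pause *before* yielding: if memory is above the pause
            # threshold, flip the worker to paused first, so the coroutines
            # that run during the yield can't start new tasks or accept
            # incoming data transfers.
            monitor.maybe_pause_or_unpause()
            await asyncio.sleep(0)  # hand back exactly one loop iteration
            last_yielded = monotonic()
```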

Collaborator Author:

Good call; switched

# https://github.com/dask/distributed/issues/1371).
# We should regain control of the event loop every 0.5s.
c = Counter(round(t1 - t0, 1) for t0, t1 in zip(ts, ts[1:]))
# with sleep(0) every 0.5s: {0.0: 315, 0.5: 4}
Collaborator:

Could you clarify that you're referring to the last_yielded sleep(0) in _maybe_spill, not the sleep(0) in this test a few lines above?

Collaborator Author:

done

"distributed.worker.memory.monitor-interval": "10ms",
},
)
async def test_pause_while_spilling(c, s, a):
Collaborator:

Would it be possible to test not just that the paused state gets set, but that the pausing actually happens fast enough to prevent more work from running? (Corollary to my other comment.)

This is testing a race condition (how long tasks take to run vs the memory monitor interval).

For example, if you add a sleep(memory_monitor_interval / 5) to SlowSpill.__init__ and submit N*2 tasks, it would be good to verify that between N and N + 5 tasks have executed (possibly 1 fewer would be in memory) once we've paused. And the other ~N are waiting to run, but blocked by the pause.

Collaborator Author:

This sounds very brittle, as it would be heavily dependent on the responsiveness of the disk on the GitHub CI.

Collaborator:

How about instead of sleep(0.002) in SlowSpill.__init__, which makes it hard to know exactly how many tasks will run before the memory monitor kicks in, we have a threading.Semaphore(50) that SlowSpill.__init__ acquires? So we're guaranteed that exactly 50 SlowSpills can be created.

Then I'm not following why it depends on disk? Nothing should be written to disk until we enter _maybe_spill. At that point there would be 50 SlowSpills in self.data to evict.

  1. We evict the first one. This immediately means the patched get_process_memory will indicate we're over the paused threshold. The worker isn't paused yet, so __reduce__ sleeps for 100ms. Let's also add a semaphore.release() in __reduce__.
  2. This single eviction takes longer than the monitor-interval. So we check _maybe_pause_or_unpause, and pause the worker. But we don't give control back to the event loop yet, because it hasn't been 0.5 sec.
  3. The worker is now paused, so subsequent evictions don't sleep. If disk is fast, maybe we can evict all of them. If it's slow, we may return control to the event loop multiple times before eviction is complete.

But regardless, since we paused the worker almost immediately, and the patched get_process_memory will never let it be un-paused, returning control to the event loop should be innocuous. We should be able to assert that no new tasks started running. len(a.data) should be 50 or 51 (there might be 1 extra that started before pause, blocked on the semaphore, then completed during eviction), there should be no tasks running on the threadpool, and all the other tasks should still be in a.ready.

This wouldn't have been the case before https://github.com/dask/distributed/pull/6189/files#r857947472; that's why I'd like to test it.

Collaborator Author:

Good idea, but threading.Semaphore and multiprocessing.Semaphore can't be pickled, and distributed.Semaphore.release does not seem to work inside SlowSpill.__reduce__. This is hardly surprising - get_worker() and get_client() were never designed to work outside of task execution.
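For reference, a quick standalone demonstration of the pickling limitation (plain Python, independent of the test code):

```python
import pickle
import threading

sem = threading.Semaphore(3)
try:
    # A Semaphore wraps a Condition, which wraps a lock, and locks refuse to pickle.
    pickle.dumps(sem)
except TypeError as exc:
    print(exc)  # e.g. "cannot pickle '_thread.lock' object"
```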

Collaborator Author:

You can play with it yourself: crusaderky@e19c996

Collaborator:

Ah true, that's annoying. I got this to work

diff --git a/distributed/tests/test_worker_memory.py b/distributed/tests/test_worker_memory.py
index c4ce4fd6..83d751f3 100644
--- a/distributed/tests/test_worker_memory.py
+++ b/distributed/tests/test_worker_memory.py
@@ -3,6 +3,7 @@ from __future__ import annotations
 import asyncio
 import logging
 from collections import Counter, UserDict
+import threading
 from time import sleep
 
 import pytest
@@ -649,8 +650,11 @@ async def test_nanny_terminate(c, s, a):
     },
 )
 async def test_pause_while_spilling(c, s, a):
+    N_PAUSE = 3
+    N_TOTAL = 5
+
     def get_process_memory():
-        if len(a.data) < 3:
+        if len(a.data) < N_PAUSE:
             # Don't trigger spilling until after all tasks have completed
             return 0
         elif a.data.fast and not a.data.slow:
@@ -663,25 +667,44 @@ async def test_pause_while_spilling(c, s, a):
     a.monitor.get_process_memory = get_process_memory
 
     class SlowSpill:
-        def __init__(self, _, sem: distributed.Semaphore):
-            self.sem = sem
-            # Block if there are 50 tasks in a.data.fast
-            sem.acquire()
+        # Can't pickle a `Semaphore`, so instead of a default value, we create it in `__init__`.
+        # Don't worry about race conditions; the worker is single-threaded.
+        sem: threading.Semaphore
+
+        def __init__(self, _):
+            try:
+                sem = self.sem
+            except AttributeError:
+                type(self).sem = threading.Semaphore(N_PAUSE)
+
+            # Block if there are `N_PAUSE` tasks in a.data.fast
+            self.sem.acquire()
 
         def __reduce__(self):
             paused = distributed.get_worker().status == Status.paused
+            self.sem.release()
             if not paused:
                 sleep(0.1)  # This is 10x the memory monitor interval
-            self.sem.release()
             return bool, (paused,)
 
-    sem = await distributed.Semaphore(3)
-    futs = c.map(SlowSpill, range(5), sem=sem)
-    while len(a.data.slow) < 3:
+    futs = c.map(SlowSpill, range(N_TOTAL))
+    while len(a.data.slow) < N_PAUSE:
         await asyncio.sleep(0.01)
     assert a.status == Status.paused
-    assert any(sp is True for sp in a.data.slow.values())
-    assert sum(ts.state == "ready" for ts in a.tasks.values()) == 2
+
+    assert not a.data.fast
+    assert len(a.data.slow) == N_PAUSE
+    # Worker should have become paused after the first `SlowSpill` was evicted,
+    # because the pickling (to write to disk) slept for longer than the memory monitor interval.
+    assert sum(sp is True for sp in a.data.slow.values()) == N_PAUSE - 1
+    # ^ NOTE: our hacked `__reduce__` means `SlowSpill` pickles into `bool(paused)`
+
+    # Due to pausing, no more tasks should have started. However, 1 might have already been executing
+    # (and is now complete) when the memory monitor kicked in.
+    assert sum(ts.state == "ready" for ts in a.tasks.values()) in (
+        N_TOTAL - N_PAUSE,
+        N_TOTAL - N_PAUSE - 1,
+    )

But I played with it a bit, and I can't get the test to fail. Changing the order of the _maybe_pause_or_unpause vs sleep(0), adding more tasks, adding more sleeps in __reduce__, etc. The only thing that could break it was reordering the statements and changing the sleep(0) to a longer value (like await sleep(0.5)). So it seems like sleep(0) doesn't give much chance for other things to use the event loop.

I'm feeling like we probably don't need to test this.

crusaderky added a commit to crusaderky/distributed that referenced this pull request Apr 25, 2022
@crusaderky (Collaborator Author):

@gjoseph92 addressed review comments

"distributed.worker.memory.monitor-interval": "10ms",
},
)
async def test_pause_while_spilling(c, s, a):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah true, that's annoying. I got this to work

diff --git a/distributed/tests/test_worker_memory.py b/distributed/tests/test_worker_memory.py
index c4ce4fd6..83d751f3 100644
--- a/distributed/tests/test_worker_memory.py
+++ b/distributed/tests/test_worker_memory.py
@@ -3,6 +3,7 @@ from __future__ import annotations
 import asyncio
 import logging
 from collections import Counter, UserDict
+import threading
 from time import sleep
 
 import pytest
@@ -649,8 +650,11 @@ async def test_nanny_terminate(c, s, a):
     },
 )
 async def test_pause_while_spilling(c, s, a):
+    N_PAUSE = 3
+    N_TOTAL = 5
+
     def get_process_memory():
-        if len(a.data) < 3:
+        if len(a.data) < N_PAUSE:
             # Don't trigger spilling until after all tasks have completed
             return 0
         elif a.data.fast and not a.data.slow:
@@ -663,25 +667,44 @@ async def test_pause_while_spilling(c, s, a):
     a.monitor.get_process_memory = get_process_memory
 
     class SlowSpill:
-        def __init__(self, _, sem: distributed.Semaphore):
-            self.sem = sem
-            # Block if there are 50 tasks in a.data.fast
-            sem.acquire()
+        # Can't pickle a `Semaphore`, so instead of a default value, we create it in `__init__`.
+        # Don't worry about race conditions; the worker is single-threaded.
+        sem: threading.Semaphore
+
+        def __init__(self, _):
+            try:
+                sem = self.sem
+            except AttributeError:
+                type(self).sem = threading.Semaphore(N_PAUSE)
+
+            # Block if there are `N_PAUSE` tasks in a.data.fast
+            self.sem.acquire()
 
         def __reduce__(self):
             paused = distributed.get_worker().status == Status.paused
+            self.sem.release()
             if not paused:
                 sleep(0.1)  # This is 10x the memory monitor interval
-            self.sem.release()
             return bool, (paused,)
 
-    sem = await distributed.Semaphore(3)
-    futs = c.map(SlowSpill, range(5), sem=sem)
-    while len(a.data.slow) < 3:
+    futs = c.map(SlowSpill, range(N_TOTAL))
+    while len(a.data.slow) < N_PAUSE:
         await asyncio.sleep(0.01)
     assert a.status == Status.paused
-    assert any(sp is True for sp in a.data.slow.values())
-    assert sum(ts.state == "ready" for ts in a.tasks.values()) == 2
+
+    assert not a.data.fast
+    assert len(a.data.slow) == N_PAUSE
+    # Worker should have become paused after the first `SlowSpill` was evicted,
+    # because the pickling (to write to disk) slept for longer than the memory monitor interval.
+    assert sum(sp is True for sp in a.data.slow.values()) == N_PAUSE - 1
+    # ^ NOTE: our hacked `__reduce__` means `SlowSpill` pickles into `bool(paused)`
+
+    # Due to pausing, no more tasks should have started. However, 1 might have already been executing
+    # (and is now complete) when the memory monitor kicked in.
+    assert sum(ts.state == "ready" for ts in a.tasks.values()) in (
+        N_TOTAL - N_PAUSE,
+        N_TOTAL - N_PAUSE - 1,
+    )

But I played with it a bit, and I can't get the test to fail. Changing the order of the _maybe_pause_or_unpause vs sleep(0), adding more tasks, adding more sleeps in __reduce__, etc. The only thing that could break it was reordering the statements and changing the sleep(0) to a longer value (like await sleep(0.5)). So it seems like sleep(0) doesn't give much chance for other things to use the event loop.

I'm feeling like we probably don't need to test this.

@crusaderky (Collaborator Author):

> But I played with it a bit, and I can't get the test to fail. Changing the order of the _maybe_pause_or_unpause vs sleep(0), adding more tasks, adding more sleeps in __reduce__, etc. The only thing that could break it was reordering the statements and changing the sleep(0) to a longer value (like await sleep(0.5)). So it seems like sleep(0) doesn't give much chance for other things to use the event loop.

Why would any of those matter? The test checks that we're invoking _maybe_pause_or_unpause in the middle of the spill cycle. If I comment out that one line, the test breaks.
I've applied your suggestion.

@crusaderky merged commit 6d1c68a into dask:main Apr 27, 2022
@crusaderky deleted the pause_while_spilling branch on April 27, 2022 at 13:53
@mrocklin (Member):

Woot. Thanks everyone for this.

@gjoseph92 (Collaborator):

@crusaderky I was trying to also test that if the event loop was released in the middle of a spill cycle, the worker wouldn't start executing a new task (because it released the event loop before invoking _maybe_pause_or_unpause).

@crusaderky (Collaborator Author):

At the very least, you'll need one event loop cycle to run _ensure_computing, another to run execute, and a few cycles (and some actual sleep, or you'll have GIL contention) to let self.loop.run_in_executor finish.
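As a minimal illustration of that point (plain asyncio, nothing distributed-specific): each `await asyncio.sleep(0)` hands over exactly one event-loop iteration, so a pending coroutine advances only to its next await point per yield.

```python
import asyncio


async def three_steps(log: list[str]) -> None:
    log.append("step 1")
    await asyncio.sleep(0)
    log.append("step 2")
    await asyncio.sleep(0)
    log.append("step 3")


async def main() -> None:
    log: list[str] = []
    task = asyncio.create_task(three_steps(log))
    await asyncio.sleep(0)  # yield a single loop iteration
    print(log)              # ['step 1'] -- the task only reached its first await
    await task              # driving it to completion takes further loop iterations
    print(log)              # ['step 1', 'step 2', 'step 3']


asyncio.run(main())
```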
