diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py index 49c1fc884aa..1019c2d59f8 100644 --- a/distributed/tests/test_client_executor.py +++ b/distributed/tests/test_client_executor.py @@ -14,9 +14,10 @@ import pytest from tlz import take +from distributed.event import Event from distributed.metrics import time from distributed.utils import CancelledError -from distributed.utils_test import inc, slowadd, slowdec, slowinc, throws, varying +from distributed.utils_test import inc, slowadd, slowinc, throws, varying def number_of_processing_tasks(client): @@ -52,26 +53,38 @@ def test_as_completed(client): def test_wait(client): + def block_inc(x, ev): + ev.wait() + return x + 1 + with client.get_executor(pure=False) as e: + ev = Event() N = 10 - fs = [e.submit(slowinc, i, delay=0.05) for i in range(N)] + fs = [e.submit(block_inc, i, ev, pure=False) for i in range(N)] res = wait(fs, timeout=0.01) assert len(res.not_done) > 0 + ev.set() res = wait(fs) assert len(res.not_done) == 0 assert res.done == set(fs) + ev.clear() - fs = [e.submit(slowinc, i, delay=0.05) for i in range(N)] + nthreads = sum(client.nthreads().values()) + fs = [e.submit(block_inc, i, ev, pure=False) for i in range(nthreads - 1)] + fs.append(e.submit(inc, 0)) + fs.extend([e.submit(block_inc, i, ev, pure=False) for i in range(nthreads, N)]) res = wait(fs, return_when=FIRST_COMPLETED) assert len(res.not_done) > 0 assert len(res.done) >= 1 + ev.set() res = wait(fs) assert len(res.not_done) == 0 assert res.done == set(fs) + ev.clear() - fs = [e.submit(slowinc, i, delay=0.05) for i in range(N)] + fs = [e.submit(inc, i) for i in range(N)] fs += [e.submit(throws, None)] - fs += [e.submit(slowdec, i, delay=0.05) for i in range(N)] + fs += [e.submit(block_inc, i, ev, pure=False) for i in range(N)] res = wait(fs, return_when=FIRST_EXCEPTION) assert any(f.exception() for f in res.done) assert res.not_done @@ -85,6 +98,7 @@ def test_wait(client): assert len(errors) == 1 assert "hello" in str(errors[0]) + ev.set() def test_cancellation(client):