Skip to content

Commit

Permalink
Break the state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jun 30, 2022
1 parent c857e92 commit f0bf899
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,33 @@ def f(x):
assert all(f.key in b.data for f in futures)


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_reschedule_released(c, s, a):
"""By the time the task raises Reschedule(), the client has released the task."""
ev1 = Event()
ev2 = Event()

def f(ev1, ev2):
ev1.set()
ev2.wait()
raise Reschedule()

x = c.submit(f, ev1, ev2, key="x")
await ev1.wait()
x.release()
while "x" in s.tasks:
await asyncio.sleep(0.01)

with captured_logger("distributed.scheduler") as logger:
await ev2.set()
while "x" in a.tasks:
await asyncio.sleep(0.01)
assert (
"Attempting to reschedule task x, which was not found on the scheduler"
in logger.getvalue()
)


@gen_cluster(nthreads=[])
async def test_deque_handler(s):
from distributed.worker import logger
Expand Down

0 comments on commit f0bf899

Please sign in to comment.