Skip to content

Commit

Permalink
Fix flaky test_in_flight_lost_after_resumed (#6372)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored May 19, 2022
1 parent 4d29246 commit 33fc50c
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import distributed
from distributed import Event, Lock, Worker
from distributed.client import wait
from distributed.utils_test import (
_LockedCommPool,
assert_story,
Expand Down Expand Up @@ -264,11 +265,12 @@ async def test_in_flight_lost_after_resumed(c, s, b):
block_get_data = asyncio.Lock()
in_get_data = asyncio.Event()

await block_get_data.acquire()
lock_executing = Lock()

def block_execution(lock):
with lock:
return
return 1

class BlockedGetData(Worker):
async def get_data(self, comm, *args, **kwargs):
Expand All @@ -281,15 +283,12 @@ async def get_data(self, comm, *args, **kwargs):
block_execution,
lock_executing,
workers=[a.address],
allow_other_workers=True,
key="fut1",
)
# Ensure fut1 is in memory but block any further execution afterwards to
# ensure we control when the recomputation happens
await fut1
await wait(fut1)
await lock_executing.acquire()
in_get_data.clear()
await block_get_data.acquire()
fut2 = c.submit(inc, fut1, workers=[b.address], key="fut2")

# This ensures that B already fetches the task, i.e. after this the task
Expand All @@ -298,6 +297,7 @@ async def get_data(self, comm, *args, **kwargs):
assert fut1.key in b.tasks
assert b.tasks[fut1.key].state == "flight"

s.set_restrictions({fut1.key: [a.address, b.address]})
# It is removed, i.e. get_data is guaranteed to fail and f1 is scheduled
# to be recomputed on B
await s.remove_worker(a.address, "foo", close=False, safe=True)
Expand Down

0 comments on commit 33fc50c

Please sign in to comment.