Skip to content

Commit

Permalink
Revert dask#5883
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 18, 2022
1 parent 2d3fddc commit d72d87a
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 48 deletions.
34 changes: 0 additions & 34 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3306,37 +3306,3 @@ async def test_Worker__to_dict(c, s, a):
}
assert d["tasks"]["x"]["key"] == "x"
assert d["data"] == ["x"]


@gen_cluster(nthreads=[])
async def test_do_not_block_event_loop_during_shutdown(s):
loop = asyncio.get_running_loop()
called_handler = threading.Event()
block_handler = threading.Event()

w = await Worker(s.address)
executor = w.executors["default"]

# The block wait must be smaller than the test timeout and smaller than the
# default value for timeout in `Worker.close``
async def block():
def fn():
called_handler.set()
assert block_handler.wait(20)

await loop.run_in_executor(executor, fn)

async def set_future():
while True:
try:
await loop.run_in_executor(executor, sleep, 0.1)
except RuntimeError: # executor has started shutting down
block_handler.set()
return

async def close():
called_handler.wait()
# executor_wait is True by default but we want to be explicit here
await w.close(executor_wait=True)

await asyncio.gather(block(), close(), set_future())
19 changes: 5 additions & 14 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
from distributed.comm import connect, get_address_host
from distributed.comm.addressing import address_from_user_args, parse_address
from distributed.comm.utils import OFFLOAD_THRESHOLD
from distributed.compatibility import to_thread
from distributed.core import (
CommClosedError,
Status,
Expand Down Expand Up @@ -1478,19 +1477,11 @@ async def close(
for executor in self.executors.values():
if executor is utils._offload_executor:
continue # Never shutdown the offload executor

def _close():
if isinstance(executor, ThreadPoolExecutor):
executor._work_queue.queue.clear()
executor.shutdown(wait=executor_wait, timeout=timeout)
else:
executor.shutdown(wait=executor_wait)

# Waiting for the shutdown can block the event loop causing
# weird deadlocks particularly if the task that is executing in
# the thread is waiting for a server reply, e.g. when using
# worker clients, semaphores, etc.
await to_thread(_close)
if isinstance(executor, ThreadPoolExecutor):
executor._work_queue.queue.clear()
executor.shutdown(wait=executor_wait, timeout=timeout)
else:
executor.shutdown(wait=executor_wait)

self.stop()
await self.rpc.close()
Expand Down

0 comments on commit d72d87a

Please sign in to comment.