Skip to content

Commit

Permalink
Generic polish
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Feb 26, 2024
1 parent 0bb1e85 commit fe55f41
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 9 deletions.
12 changes: 5 additions & 7 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1497,8 +1497,7 @@ async def __aexit__(self, exc_type, exc_value, traceback):
await self._close(
# if we're handling an exception, we assume that it's more
# important to deliver that exception than shutdown gracefully.
fast=exc_type
is not None
fast=(exc_type is not None)
)

def __exit__(self, exc_type, exc_value, traceback):
Expand Down Expand Up @@ -1669,16 +1668,15 @@ async def _wait_for_handle_report_task(self, fast=False):
await wait_for(handle_report_task, 0 if fast else 2)

@log_errors
async def _close(self, fast=False):
"""
Send close signal and wait until scheduler completes
async def _close(self, fast: bool = False) -> None:

Check warning on line 1671 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L1671

Added line #L1671 was not covered by tests
"""Send close signal and wait until scheduler completes
If fast is True, the client will close forcefully, by cancelling tasks
the background _handle_report_task.
"""
# TODO: aclose more forcefully by aborting the RPC and cancelling all
# TODO: close more forcefully by aborting the RPC and cancelling all
# background tasks.
# see https://trio.readthedocs.io/en/stable/reference-io.html#trio.aclose_forcefully
# See https://trio.readthedocs.io/en/stable/reference-io.html#trio.aclose_forcefully
if self.status == "closed":
return

Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5196,7 +5196,7 @@ def test_quiet_client_close(loop):
threads_per_worker=4,
) as c:
futures = c.map(slowinc, range(1000), delay=0.01)
sleep(0.200) # stop part-way
sleep(0.2) # stop part-way
sleep(0.1) # let things settle

out = logger.getvalue()
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3058,7 +3058,7 @@ async def connect(self, *args, **kwargs):


@gen_cluster(client=True)
async def test_gather_failing_cnn_recover(c, s, a, b):
async def test_gather_failing_can_recover(c, s, a, b):
x = await c.scatter({"x": 1}, workers=a.address)
rpc = await FlakyConnectionPool(failing_connections=1)
with mock.patch.object(s, "rpc", rpc), dask.config.set(
Expand Down

0 comments on commit fe55f41

Please sign in to comment.