Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use async with Client: in tests #6921

Merged
merged 3 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 42 additions & 51 deletions distributed/deploy/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,50 +117,47 @@ def scale_up(self, n, **kwargs):

@gen_test()
async def test_min_max():
cluster = await LocalCluster(
async with LocalCluster(
n_workers=0,
silence_logs=False,
processes=False,
dashboard_address=":0",
asynchronous=True,
threads_per_worker=1,
)
try:
) as cluster:
adapt = cluster.adapt(minimum=1, maximum=2, interval="20 ms", wait_count=10)
c = await Client(cluster, asynchronous=True)

start = time()
while not cluster.scheduler.workers:
await asyncio.sleep(0.01)
assert time() < start + 1

await asyncio.sleep(0.2)
assert len(cluster.scheduler.workers) == 1
assert len(adapt.log) == 1 and adapt.log[-1][1] == {"status": "up", "n": 1}
async with Client(cluster, asynchronous=True) as c:
start = time()
while not cluster.scheduler.workers:
await asyncio.sleep(0.01)
assert time() < start + 1

futures = c.map(slowinc, range(100), delay=0.1)
await asyncio.sleep(0.2)
assert len(cluster.scheduler.workers) == 1
assert len(adapt.log) == 1 and adapt.log[-1][1] == {"status": "up", "n": 1}

start = time()
while len(cluster.scheduler.workers) < 2:
await asyncio.sleep(0.01)
assert time() < start + 1
futures = c.map(slowinc, range(100), delay=0.1)

assert len(cluster.scheduler.workers) == 2
await asyncio.sleep(0.5)
assert len(cluster.scheduler.workers) == 2
assert len(cluster.workers) == 2
assert len(adapt.log) == 2 and all(d["status"] == "up" for _, d in adapt.log)
start = time()
while len(cluster.scheduler.workers) < 2:
await asyncio.sleep(0.01)
assert time() < start + 1

assert len(cluster.scheduler.workers) == 2
await asyncio.sleep(0.5)
assert len(cluster.scheduler.workers) == 2
assert len(cluster.workers) == 2
assert len(adapt.log) == 2 and all(
d["status"] == "up" for _, d in adapt.log
)

del futures
del futures

start = time()
while len(cluster.scheduler.workers) != 1:
await asyncio.sleep(0.01)
assert time() < start + 2
assert adapt.log[-1][1]["status"] == "down"
finally:
await c.close()
await cluster.close()
start = time()
while len(cluster.scheduler.workers) != 1:
await asyncio.sleep(0.01)
assert time() < start + 2
assert adapt.log[-1][1]["status"] == "down"


@gen_test()
Expand Down Expand Up @@ -194,16 +191,14 @@ async def test_adapt_quickly():
Instead we want to wait a few beats before removing a worker in case the
user is taking a brief pause between work
"""
cluster = await LocalCluster(
async with LocalCluster(
n_workers=0,
asynchronous=True,
processes=False,
silence_logs=False,
dashboard_address=":0",
)
client = await Client(cluster, asynchronous=True)
adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10)
try:
) as cluster, Client(cluster, asynchronous=True) as client:
adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10)
future = client.submit(slowinc, 1, delay=0.100)
await wait(future)
assert len(adapt.log) == 1
Expand Down Expand Up @@ -241,9 +236,6 @@ async def test_adapt_quickly():

await asyncio.sleep(0.1)
assert len(cluster.workers) == 1
finally:
await client.close()
await cluster.close()


@gen_test()
Expand All @@ -255,20 +247,19 @@ async def test_adapt_down():
processes=False,
silence_logs=False,
dashboard_address=":0",
) as cluster:
async with Client(cluster, asynchronous=True) as client:
cluster.adapt(interval="20ms", maximum=5)
) as cluster, Client(cluster, asynchronous=True) as client:
cluster.adapt(interval="20ms", maximum=5)

futures = client.map(slowinc, range(1000), delay=0.1)
while len(cluster.scheduler.workers) < 5:
await asyncio.sleep(0.1)
futures = client.map(slowinc, range(1000), delay=0.1)
while len(cluster.scheduler.workers) < 5:
await asyncio.sleep(0.1)

cluster.adapt(maximum=2)
cluster.adapt(maximum=2)

start = time()
while len(cluster.scheduler.workers) != 2:
await asyncio.sleep(0.1)
assert time() < start + 60
start = time()
while len(cluster.scheduler.workers) != 2:
await asyncio.sleep(0.1)
assert time() < start + 60


@gen_test()
Expand Down
64 changes: 29 additions & 35 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import sys
from threading import Lock
from time import sleep
from unittest import mock
from urllib.parse import urlparse

import pytest
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

from dask.system import CPU_COUNT

Expand Down Expand Up @@ -245,19 +245,21 @@ def test_Client_solo(loop):
@gen_test()
async def test_duplicate_clients():
pytest.importorskip("bokeh")
c1 = await Client(
async with Client(
processes=False, silence_logs=False, dashboard_address=9876, asynchronous=True
)
with pytest.warns(Warning) as info:
c2 = await Client(
processes=False,
silence_logs=False,
dashboard_address=9876,
asynchronous=True,
)

assert "dashboard" in c1.cluster.scheduler.services
assert "dashboard" in c2.cluster.scheduler.services
) as c1:
c1_services = c1.cluster.scheduler.services
with pytest.warns(Warning) as info:
async with Client(
processes=False,
silence_logs=False,
dashboard_address=9876,
asynchronous=True,
) as c2:
c2_services = c2.cluster.scheduler.services

assert c1_services == {"dashboard": mock.ANY}
assert c2_services == {"dashboard": mock.ANY}

assert any(
all(
Expand All @@ -266,8 +268,6 @@ async def test_duplicate_clients():
)
for msg in info.list
)
await c1.close()
await c2.close()


def test_Client_kwargs(loop):
Expand Down Expand Up @@ -824,35 +824,29 @@ class MyCluster(LocalCluster):
def scale_down(self, *args, **kwargs):
pass

loop = IOLoop.current()
cluster = await MyCluster(
async with MyCluster(
n_workers=0,
processes=False,
silence_logs=False,
dashboard_address=":0",
loop=loop,
loop=None,
asynchronous=True,
)
c = await Client(cluster, asynchronous=True)

assert not cluster.workers

await cluster.scale(2)
) as cluster, Client(cluster, asynchronous=True) as c:
assert not cluster.workers

start = time()
while len(cluster.scheduler.workers) != 2:
await asyncio.sleep(0.01)
assert time() < start + 3
await cluster.scale(2)

await cluster.scale(1)
start = time()
while len(cluster.scheduler.workers) != 2:
await asyncio.sleep(0.01)
assert time() < start + 3

start = time()
while len(cluster.scheduler.workers) != 1:
await asyncio.sleep(0.01)
assert time() < start + 3
await cluster.scale(1)

await c.close()
await cluster.close()
start = time()
while len(cluster.scheduler.workers) != 1:
await asyncio.sleep(0.01)
assert time() < start + 3


def test_local_tls_restart(loop):
Expand Down
Loading