Skip to content

Commit

Permalink
Remove other test_reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed Apr 28, 2022
1 parent b59ccd0 commit a08b5dd
Showing 1 changed file with 1 addition and 70 deletions.
71 changes: 1 addition & 70 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,8 @@
wait,
)
from distributed.cluster_dump import load_cluster_dump
from distributed.comm import CommClosedError
from distributed.compatibility import LINUX, WINDOWS
from distributed.core import Server, Status
from distributed.core import Status
from distributed.metrics import time
from distributed.profile import wait_profiler
from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler
Expand Down Expand Up @@ -3576,74 +3575,6 @@ async def test_scatter_raises_if_no_workers(c, s):
await c.scatter(1, timeout=0.5)


@pytest.mark.flaky(reruns=2)
@gen_test()
async def test_reconnect():
port = random.randint(10000, 50000)

async def hard_stop(s):
for pc in s.periodic_callbacks.values():
pc.stop()

s.stop_services()
for comm in list(s.stream_comms.values()):
comm.abort()
for comm in list(s.client_comms.values()):
comm.abort()

await s.rpc.close()
s.stop()
await Server.close(s)

futures = []
w = Worker(f"127.0.0.1:{port}")
futures.append(asyncio.ensure_future(w.start()))

s = await Scheduler(port=port)
c = await Client(f"127.0.0.1:{port}", asynchronous=True)
await c.wait_for_workers(1, timeout=10)
x = c.submit(inc, 1)
assert (await x) == 2
await hard_stop(s)

start = time()
while c.status != "connecting":
assert time() < start + 10
await asyncio.sleep(0.01)

assert x.status == "cancelled"
with pytest.raises(CancelledError):
await x

s = await Scheduler(port=port)
start = time()
while c.status != "running":
await asyncio.sleep(0.1)
assert time() < start + 10
start = time()
while len(await c.nthreads()) != 1:
await asyncio.sleep(0.05)
assert time() < start + 10

x = c.submit(inc, 1)
assert (await x) == 2
await hard_stop(s)

start = time()
while True:
assert time() < start + 10
try:
await x
assert False
except CommClosedError:
continue
except CancelledError:
break

await w.close(report=False)
await c._close(fast=True)


class UnhandledException(Exception):
pass

Expand Down

0 comments on commit a08b5dd

Please sign in to comment.