test_steal_twice
On Windows, 3.8: https://github.com/dask/distributed/pull/4925/checks?check_run_id=2889177384#step:10:2191
______________________________ test_steal_twice _______________________________

    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

            async def coro():
                with dask.config.set(config):
                    s = False
                    for _ in range(60):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster: "
                                f"{e.__class__.__name__}: {e}; retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = asyncio.wait_for(future, timeout)
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == Status.closed)
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)

                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)

                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]

                    try:
                        start = time()
                        while time() < start + 60:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()

                    return result

>           result = loop.run_sync(
                coro, timeout=timeout * 2 if timeout else timeout
            )

distributed\utils_test.py:966:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:530: in run_sync
    return future_cell[0].result()
distributed\utils_test.py:925: in coro
    result = await future
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

fut = <Task cancelled name='Task-157632' coro=<test_steal_twice() done, defined at D:\a\distributed\distributed\distributed\tests\test_steal.py:657>>
timeout = 30

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.

        Coroutine will be wrapped in Task.

        Returns result of the Future or coroutine. When a timeout occurs,
        it cancels the task and raises TimeoutError. To avoid the task
        cancellation, wrap it in shield().

        If the wait is cancelled, the task is also cancelled.

        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        if timeout is None:
            return await fut

        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)

            if fut.done():
                return fut.result()

            await _cancel_and_wait(fut, loop=loop)
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()

        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)

        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)

        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise

            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
>               raise exceptions.TimeoutError()
E               asyncio.exceptions.TimeoutError

C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:501: TimeoutError
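The traceback ends inside `asyncio.wait_for`, which cancels the wrapped task once the timeout elapses and then raises `TimeoutError`. That is why the task shows up as `<Task cancelled ...>`, and why the traceback gives no hint of where the test body was stuck. A minimal sketch of that behaviour follows; `never_finishes` is a hypothetical stand-in for a hanging test body, not code from distributed.

```python
import asyncio


async def never_finishes():
    # Hypothetical stand-in for a test body that hangs.
    await asyncio.sleep(3600)


async def main():
    task = asyncio.ensure_future(never_finishes())
    try:
        # Short timeout so the example finishes quickly.
        await asyncio.wait_for(task, timeout=0.1)
    except asyncio.TimeoutError:
        # wait_for() cancelled the task and waited for it before raising.
        return task.cancelled()
    return None


timed_out_and_cancelled = asyncio.run(main())
print(timed_out_and_cancelled)
```

Wrapping the awaitable in `asyncio.shield()` would prevent the cancellation, as the `wait_for` docstring notes, but the timeout would still be raised to the caller.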
decide_worker
I ran this test many times per environment between 6 and 17 August, across three mass stress-test runs.
It failed 38 times out of 66 runs on Windows 3.7. It failed only once out of 512 runs on all other environments combined.
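For scale, the counts above can be turned into failure rates. This is only arithmetic on the reported numbers; the variable names are illustrative.

```python
# Counts as reported in this comment.
windows_failures, windows_runs = 38, 66
other_failures, other_runs = 1, 512

windows_rate = windows_failures / windows_runs
other_rate = other_failures / other_runs

print(f"Windows 3.7 failure rate: {windows_rate:.1%}")
print(f"All other environments:   {other_rate:.2%}")
print(f"Ratio: roughly {windows_rate / other_rate:.0f}x")
```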
The error I'm getting is, invariably,
        assert a.in_flight_tasks == 0
>       assert b.in_flight_tasks == 0
E       assert 1 == 0
E         +1
E         -0
Mark test_steal_twice as flaky (dask#4957)
ff5bb13
test_steal_twice has not appeared as a flaking test on the test report for the last 50 workflows, so this seems to be fixed.
The traceback is just an asyncio.TimeoutError