____________________________ test_release_failure _____________________________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:63277', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:63278', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:63280', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(
        client=True,
        config={
            "distributed.scheduler.locks.lease-timeout": "100ms",
            "distributed.scheduler.locks.lease-validation-interval": "100ms",
        },
    )
    async def test_release_failure(c, s, a, b):
        """Don't raise even if release fails: lease will be cleaned up by the lease-validation after a specified interval anyways (see config parameters used)."""

        with dask.config.set({"distributed.comm.retry.count": 1}):
            pool = await FlakyConnectionPool(failing_connections=5)

            semaphore = await Semaphore(
                max_leases=2,
                name="resource_we_want_to_limit",
                scheduler_rpc=pool(s.address),
            )
            await semaphore.acquire()
            pool.activate()  # Comm chaos starts

            # Release fails (after a single retry) because of broken connections
            with captured_logger(
                "distributed.semaphore", level=logging.ERROR
            ) as semaphore_log:
                with captured_logger("distributed.utils_comm") as retry_log:
                    assert await semaphore.release() is False

            with captured_logger(
                "distributed.semaphore", level=logging.DEBUG
            ) as semaphore_cleanup_log:
                pool.deactivate()  # comm chaos stops
                assert await semaphore.get_value() == 1  # lease is still registered
                await asyncio.sleep(0.2)  # Wait for lease to be cleaned up

            # Check release was retried
            retry_log = retry_log.getvalue().split("\n")[0]
            assert retry_log.startswith(
                "Retrying semaphore release:"
            ) and retry_log.endswith("after exception in attempt 0/1: ")
            # Check release failed
            semaphore_log = semaphore_log.getvalue().split("\n")[0]
            assert semaphore_log.startswith(
                "Release failed for id="
            ) and semaphore_log.endswith("Cluster network might be unstable?")

            # Check lease has timed out
>           assert any(
                log.startswith("Lease") and "timed out after" in log
                for log in semaphore_cleanup_log.getvalue().split("\n")
            )
E           assert False
E            +  where False = any(<generator object test_release_failure.<locals>.<genexpr> at 0x000002064636ABA0>)

distributed\tests\test_semaphore.py:596: AssertionError
cc @fjetter
https://github.com/dask/distributed/runs/6148590819?check_suite_focus=true
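
One possible reading, not confirmed by anything in this report: the test waits a fixed 0.2 s for the lease cleanup while both `lease-timeout` and `lease-validation-interval` are set to 100 ms, which may leave little margin on a slow CI runner. A minimal sketch of polling for the log line instead of sleeping a fixed time is below. The helper `wait_until`, the `demo` coroutine, and the log message text are all hypothetical and only stand in for the captured logger used in the test.

```python
import asyncio
import time


async def wait_until(predicate, timeout=5.0, interval=0.01):
    """Poll `predicate` until it returns True or `timeout` seconds elapse.

    Hypothetical helper, not part of distributed.
    """
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() > deadline:
            raise TimeoutError(f"condition not met within {timeout} s")
        await asyncio.sleep(interval)


async def demo():
    # Stand-in for the captured log buffer; in the test this role is played by
    # semaphore_cleanup_log.getvalue().split("\n").
    lines = []

    async def late_cleanup():
        # Simulates a cleanup that fires later than a fixed sleep would expect.
        await asyncio.sleep(0.05)
        lines.append("Lease abc timed out after 0.1s")  # made-up message text

    task = asyncio.create_task(late_cleanup())
    # Same condition as the failing assertion, but checked repeatedly.
    await wait_until(
        lambda: any(
            line.startswith("Lease") and "timed out after" in line
            for line in lines
        )
    )
    await task
    return lines


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Polling keeps the fast path fast and only fails once the generous timeout has passed, so the test would no longer depend on the cleanup landing inside one fixed window.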