test_file_descriptor intermittent failures #4045

Open
mrocklin opened this issue Aug 14, 2020 · 0 comments
Labels: flaky test (Intermittent failures on CI.)

Comments

@mrocklin (Member) commented:

____________________________ test_file_descriptors _____________________________
    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
    
            async def coro():
                with dask.config.set(config):
                    s = False
                    for i in range(5):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster, retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = asyncio.wait_for(future, timeout)
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == Status.closed)
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)
    
                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)
    
                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]
    
                    try:
                        start = time()
                        while time() < start + 5:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()
    
                    return result
    
>           result = loop.run_sync(
                coro, timeout=timeout * 2 if timeout else timeout
            )
distributed/utils_test.py:953: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py:532: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:912: in coro
    result = await future
../../../miniconda/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:483: in wait_for
    return fut.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
c = <Client: 'tcp://127.0.0.1:36000' processes=0 threads=0, memory=0 B>
s = <Scheduler: "tcp://127.0.0.1:36000" processes: 0 cores: 0>
    @pytest.mark.slow
    @pytest.mark.skipif(
        sys.platform.startswith("win"), reason="file descriptors not really a thing"
    )
    @gen_cluster(client=True, nthreads=[], timeout=240)
    async def test_file_descriptors(c, s):
        await asyncio.sleep(0.1)
        psutil = pytest.importorskip("psutil")
        da = pytest.importorskip("dask.array")
        proc = psutil.Process()
        num_fds_1 = proc.num_fds()
    
        N = 20
        nannies = await asyncio.gather(*[Nanny(s.address, loop=s.loop) for _ in range(N)])
    
        while len(s.nthreads) < N:
            await asyncio.sleep(0.1)
    
        num_fds_2 = proc.num_fds()
    
        await asyncio.sleep(0.2)
    
        num_fds_3 = proc.num_fds()
        assert num_fds_3 <= num_fds_2 + N  # add some heartbeats
    
        x = da.random.random(size=(1000, 1000), chunks=(25, 25))
        x = c.persist(x)
        await wait(x)
    
        num_fds_4 = proc.num_fds()
        assert num_fds_4 <= num_fds_2 + 2 * N
    
        y = c.persist(x + x.T)
        await wait(y)
    
        num_fds_5 = proc.num_fds()
        assert num_fds_5 < num_fds_4 + N
    
        await asyncio.sleep(1)
    
        num_fds_6 = proc.num_fds()
        assert num_fds_6 < num_fds_5 + N
    
        await asyncio.gather(*[n.close() for n in nannies])
        await c.close()
    
        assert not s.rpc.open
        for addr, occ in c.rpc.occupied.items():
            for comm in occ:
>               assert comm.closed() or comm.peer_address != s.address, comm
E               AssertionError: <TCP ConnectionPool.identity local=tcp://127.0.0.1:41158 remote=tcp://127.0.0.1:36000>
E               assert (False or 'tcp://127.0.0.1:36000' != 'tcp://127.0.0.1:36000')
E                +  where False = <bound method TCP.closed of <TCP ConnectionPool.identity local=tcp://127.0.0.1:41158 remote=tcp://127.0.0.1:36000>>()
E                +    where <bound method TCP.closed of <TCP ConnectionPool.identity local=tcp://127.0.0.1:41158 remote=tcp://127.0.0.1:36000>> = <TCP ConnectionPool.identity local=tcp://127.0.0.1:41158 remote=tcp://127.0.0.1:36000>.closed
E                +  and   'tcp://127.0.0.1:36000' = <TCP ConnectionPool.identity local=tcp://127.0.0.1:41158 remote=tcp://127.0.0.1:36000>.peer_address
E                +  and   'tcp://127.0.0.1:36000' = <Scheduler: "tcp://127.0.0.1:36000" processes: 0 cores: 0>.address
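For context on where this trips: the client is closed just before the loop over c.rpc.occupied, so the failure looks like a race where a pooled comm to the scheduler (the ConnectionPool.identity comm above) has not finished closing by the time the assertion runs. Below is a minimal sketch, not a proposed fix for distributed itself, of how the final check could poll for a few seconds in the same style as the other wait loops in this test. The helper name is made up; c.rpc.occupied, comm.closed(), and comm.peer_address are taken from the traceback above.

    import asyncio
    from time import time

    async def wait_for_scheduler_comms_to_close(c, s, timeout=5):
        # Hypothetical helper (not part of distributed): re-check the client's
        # rpc pool until no open comm still points at the scheduler, rather
        # than asserting immediately after ``await c.close()``.
        deadline = time() + timeout
        while time() < deadline:
            leftover = [
                comm
                for occ in c.rpc.occupied.values()
                for comm in occ
                if not comm.closed() and comm.peer_address == s.address
            ]
            if not leftover:
                return
            await asyncio.sleep(0.05)
        raise AssertionError(f"Comms to scheduler still open: {leftover}")

Whether the right fix is to wait like this in the test or to have Client.close() tear the connection pool down before returning is a separate question; the sketch is only meant to show where the nondeterminism sits.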
jrbourbeau added the flaky test (Intermittent failures on CI.) label on Jun 10, 2021
fjetter mentioned this issue on Feb 17, 2022