Flaky test_profile_server #4251

Open
jrbourbeau opened this issue Nov 18, 2020 · 4 comments
Labels
flaky test Intermittent failures on CI.

Comments

@jrbourbeau
Member

We ran into a failure of test_profile_server in CI over in #4248 that was unrelated to the changes in the PR. Full traceback below.

Full traceback:
2020-11-17T21:43:17.4637490Z ================================== FAILURES ===================================
2020-11-17T21:43:17.4638364Z _____________________________ test_profile_server _____________________________
2020-11-17T21:43:17.4638784Z 
2020-11-17T21:43:17.4639380Z self = <closed TCP>, deserializers = None
2020-11-17T21:43:17.4639838Z 
2020-11-17T21:43:17.4640417Z     async def read(self, deserializers=None):
2020-11-17T21:43:17.4641238Z         stream = self.stream
2020-11-17T21:43:17.4641733Z         if stream is None:
2020-11-17T21:43:17.4642434Z             raise CommClosedError
2020-11-17T21:43:17.4642878Z     
2020-11-17T21:43:17.4643274Z         try:
2020-11-17T21:43:17.4643935Z >           n_frames = await stream.read_bytes(8)
2020-11-17T21:43:17.4644789Z E           tornado.iostream.StreamClosedError: Stream is closed
2020-11-17T21:43:17.4645373Z 
2020-11-17T21:43:17.4645937Z distributed\comm\tcp.py:187: StreamClosedError
2020-11-17T21:43:17.4646472Z 
2020-11-17T21:43:17.4647051Z The above exception was the direct cause of the following exception:
2020-11-17T21:43:17.4647545Z 
2020-11-17T21:43:17.4647915Z     def test_func():
2020-11-17T21:43:17.4648375Z         result = None
2020-11-17T21:43:17.4648771Z         workers = []
2020-11-17T21:43:17.4649377Z         with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
2020-11-17T21:43:17.4649894Z     
2020-11-17T21:43:17.4650314Z             async def coro():
2020-11-17T21:43:17.4650810Z                 with dask.config.set(config):
2020-11-17T21:43:17.4651328Z                     s = False
2020-11-17T21:43:17.4651735Z                     for i in range(5):
2020-11-17T21:43:17.4652177Z                         try:
2020-11-17T21:43:17.4652622Z                             s, ws = await start_cluster(
2020-11-17T21:43:17.4653132Z                                 nthreads,
2020-11-17T21:43:17.4653602Z                                 scheduler,
2020-11-17T21:43:17.4654094Z                                 loop,
2020-11-17T21:43:17.4654702Z                                 security=security,
2020-11-17T21:43:17.4655226Z                                 Worker=Worker,
2020-11-17T21:43:17.4655774Z                                 scheduler_kwargs=scheduler_kwargs,
2020-11-17T21:43:17.4656457Z                                 worker_kwargs=worker_kwargs,
2020-11-17T21:43:17.4656912Z                             )
2020-11-17T21:43:17.4657414Z                         except Exception as e:
2020-11-17T21:43:17.4657915Z                             logger.error(
2020-11-17T21:43:17.4658509Z                                 "Failed to start gen_cluster, retrying",
2020-11-17T21:43:17.4659044Z                                 exc_info=True,
2020-11-17T21:43:17.4659483Z                             )
2020-11-17T21:43:17.4659929Z                             await asyncio.sleep(1)
2020-11-17T21:43:17.4660428Z                         else:
2020-11-17T21:43:17.4661000Z                             workers[:] = ws
2020-11-17T21:43:17.4661478Z                             args = [s] + workers
2020-11-17T21:43:17.4662156Z                             break
2020-11-17T21:43:17.4662607Z                     if s is False:
2020-11-17T21:43:17.4663135Z                         raise Exception("Could not start cluster")
2020-11-17T21:43:17.4663686Z                     if client:
2020-11-17T21:43:17.4664111Z                         c = await Client(
2020-11-17T21:43:17.4664590Z                             s.address,
2020-11-17T21:43:17.4665020Z                             loop=loop,
2020-11-17T21:43:17.4665520Z                             security=security,
2020-11-17T21:43:17.4666019Z                             asynchronous=True,
2020-11-17T21:43:17.4668294Z                             **client_kwargs,
2020-11-17T21:43:17.4668742Z                         )
2020-11-17T21:43:17.4669111Z                         args = [c] + args
2020-11-17T21:43:17.4669489Z                     try:
2020-11-17T21:43:17.4669949Z                         future = func(*args)
2020-11-17T21:43:17.4670379Z                         if timeout:
2020-11-17T21:43:17.4671515Z                             future = asyncio.wait_for(future, timeout)
2020-11-17T21:43:17.4672043Z                         result = await future
2020-11-17T21:43:17.4672481Z                         if s.validate:
2020-11-17T21:43:17.4672881Z                             s.validate_state()
2020-11-17T21:43:17.4673258Z                     finally:
2020-11-17T21:43:17.4673758Z                         if client and c.status not in ("closing", "closed"):
2020-11-17T21:43:17.4674333Z                             await c._close(fast=s.status == Status.closed)
2020-11-17T21:43:17.4674904Z                         await end_cluster(s, workers)
2020-11-17T21:43:17.4675454Z                         await asyncio.wait_for(cleanup_global_workers(), 1)
2020-11-17T21:43:17.4675938Z     
2020-11-17T21:43:17.4676223Z                     try:
2020-11-17T21:43:17.4676728Z                         c = await default_client()
2020-11-17T21:43:17.4677663Z                     except ValueError:
2020-11-17T21:43:17.4678277Z                         pass
2020-11-17T21:43:17.4678640Z                     else:
2020-11-17T21:43:17.4679031Z                         await c._close(fast=True)
2020-11-17T21:43:17.4679417Z     
2020-11-17T21:43:17.4679926Z                     def get_unclosed():
2020-11-17T21:43:17.4680480Z                         return [c for c in Comm._instances if not c.closed()] + [
2020-11-17T21:43:17.4680936Z                             c
2020-11-17T21:43:17.4681434Z                             for c in _global_clients.values()
2020-11-17T21:43:17.4681919Z                             if c.status != "closed"
2020-11-17T21:43:17.4682281Z                         ]
2020-11-17T21:43:17.4682606Z     
2020-11-17T21:43:17.4682897Z                     try:
2020-11-17T21:43:17.4683275Z                         start = time()
2020-11-17T21:43:17.4683663Z                         while time() < start + 5:
2020-11-17T21:43:17.4684348Z                             gc.collect()
2020-11-17T21:43:17.4684746Z                             if not get_unclosed():
2020-11-17T21:43:17.4685122Z                                 break
2020-11-17T21:43:17.4685567Z                             await asyncio.sleep(0.05)
2020-11-17T21:43:17.4686268Z                         else:
2020-11-17T21:43:17.4686713Z                             if allow_unclosed:
2020-11-17T21:43:17.4687164Z                                 print(f"Unclosed Comms: {get_unclosed()}")
2020-11-17T21:43:17.4687613Z                             else:
2020-11-17T21:43:17.4688079Z                                 raise RuntimeError("Unclosed Comms", get_unclosed())
2020-11-17T21:43:17.4688541Z                     finally:
2020-11-17T21:43:17.4688969Z                         Comm._instances.clear()
2020-11-17T21:43:17.4689594Z                         _global_clients.clear()
2020-11-17T21:43:17.4690174Z     
2020-11-17T21:43:17.4690498Z                     return result
2020-11-17T21:43:17.4690850Z     
2020-11-17T21:43:17.4691181Z             result = loop.run_sync(
2020-11-17T21:43:17.4691683Z >               coro, timeout=timeout * 2 if timeout else timeout
2020-11-17T21:43:17.4692143Z             )
2020-11-17T21:43:17.4692364Z 
2020-11-17T21:43:17.4692728Z distributed\utils_test.py:954: 
2020-11-17T21:43:17.4693336Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-11-17T21:43:17.4693978Z C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:576: in run_sync
2020-11-17T21:43:17.4694800Z     return future_cell[0].result()
2020-11-17T21:43:17.4695275Z distributed\utils_test.py:912: in coro
2020-11-17T21:43:17.4695695Z     result = await future
2020-11-17T21:43:17.4696329Z C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:442: in wait_for
2020-11-17T21:43:17.4696884Z     return fut.result()
2020-11-17T21:43:17.4697434Z distributed\tests\test_client.py:5932: in test_profile_server
2020-11-17T21:43:17.4698028Z     p = await c.profile(scheduler=True)  #  Scheduler
2020-11-17T21:43:17.4698547Z distributed\client.py:3431: in _profile
2020-11-17T21:43:17.4699970Z     scheduler=scheduler,
2020-11-17T21:43:17.4700524Z distributed\core.py:883: in send_recv_from_rpc
2020-11-17T21:43:17.4701119Z     result = await send_recv(comm=comm, op=key, **kwargs)
2020-11-17T21:43:17.4701644Z distributed\core.py:666: in send_recv
2020-11-17T21:43:17.4702297Z     response = await comm.read(deserializers=deserializers)
2020-11-17T21:43:17.4702896Z distributed\comm\tcp.py:202: in read
2020-11-17T21:43:17.4703457Z     convert_stream_closed_error(self, e)
2020-11-17T21:43:17.4704045Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-11-17T21:43:17.4704292Z 
2020-11-17T21:43:17.4704765Z obj = <closed TCP>, exc = StreamClosedError('Stream is closed')
2020-11-17T21:43:17.4705173Z 
2020-11-17T21:43:17.4705554Z     def convert_stream_closed_error(obj, exc):
2020-11-17T21:43:17.4705981Z         """
2020-11-17T21:43:17.4706539Z         Re-raise StreamClosedError as CommClosedError.
2020-11-17T21:43:17.4707071Z         """
2020-11-17T21:43:17.4707430Z         if exc.real_error is not None:
2020-11-17T21:43:17.4707959Z             # The stream was closed because of an underlying OS error
2020-11-17T21:43:17.4708492Z             exc = exc.real_error
2020-11-17T21:43:17.4708960Z             if ssl and isinstance(exc, ssl.SSLError):
2020-11-17T21:43:17.4709500Z                 if "UNKNOWN_CA" in exc.reason:
2020-11-17T21:43:17.4710016Z                     raise FatalCommClosedError(
2020-11-17T21:43:17.4710769Z                         "in %s: %s: %s" % (obj, exc.__class__.__name__, exc)
2020-11-17T21:43:17.4711158Z                     )
2020-11-17T21:43:17.4711584Z             raise CommClosedError(
2020-11-17T21:43:17.4712069Z                 "in %s: %s: %s" % (obj, exc.__class__.__name__, exc)
2020-11-17T21:43:17.4712466Z             ) from exc
2020-11-17T21:43:17.4712920Z         else:
2020-11-17T21:43:17.4713387Z >           raise CommClosedError("in %s: %s" % (obj, exc)) from exc
2020-11-17T21:43:17.4714462Z E           distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
2020-11-17T21:43:17.4715293Z 
2020-11-17T21:43:17.4715735Z distributed\comm\tcp.py:126: CommClosedError
2020-11-17T21:43:17.4717062Z ---------------------------- Captured stderr call -----------------------------
2020-11-17T21:43:17.4718559Z distributed.protocol.core - CRITICAL - Failed to Serialize
2020-11-17T21:43:17.4719516Z Traceback (most recent call last):
2020-11-17T21:43:17.4720490Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 39, in dumps
2020-11-17T21:43:17.4721691Z     small_header, small_payload = dumps_msgpack(msg, **compress_opts)
2020-11-17T21:43:17.4722852Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 184, in dumps_msgpack
2020-11-17T21:43:17.4723714Z     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
2020-11-17T21:43:17.4724660Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\msgpack\__init__.py", line 35, in packb
2020-11-17T21:43:17.4725367Z     return Packer(**kwargs).pack(o)
2020-11-17T21:43:17.4726001Z   File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4726912Z   File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4727715Z   File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4728536Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4729355Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4730268Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4730905Z   [Previous line repeated 508 more times]
2020-11-17T21:43:17.4731581Z   File "msgpack/_packer.pyx", line 223, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4732352Z   File "msgpack/_packer.pyx", line 157, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4733133Z ValueError: recursion limit exceeded.
2020-11-17T21:43:17.4733831Z distributed.comm.utils - ERROR - recursion limit exceeded.
2020-11-17T21:43:17.4734461Z Traceback (most recent call last):
2020-11-17T21:43:17.4735153Z   File "D:\a\distributed\distributed\distributed\comm\utils.py", line 35, in _to_frames
2020-11-17T21:43:17.4735939Z     msg, serializers=serializers, on_error=on_error, context=context
2020-11-17T21:43:17.4736827Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 39, in dumps
2020-11-17T21:43:17.4737585Z     small_header, small_payload = dumps_msgpack(msg, **compress_opts)
2020-11-17T21:43:17.4738408Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 184, in dumps_msgpack
2020-11-17T21:43:17.4739267Z     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
2020-11-17T21:43:17.4740209Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\msgpack\__init__.py", line 35, in packb
2020-11-17T21:43:17.4740921Z     return Packer(**kwargs).pack(o)
2020-11-17T21:43:17.4741593Z   File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4742396Z   File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4743240Z   File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4744020Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4744782Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4745579Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4746213Z   [Previous line repeated 508 more times]
2020-11-17T21:43:17.4746944Z   File "msgpack/_packer.pyx", line 223, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4747792Z   File "msgpack/_packer.pyx", line 157, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4748484Z ValueError: recursion limit exceeded.
2020-11-17T21:43:17.4750030Z tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x0000022338FC03A8>, <Task finished coro=<BaseTCPListener._handle_stream() done, defined at D:\a\distributed\distributed\distributed\comm\tcp.py:447> exception=ValueError('recursion limit exceeded.')>)
2020-11-17T21:43:17.4751545Z Traceback (most recent call last):
2020-11-17T21:43:17.4752311Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py", line 758, in _run_callback
2020-11-17T21:43:17.4753081Z     ret = callback()
2020-11-17T21:43:17.4753813Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\stack_context.py", line 300, in null_wrapper
2020-11-17T21:43:17.4754593Z     return fn(*args, **kwargs)
2020-11-17T21:43:17.4755326Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\tcpserver.py", line 297, in <lambda>
2020-11-17T21:43:17.4756084Z     lambda f: f.result())
2020-11-17T21:43:17.4756763Z   File "D:\a\distributed\distributed\distributed\comm\tcp.py", line 463, in _handle_stream
2020-11-17T21:43:17.4757456Z     await self.comm_handler(comm)
2020-11-17T21:43:17.4758090Z   File "D:\a\distributed\distributed\distributed\core.py", line 546, in handle_comm
2020-11-17T21:43:17.4758823Z     await comm.write(result, serializers=serializers)
2020-11-17T21:43:17.4759588Z   File "D:\a\distributed\distributed\distributed\comm\tcp.py", line 231, in write
2020-11-17T21:43:17.4760216Z     **self.handshake_options,
2020-11-17T21:43:17.4760909Z   File "D:\a\distributed\distributed\distributed\comm\utils.py", line 54, in to_frames
2020-11-17T21:43:17.4761503Z     return _to_frames()
2020-11-17T21:43:17.4762145Z   File "D:\a\distributed\distributed\distributed\comm\utils.py", line 35, in _to_frames
2020-11-17T21:43:17.4762928Z     msg, serializers=serializers, on_error=on_error, context=context
2020-11-17T21:43:17.4763763Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 39, in dumps
2020-11-17T21:43:17.4764586Z     small_header, small_payload = dumps_msgpack(msg, **compress_opts)
2020-11-17T21:43:17.4765384Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 184, in dumps_msgpack
2020-11-17T21:43:17.4768748Z     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
2020-11-17T21:43:17.4770044Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\msgpack\__init__.py", line 35, in packb
2020-11-17T21:43:17.4771195Z     return Packer(**kwargs).pack(o)
2020-11-17T21:43:17.4771834Z   File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4772686Z   File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4773487Z   File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4774327Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4775095Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4775900Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4777050Z   [Previous line repeated 508 more times]
2020-11-17T21:43:17.4777982Z   File "msgpack/_packer.pyx", line 223, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4778782Z   File "msgpack/_packer.pyx", line 157, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4907924Z ValueError: recursion limit exceeded.
jrbourbeau added the "flaky test" label (Intermittent failures on CI.) on Nov 18, 2020
@mrocklin
Member

mrocklin commented Nov 18, 2020 via email

@gjoseph92
Collaborator

This is still failing frequently:
[image]

#6696 seems relevant? cc @graingert

@graingert
Member

> This is still failing frequently: [image]
>
> #6696 seems relevant? cc @graingert

#6696 won't fix this issue. It will only make it worse, because my PR uses a deque to build a bigger dict than the stack can handle.
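
To illustrate the point about the deque: an explicit queue or stack can build (or walk) a tree of any depth, but a serializer that recurses once per nesting level cannot follow it. The sketch below is not code from distributed or from #6696. The node shape (`count`, `children`) is a simplified assumption, `build_chain`, `count_recursive`, and `count_iterative` are hypothetical helpers, and a plain recursive Python function stands in for the recursive packer seen in the traceback.

```python
import sys


def build_chain(depth):
    """Build a profile-like tree `depth` frames deep without recursion."""
    root = {"count": 1, "children": {}}
    node = root
    for i in range(depth):
        child = {"count": 1, "children": {}}
        node["children"][f"frame-{i}"] = child
        node = child
    return root


def count_recursive(node):
    """Stand-in for a recursive serializer: one Python call per level."""
    total = 1
    for child in node["children"].values():
        total += count_recursive(child)
    return total


def count_iterative(root):
    """Same traversal with an explicit stack, so depth is not a limit."""
    total, stack = 0, [root]
    while stack:
        node = stack.pop()
        total += 1
        stack.extend(node["children"].values())
    return total


# Deliberately deeper than the interpreter's recursion limit.
depth = sys.getrecursionlimit() * 2
tree = build_chain(depth)

print(count_iterative(tree) == depth + 1)  # the iterative walk finishes

try:
    count_recursive(tree)
except RecursionError:
    print("recursive walk hit the recursion limit")
```

Under these assumptions, building the tree iteratively removes the failure on the producer side only. Anything downstream that still recurses per level can fail on the deeper result.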

@graingert
Member

I think the only fix is to change the format of the profiler.merge return value so that it's a flat object with internal references.
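
One way to picture "a flat object with internal references" is a list of nodes in which each node refers to its children by list index, so nesting depth stays constant regardless of call-stack depth. This is only a sketch under assumptions: the node shape (`count`, `children`) is simplified, `flatten` and `unflatten` are hypothetical names, and none of this is the actual or proposed format of the profiler's merge result.

```python
import json


def flatten(root):
    """Turn a nested tree into a flat node list; children become indices."""
    nodes = []
    stack = [(None, root, None)]  # (key in parent, nested node, parent index)
    while stack:
        key, nested, parent = stack.pop()
        index = len(nodes)
        nodes.append({"count": nested["count"], "children": {}})
        if parent is not None:
            nodes[parent]["children"][key] = index
        # Push in reverse so children are visited in their original order.
        for child_key, child in reversed(nested["children"].items()):
            stack.append((child_key, child, index))
    return {"root": 0, "nodes": nodes}


def unflatten(flat):
    """Rebuild the nested form from the flat one, again without recursion."""
    built = [{"count": n["count"], "children": {}} for n in flat["nodes"]]
    for node, new in zip(flat["nodes"], built):
        for key, index in node["children"].items():
            new["children"][key] = built[index]
    return built[flat["root"]]


tree = {
    "count": 3,
    "children": {
        "f": {"count": 2, "children": {"g": {"count": 1, "children": {}}}},
        "h": {"count": 1, "children": {}},
    },
}

flat = flatten(tree)
print(json.dumps(flat, indent=2))
print(unflatten(flat) == tree)
```

The flat form nests only a fixed number of levels (outer dict, node list, node, children mapping), so a depth-limited serializer should not be affected by how deep the profiled call stack was. The cost is that consumers have to follow indices instead of walking nested dicts.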
