
Computation deadlocks due to worker rapidly running out of memory instead of spilling #6110

Closed
fjetter opened this issue Apr 12, 2022 · 64 comments · Fixed by #6189
Labels
deadlock: The cluster appears to not make any progress

Comments

@fjetter (Member) commented Apr 12, 2022

The script below is pretty reliably triggering deadlocks. I'm sure it can be reduced further, but I haven't had time to do so yet.

import coiled.v2
from distributed import Client
cluster = coiled.v2.Cluster(
    n_workers=20
)
client = Client(cluster)

from dask.datasets import timeseries
ddf = timeseries(
    "2020",
    "2025",
    partition_freq='2w',
)
ddf2 = timeseries(
    "2020",
    "2023",
    partition_freq='2w',
)
def slowident(df):
    import random
    import time
    time.sleep(random.randint(1, 5))
    return df
               
while True:
    client.restart()
    demo1 = ddf.map_partitions(slowident)
    (demo1.x + demo1.y).mean().compute()

    demo2 = ddf.merge(ddf2)
    demo2 = demo2.map_partitions(slowident)
    (demo2.x + demo2.y).mean().compute()

We could confirm that version 2022.1.1 is not affected by this, but it appears that all follow-up versions might be affected (we haven't tested all of them; we can definitely confirm 2022.4.0 is).

@mrocklin (Member) commented Apr 12, 2022 via email

@martindurant (Member)

> is pretty reliably triggering

Do you have an estimate of what fraction of runs? As @mrocklin says, this script is ideal for a bisect (behind a timeout failure).

@mrocklin (Member)

I'm currently trying to run this locally without Coiled. I encourage other folks to hop on as well, but I'll at least try to run with this for the rest of the day when I have free time.

@mrocklin (Member)

I'm also curious about how often this fails. Currently running this locally and not running into anything. I'm wondering how long I should wait before moving up to Coiled.

@mrocklin (Member)

I'm getting these when running locally, but I suspect they're due to an unclean restart:

Traceback
Future exception was never retrieved
future: <Future finished exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 864, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 709, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:51626 remote=tcp://127.0.0.1:33741>: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/mrocklin/mambaforge/lib/python3.9/site-packages/tornado/gen.py", line 769, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 220, in quiet
    yield task
  File "/home/mrocklin/mambaforge/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 867, in send_recv_from_rpc
    raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
2022-04-12 10:34:22,259 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f1dc605be80>>, <Task finished name='Task-33593' coro=<Scheduler.restart() done, defined at /home/mrocklin/workspace/distributed/distributed/scheduler.py:6061> exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>)
Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 864, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 709, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:51462 remote=tcp://127.0.0.1:35655>: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/mrocklin/mambaforge/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/mrocklin/mambaforge/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 6116, in restart
    resps = await asyncio.wait_for(resps, timeout)
  File "/home/mrocklin/mambaforge/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 207, in All
    result = await tasks.next()
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 867, in send_recv_from_rpc
    raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
Task exception was never retrieved
future: <Task finished name='Task-33599' coro=<rpc.__getattr__.<locals>.send_recv_from_rpc() done, defined at /home/mrocklin/workspace/distributed/distributed/core.py:855> exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 864, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 709, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:50694 remote=tcp://127.0.0.1:35275>: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 867, in send_recv_from_rpc
    raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
Task exception was never retrieved
future: <Task finished name='Task-33600' coro=<rpc.__getattr__.<locals>.send_recv_from_rpc() done, defined at /home/mrocklin/workspace/distributed/distributed/core.py:855> exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 864, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 709, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/mrocklin/workspace/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:45222 remote=tcp://127.0.0.1:37231>: Stream is closed

The above exception was the direct cause of the following exception:

@mrocklin (Member)

Shifting to #5951, which seems easier to iterate on.

@mrocklin (Member)

Shifting back. I'm now running this at scale on Coiled. I'm not yet seeing a deadlock. @fjetter can you say a bit more about how often you ran into this?

@fjetter (Member, Author) commented Apr 13, 2022

We reverted the default environment on Coiled, so the above will not trigger the issue anymore. With 2022.4.1 it will deadlock within seconds to a few minutes.

@gjoseph92 (Collaborator) commented Apr 14, 2022

I've been running this on Coiled. It reliably deadlocks within 2-3 iterations.

Script
import coiled.v2
from distributed import Client

cluster = coiled.v2.Cluster(
    name="florian-deadlock",
    n_workers=20,
    software="florian-deadlock",
    # environ=dict(
    #     DASK_LOGGING__DISTRIBUTED="debug",
    #     DASK_DISTRIBUTED__ADMIN__LOG_LENGTH="1000000"
    # )
)
client = Client(cluster)

from dask.datasets import timeseries

ddf = timeseries(
    "2020",
    "2025",
    partition_freq="2w",
)
ddf2 = timeseries(
    "2020",
    "2023",
    partition_freq="2w",
)


def slowident(df):
    import random
    import time

    time.sleep(random.randint(1, 5))
    return df


i = 1
while True:
    print(f"Iteration {i}")
    client.restart()
    demo1 = ddf.map_partitions(slowident)
    (demo1.x + demo1.y).mean().compute()

    demo2 = ddf.merge(ddf2)
    demo2 = demo2.map_partitions(slowident)
    (demo2.x + demo2.y).mean().compute()
    i += 1
pyproject.toml:

[tool.poetry]
name = "florian-deadlock"
version = "0.1.0"
description = ""
authors = ["Gabe Joseph <[email protected]>"]

[tool.poetry.dependencies]
python = "=3.9.1"
dask = {extras = ["complete"], version = "^2022.4.0"}
ipython = "^8.2.0"
coiled = "^0.0.73"
scheduler-profilers = {git = "https://github.com/gjoseph92/scheduler-profilers.git", rev = "main"}

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
Software environment build script:

#!/bin/bash

cat > postbuild.sh <<EOF
#!/bin/bash

pip install --no-deps "git+https://github.com/dask/distributed.git@main"

echo "export DASK_LOGGING__DISTRIBUTED=debug" >> ~/.bashrc
echo "export DASK_DISTRIBUTED__ADMIN__LOG_LENGTH=1000000" >> ~/.bashrc
EOF

poetry export --without-hashes > requirements.txt

coiled env create -n florian-deadlock --pip requirements.txt --post-build postbuild.sh

rm requirements.txt
rm postbuild.sh

I've tried distributed from main to get #6112. Still deadlocks.

The overall pattern I'm seeing is at least one worker that is effectively dead but hasn't been removed from the scheduler yet.

Other workers still seem to want to fetch data from that worker.

(screenshot omitted: Screen Shot 2022-04-14 at 11 39 17 AM)

In [9]: client.run(lambda dask_worker: [ts.who_has for ts in dask_worker.data_needed], workers=['tls://10.6.0.70:34327'])
Out[9]: 
{'tls://10.6.0.70:34327': [{'tls://10.6.12.222:42533'},
  {'tls://10.6.12.222:42533'},
  {'tls://10.6.12.222:42533'}]}
# notice the `who_has` addresses are all the same, and all reference the unresponsive red worker above

Here are logs (via Coiled) from one of the unresponsive workers:

2022-04-14 18:55:24,233 - distributed.worker - ERROR - Worker stream died during communication: tls://10.6.3.191:45383
Traceback (most recent call last):
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/opt/conda/envs/coiled/lib/python3.9/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/opt/conda/envs/coiled/lib/python3.9/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 3019, in gather_dep
    response = await get_data_from_worker(
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 4320, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
    return await retry(
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
    return await coro()
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 4300, in _get_data
    response = await send_recv(
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py", line 709, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 148, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TLS (closed) Ephemeral Worker->Worker for gather local=tls://10.6.3.169:44316 remote=tls://10.6.3.191:45383>: ConnectionResetError: [Errno 104] Connection reset by peer
2022-04-14 18:56:15,295 - distributed.worker - INFO - Stopping worker at tls://10.6.3.169:43833
2022-04-14 18:56:15,318 - distributed.nanny - INFO - Worker closed
2022-04-14 18:56:16,231 - distributed.worker - INFO -       Start worker at:     tls://10.6.3.169:40133
2022-04-14 18:56:16,231 - distributed.worker - INFO -          Listening to:     tls://10.6.3.169:40133
2022-04-14 18:56:16,231 - distributed.worker - INFO -          dashboard at:           10.6.3.169:39063
2022-04-14 18:56:16,231 - distributed.worker - INFO - Waiting to connect to:   tls://34.221.160.71:8786
2022-04-14 18:56:16,231 - distributed.worker - INFO - -------------------------------------------------
2022-04-14 18:56:16,231 - distributed.worker - INFO -               Threads:                          2
2022-04-14 18:56:16,233 - distributed.worker - INFO -                Memory:                   3.78 GiB
2022-04-14 18:56:16,233 - distributed.worker - INFO -       Local Directory: /dask-worker-space/worker-c7603wej
2022-04-14 18:56:16,234 - distributed.worker - INFO - -------------------------------------------------
2022-04-14 18:56:16,477 - distributed.worker - WARNING - Mismatched versions found
+-------------+-----------------------+-----------------------+---------------------------------------+
| Package     | This Worker           | scheduler             | workers                               |
+-------------+-----------------------+-----------------------+---------------------------------------+
| blosc       | MISSING               | MISSING               | {None, 'MISSING'}                     |
| distributed | 2022.4.0+33.g41ecbca2 | 2022.4.0+33.g41ecbca2 | {'2022.4.0+33.g41ecbca2', '2022.4.0'} |
| python      | 3.9.12.final.0        | 3.9.12.final.0        | {'3.9.12.final.0', '3.9.1.final.0'}   |
+-------------+-----------------------+-----------------------+---------------------------------------+
2022-04-14 18:56:16,478 - distributed.worker - INFO - Starting Worker plugin coiled-aws-env
2022-04-14 18:56:16,480 - distributed.worker - INFO -         Registered to:   tls://34.221.160.71:8786
2022-04-14 18:56:16,482 - distributed.worker - INFO - -------------------------------------------------
2022-04-14 18:56:16,483 - distributed.core - INFO - Starting established connection

You'll notice:

  • Connection to a different worker fails
  • The worker shuts down for some reason. I don't know why. But the nanny restarts it, and it reconnects.

I'm seeing the nanny-restart for all three unresponsive workers in my most recent run. Other workers don't show that pattern. I feel like the reconnection could have something to do with it.

I'm also confused why the workers seem to think they've reconnected to the scheduler, yet the scheduler still sees their last_seen time as many minutes ago.

I have cluster dumps, but I've found them less useful so far than just poking at things manually.
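
(A minimal sketch, not from the original report: one way to check that mismatch from the client is to ask the scheduler for its own bookkeeping, assuming it keeps a wall-clock last_seen timestamp per worker.)

import time

def worker_staleness(dask_scheduler):
    # Seconds since the scheduler last heard from each worker; a frozen worker
    # shows a very large value even if it believes it has reconnected.
    now = time.time()
    return {addr: now - ws.last_seen for addr, ws in dask_scheduler.workers.items()}

client.run_on_scheduler(worker_staleness)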

@mrocklin (Member)

I'm playing with this. I've gotten only about as far as @gjoseph92 has. I'm seeing the same thing. I don't see any reason why the worker would have restarted, which is also strange. (I'm not seeing the same comm issue)

Logs

Apr 14 22:35:49 ip-10-4-8-234 cloud-init[981]: {"msg":"Successfully pulled docker image","id":"7c4d42","username":"mrocklin","account":"mrocklin","level":20,"event_type":"vm_event","elapsed":50,"logging_context":{"user.username":"mrocklin","user.id":5,"user.is_authenticated":true}}COILED SERVER IS: https://cloud.coiled.io
Apr 14 22:35:50 ip-10-4-8-234 cloud-init[981]: Creating tmp_worker_1 ...
Apr 14 22:35:50 ip-10-4-8-234 systemd[1]: var-lib-docker-overlay2-2af07f68a80e4655a49a5f17b672a3a1c3a2cea0cbdec1b5e6efd24ba2de339b\x2dinit-merged.mount: Succeeded.
Apr 14 22:35:51 ip-10-4-8-234 systemd[1]: var-lib-docker-overlay2-2af07f68a80e4655a49a5f17b672a3a1c3a2cea0cbdec1b5e6efd24ba2de339b-merged.mount: Succeeded.
Apr 14 22:35:51 ip-10-4-8-234 containerd[477]: time="2022-04-14T22:35:51.834555695Z" level=info msg="starting signal loop" namespace=moby path=/run/containerd/io.containerd.runtime.v2.task/moby/ca0f33aa36f274dd26955208983421f1f1237cd88739058b216fc2a4980c988b pid=1926
Apr 14 22:35:51 ip-10-4-8-234 systemd[1]: run-docker-runtime\x2drunc-moby-ca0f33aa36f274dd26955208983421f1f1237cd88739058b216fc2a4980c988b-runc.AXTQoI.mount: Succeeded.
Apr 14 22:35:52 ip-10-4-8-234 cloud-init[981]: Creating tmp_worker_1 ... done
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: Attaching to tmp_worker_1
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,249 - distributed.preloading - INFO - Downloading preload at https://cloud.coiled.io/api/v2/cluster_facing/preload/worker
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,424 - distributed.preloading - INFO - Run preload setup function: https://cloud.coiled.io/api/v2/cluster_facing/preload/worker
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,433 - distributed.nanny - INFO -         Start Nanny at: 'tls://10.4.8.234:37539'
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,986 - distributed.worker - INFO -       Start worker at:     tls://10.4.8.234:34059
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,986 - distributed.worker - INFO -          Listening to:     tls://10.4.8.234:34059
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,987 - distributed.worker - INFO -          dashboard at:           10.4.8.234:45439
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,987 - distributed.worker - INFO - Waiting to connect to:   tls://18.215.64.220:8786
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,990 - distributed.worker - INFO - -------------------------------------------------
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,990 - distributed.worker - INFO -               Threads:                          2
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,990 - distributed.worker - INFO -                Memory:                   3.78 GiB
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,991 - distributed.worker - INFO -       Local Directory: /dask-worker-space/worker-hos3u3lc
Apr 14 22:35:53 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:53,991 - distributed.worker - INFO - -------------------------------------------------
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:54,522 - distributed.worker - WARNING - Mismatched versions found
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: +-------------+-----------------------+-----------------------+----------------------------------------------------+
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: | Package     | This Worker           | scheduler             | workers                                            |
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: +-------------+-----------------------+-----------------------+----------------------------------------------------+
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: | distributed | 2022.4.0+38.g0e937b3b | 2022.4.0+38.g0e937b3b | {'2022.4.0+33.g7dffe7d5', '2022.4.0+38.g0e937b3b'} |
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: +-------------+-----------------------+-----------------------+----------------------------------------------------+
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:54,523 - distributed.worker - INFO - Starting Worker plugin coiled-aws-env
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:54,523 - distributed.worker - INFO -         Registered to:   tls://18.215.64.220:8786
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:54,524 - distributed.worker - INFO - -------------------------------------------------
Apr 14 22:35:54 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:35:54,540 - distributed.core - INFO - Starting established connection
Apr 14 22:36:17 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:17,226 - distributed.worker - INFO - Stopping worker at tls://10.4.8.234:34059
Apr 14 22:36:17 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:17,241 - distributed.nanny - INFO - Worker closed
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,047 - distributed.worker - INFO -       Start worker at:     tls://10.4.8.234:42651
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,047 - distributed.worker - INFO -          Listening to:     tls://10.4.8.234:42651
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,047 - distributed.worker - INFO -          dashboard at:           10.4.8.234:46367
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,047 - distributed.worker - INFO - Waiting to connect to:   tls://18.215.64.220:8786
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,047 - distributed.worker - INFO - -------------------------------------------------
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,047 - distributed.worker - INFO -               Threads:                          2
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,047 - distributed.worker - INFO -                Memory:                   3.78 GiB
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,047 - distributed.worker - INFO -       Local Directory: /dask-worker-space/worker-m62zv9vy
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,049 - distributed.worker - INFO - -------------------------------------------------
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,364 - distributed.worker - WARNING - Mismatched versions found
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: +-------------+-----------------------+-----------------------+----------------------------------------------------+
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: | Package     | This Worker           | scheduler             | workers                                            |
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: +-------------+-----------------------+-----------------------+----------------------------------------------------+
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: | distributed | 2022.4.0+38.g0e937b3b | 2022.4.0+38.g0e937b3b | {'2022.4.0+33.g7dffe7d5', '2022.4.0+38.g0e937b3b'} |
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: +-------------+-----------------------+-----------------------+----------------------------------------------------+
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,364 - distributed.worker - INFO - Starting Worker plugin coiled-aws-env
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,365 - distributed.worker - INFO -         Registered to:   tls://18.215.64.220:8786
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,366 - distributed.worker - INFO - -------------------------------------------------
Apr 14 22:36:18 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:36:18,383 - distributed.core - INFO - Starting established connection
Apr 14 22:37:54 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:54,192 - distributed.worker - INFO - Stopping worker at tls://10.4.8.234:42651
Apr 14 22:37:54 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:54,215 - distributed.nanny - INFO - Worker closed
Apr 14 22:37:55 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:55,950 - distributed.worker - INFO -       Start worker at:     tls://10.4.8.234:33597
Apr 14 22:37:55 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:55,950 - distributed.worker - INFO -          Listening to:     tls://10.4.8.234:33597
Apr 14 22:37:55 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:55,950 - distributed.worker - INFO -          dashboard at:           10.4.8.234:33789
Apr 14 22:37:55 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:55,950 - distributed.worker - INFO - Waiting to connect to:   tls://18.215.64.220:8786
Apr 14 22:37:55 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:55,951 - distributed.worker - INFO - -------------------------------------------------
Apr 14 22:37:55 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:55,951 - distributed.worker - INFO -               Threads:                          2
Apr 14 22:37:55 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:55,951 - distributed.worker - INFO -                Memory:                   3.78 GiB
Apr 14 22:37:55 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:55,953 - distributed.worker - INFO -       Local Directory: /dask-worker-space/worker-ig_y0ab9
Apr 14 22:37:55 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:55,953 - distributed.worker - INFO - -------------------------------------------------
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:56,601 - distributed.worker - WARNING - Mismatched versions found
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: +-------------+-----------------------+-----------------------+----------------------------------------------------+
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: | Package     | This Worker           | scheduler             | workers                                            |
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: +-------------+-----------------------+-----------------------+----------------------------------------------------+
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: | distributed | 2022.4.0+38.g0e937b3b | 2022.4.0+38.g0e937b3b | {'2022.4.0+33.g7dffe7d5', '2022.4.0+38.g0e937b3b'} |
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: +-------------+-----------------------+-----------------------+----------------------------------------------------+
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:56,602 - distributed.worker - INFO - Starting Worker plugin coiled-aws-env
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:56,603 - distributed.worker - INFO -         Registered to:   tls://18.215.64.220:8786
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:56,604 - distributed.worker - INFO - -------------------------------------------------
Apr 14 22:37:56 ip-10-4-8-234 cloud-init[981]: 2022-04-14 22:37:56,605 - distributed.core - INFO - Starting established connection

I see a few things to inspect:

  1. Try logging way more information to explain why a worker might be closing (good to do anyway)
  2. Maybe instrument the worker to check in on its heartbeat messages to the scheduler and start to complain if they're not being listened to
  3. See if maybe the event loop is stuck on something (also with printing to logs)

None of these will find the answer, but I think they'll all increase visibility. These are my next steps. I'm also open to other suggestions.

@mrocklin (Member)

Also, two quick notes: my recent PRs don't seem to help with this.

If I use m5.large instances (my default type) this problem doesn't occur. My guess is that there is some disk thing happening here. We're running out of space and so failing hard when trying to touch disk. I'm going to aim towards that as a cause and just try to increase visibility along the way.

@mrocklin (Member)

cc @ntabris: without SSDs, Coiled instances have 10GB drives right now. Is that right?

@ntabris (Contributor) commented Apr 14, 2022

> without SSDs Coiled instances have 10GB drives right now. Is that right?

No, prod and staging are still putting 100GB boot (EBS) drives on all instances.

@mrocklin (Member)

Good to know. It doesn't seem like we could dump that much data this quickly with this workload. I did throw up a test to see what happens when disk stops cooperating. We're not graceful here: #6128

I also tried to see what happens when we fail writing, and things were actually ok.

@mrocklin (Member)

Oh, I'm being dumb. The closed workers are due to the client.restart command 🤦

@gjoseph92 (Collaborator)

Sorry, should have posted that. I was also confused until I actually read the code 😁

@mrocklin (Member)

So if I set freq="5s", effectively reducing the data by 80%, this issue doesn't arise. I continue to suspect disk.

@mrocklin (Member)

Setting worker_options={"data": dict()} (which should disable disk), I actually found that I stopped receiving heartbeats from lots of workers.

I'm going to switch away from t3s to m5.larges. I now suspect that this is just due to Amazon's t-type nodes, which are known to have variable performance. I wonder if maybe they're getting angry at us for abusing them. I'll stress out disk to compensate.

@mrocklin (Member)

Nope. Still an issue with m5's. t-type nodes and disk are both in the clear it seems. I can reliably recreate the situation way more easily now though 🙂

@mrocklin (Member)

Playing with triggering worker failures. I was able to kill workers in such a way that they somehow didn't come back.

Apr 15 14:17:49 ip-10-4-4-178 cloud-init[1012]: 2022-04-15 14:17:49,992 - distributed.worker - INFO - Starting Worker plugin coiled-aws-env
Apr 15 14:17:49 ip-10-4-4-178 cloud-init[1012]: 2022-04-15 14:17:49,992 - distributed.worker - INFO -         Registered to:    tls://34.201.27.34:8786
Apr 15 14:17:49 ip-10-4-4-178 cloud-init[1012]: 2022-04-15 14:17:49,992 - distributed.worker - INFO - -------------------------------------------------
Apr 15 14:17:50 ip-10-4-4-178 cloud-init[1012]: 2022-04-15 14:17:49,994 - distributed.core - INFO - Starting established connection
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]: 2022-04-15 14:18:29,747 - distributed.worker - ERROR - Worker stream died during communication: tls://10.4.4.203:45141
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]: Traceback (most recent call last):
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 236, in read
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:     n = await stream.read_into(chunk)
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]: tornado.iostream.StreamClosedError: Stream is closed
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]: The above exception was the direct cause of the following exception:
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]: Traceback (most recent call last):
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 3021, in gather_dep
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:     response = await get_data_from_worker(
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 4325, in get_data_from_worker
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:     return await retry_operation(_get_data, operation="get_data_from_worker")
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:     return await retry(
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:     return await coro()
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 4305, in _get_data
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:     response = await send_recv(
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py", line 709, in send_recv
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:     response = await comm.read(deserializers=deserializers)
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:     convert_stream_closed_error(self, e)
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]:     raise CommClosedError(f"in {obj}: {exc}") from exc
Apr 15 14:18:29 ip-10-4-4-178 cloud-init[1012]: distributed.comm.core.CommClosedError: in <TLS (closed) Ephemeral Worker->Worker for gather local=tls://10.4.4.178:34448 remote=tls://10.4.4.203:45141>: Stream is closed
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]: 2022-04-15 14:18:45,763 - distributed.worker - ERROR - Worker stream died during communication: tls://10.4.10.64:37185
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]: Traceback (most recent call last):
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 236, in read
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:     n = await stream.read_into(chunk)
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]: tornado.iostream.StreamClosedError: Stream is closed
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]: The above exception was the direct cause of the following exception:
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]: Traceback (most recent call last):
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 3021, in gather_dep
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:     response = await get_data_from_worker(
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 4325, in get_data_from_worker
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:     return await retry_operation(_get_data, operation="get_data_from_worker")
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:     return await retry(
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:     return await coro()
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 4305, in _get_data
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:     response = await send_recv(
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py", line 709, in send_recv
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:     response = await comm.read(deserializers=deserializers)
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:     convert_stream_closed_error(self, e)
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]:     raise CommClosedError(f"in {obj}: {exc}") from exc
Apr 15 14:18:45 ip-10-4-4-178 cloud-init[1012]: distributed.comm.core.CommClosedError: in <TLS (closed) Ephemeral Worker->Worker for gather local=tls://10.4.4.178:36410 remote=tls://10.4.10.64:37185>: Stream is closed
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]: 2022-04-15 14:18:47,912 - distributed.worker - ERROR - Worker stream died during communication: tls://10.4.5.79:34281
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]: Traceback (most recent call last):
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:     frames_nbytes = await stream.read_bytes(fmt_size)
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]: tornado.iostream.StreamClosedError: Stream is closed
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]: The above exception was the direct cause of the following exception:
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]: Traceback (most recent call last):
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 3021, in gather_dep
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:     response = await get_data_from_worker(
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 4325, in get_data_from_worker
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:     return await retry_operation(_get_data, operation="get_data_from_worker")
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/utils_comm.py", line 381, in retry_operation
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:     return await retry(
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/utils_comm.py", line 366, in retry
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:     return await coro()
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 4305, in _get_data
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:     response = await send_recv(
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py", line 709, in send_recv
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:     response = await comm.read(deserializers=deserializers)
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 242, in read
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:     convert_stream_closed_error(self, e)
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]:     raise CommClosedError(f"in {obj}: {exc}") from exc
Apr 15 14:18:48 ip-10-4-4-178 cloud-init[1012]: distributed.comm.core.CommClosedError: in <TLS (closed) Ephemeral Worker->Worker for gather local=tls://10.4.4.178:56458 remote=tls://10.4.5.79:34281>: Stream is closed
Apr 15 14:18:51 ip-10-4-4-178 cloud-init[1012]: 2022-04-15 14:18:51,740 - distributed.worker_memory - WARNING - Worker exceeded 95% memory budget. Restarting
Apr 15 14:18:51 ip-10-4-4-178 cloud-init[1012]: 2022-04-15 14:18:51,841 - distributed.nanny - INFO - Worker process 30 was killed by signal 15
Apr 15 14:24:11 ip-10-4-4-178 systemd[1]: Starting Daily apt upgrade and clean activities...
Apr 15 14:24:15 ip-10-4-4-178 dbus-daemon[461]: [system] Activating via systemd: service name='org.freedesktop.PackageKit' unit='packagekit.service' requested by ':1.17' (uid=0 pid=1456 comm="/usr/bin/gdbus call --system --dest org.freedeskto" label="unconfined")
Apr 15 14:24:15 ip-10-4-4-178 systemd[1]: Starting PackageKit Daemon...
Apr 15 14:24:15 ip-10-4-4-178 PackageKit: daemon start
Apr 15 14:24:15 ip-10-4-4-178 dbus-daemon[461]: [system] Successfully activated service 'org.freedesktop.PackageKit'
Apr 15 14:24:15 ip-10-4-4-178 systemd[1]: Started PackageKit Daemon.
Apr 15 14:24:25 ip-10-4-4-178 kernel: [  534.068725] audit: type=1400 audit(1650032665.145:34): apparmor="STATUS" operation="profile_replace" info="same as current profile, skipping" profile="unconfined" name="/usr/sbin/tcpdump" pid=2846 comm="apparmor_parser"
Apr 15 14:24:26 ip-10-4-4-178 systemd[1]: apt-daily-upgrade.service: Succeeded.
Apr 15 14:24:26 ip-10-4-4-178 systemd[1]: Finished Daily apt upgrade and clean activities.

@mrocklin (Member)

After merging in a bunch of things to make the other failures go away, I'm back at this issue. I tried running with data=dict() again (no disk) and am still able to recreate the issue, far more often than before.

I tried checking whether the event loop was stuck on something by sending in a plugin that writes out what the event loop is doing from a separate thread:

from dask.distributed import WorkerPlugin
from dask.utils import parse_timedelta
import threading
from distributed import profile
import sys

class PrintThread(WorkerPlugin):
    name = "print-event-loop-thread"

    def __init__(self, delay="10s"):
        self.delay = parse_timedelta(delay)

    async def setup(self, worker):
        self.worker = worker
        # setup() runs on the worker's event loop, so this records the event
        # loop's thread id for later inspection.
        self.thread_id = threading.get_ident()
        self.stop = threading.Event()
        self._thread = threading.Thread(target=self.go, daemon=True)
        self._thread.start()

    def go(self):
        # Runs in a separate thread: periodically dump the event loop thread's
        # current call stack to the worker logs.
        while True:
            frame = sys._current_frames()[self.thread_id]

            print("-" * 40)
            for line in profile.call_stack(frame):
                print(line, end="")

            if self.stop.wait(timeout=self.delay):
                break

    def teardown(self, worker):
        self.stop.set()
        self._thread.join()

client.register_worker_plugin(PrintThread("5s"))

And while we get interesting output in logs, we stop getting that output once the worker hangs. So the problem isn't in the worker necessarily. I would expect this to run in Python pretty much no matter what. The problem is deeper I think.

@mrocklin (Member)

On restart the frozen workers are cleared out of the scheduler (which makes sense, it cleans metadata) but they don't come back. This means that even the Nanny is borked. Something is wrong with these instances I think. Coiled folks, I might want to engage you all on this next week.

@mrocklin (Member)

Example failed cluster: https://cloud.coiled.io/mrocklin/clusters/7589/details
Example failed worker: play-worker-821298da7e / i-0d288f69dd47e247e

@mrocklin (Member)

Yup, I just changed the worker plugin to a nanny plugin and we lose track of any ongoing output. Somehow this process has frozen in such a way that the network connection isn't severed (Dask would react well in this case) but that Python itself is also totally non-responsive. This is atypical.

@mrocklin (Member)

Sorry, to clarify: not just the worker process (which might be running some crazy user code, though in this case it's just pandas) but also the nanny process, which generally doesn't do anything at all. Something has frozen the entire box somehow.

If I were a cloud provider, I might freeze VMs in this way if they were behaving badly.

@ntabris (Contributor) commented Apr 16, 2022

I connected to the serial console and it was logging something like this quickly and repeatedly:

Apr 16 16:20:52 ip-10-4-7-226 cloud-init[1004]:   File "<string>", line 1, in <module>
--
Apr 16 16:20:52 ip-10-4-7-226 cloud-init[1004]: #011  File "/opt/conda/envs/coiled/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
Apr 16 16:20:52 ip-10-4-7-226 cloud-init[1004]: #011exitcode = _main(fd, parent_sentinel)
Apr 16 16:20:52 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/multiprocessing/spawn.py", line 129, in _main
Apr 16 16:20:52 ip-10-4-7-226 cloud-init[1004]: #011return self._bootstrap(parent_sentinel)
Apr 16 16:20:52 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
Apr 16 16:20:52 ip-10-4-7-226 cloud-init[1004]: #011self.run()
Apr 16 16:20:52 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/multiprocessing/process.py", line 108, in run
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]: #011self._target(*self._args, **self._kwargs)
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/process.py", line 175, in _run
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]: #011target(*args, **kwargs)
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/nanny.py", line 907, in _run
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]: #011loop.run_sync(run)
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/tornado/ioloop.py", line 524, in run_sync
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]: #011self.start()
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 199, in start
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]: #011self.asyncio_loop.run_forever()
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/asyncio/base_events.py", line 601, in run_forever
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]: #011self._run_once()
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/asyncio/base_events.py", line 1869, in _run_once
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]: #011event_list = self._selector.select(timeout)
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/selectors.py", line 469, in select
Apr 16 16:20:53 ip-10-4-7-226 cloud-init[1004]: #011fd_event_list = self._selector.poll(timeout, max_ev)

After a while dask did stop with the message below, and the VM started the shutdown process after that:

Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]:   File "<string>", line 1, in <module>
--
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]: #011  File "/opt/conda/envs/coiled/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]: #011exitcode = _main(fd, parent_sentinel)
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/multiprocessing/spawn.py", line 129, in _main
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]: #011return self._bootstrap(parent_sentinel)
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]: #011self.run()
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/multiprocessing/process.py", line 108, in run
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]: #011self._target(*self._args, **self._kwargs)
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/process.py", line 175, in _run
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]: #011target(*args, **kwargs)
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/nanny.py", line 907, in _run
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]: #011loop.run_sync(run)
Apr 16 16:21:10 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/tornado/ioloop.py", line 524, in run_sync
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]: #011self.start()
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 199, in start
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]: #011self.asyncio_loop.run_forever()
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/asyncio/base_events.py", line 601, in run_forever
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]: #011self._run_once()
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/asyncio/base_events.py", line 1869, in _run_once
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]: #011event_list = self._selector.select(timeout)
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]:   File "/opt/conda/envs/coiled/lib/python3.9/selectors.py", line 469, in select
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]: #011fd_event_list = self._selector.poll(timeout, max_ev)
Apr 16 16:21:11 ip-10-4-7-226 cloud-init[1004]: 2022-04-16 16:21:06,237 - distributed.nanny - INFO - Closing Nanny at 'tls://10.4.7.226:36233'. Report closure to scheduler: None

@mrocklin (Member)

Yeah, so the repeated thing is the plugin. I'm asking it to report what the main event loop thread is doing every 10s. My experience with the downloaded logs is that once the worker seems to stall, those logs stop coming. Are you saying that it is still consistently printing every ten seconds? Or is there possibly a gap of a few minutes? If it's consistently printing every ten seconds, then what is happening that they're no longer showing up in the downloaded logs?

@mrocklin (Member)

I don't ask because I care about the downloadable logs (although I do); I ask because something is happening at that time that is causing the whole worker to appear to freeze.

@gjoseph92 (Collaborator)

Confirmed. I've run the reproducer on #6174 and workers aren't locking up anymore. So that's clearly the problem, but I'm not sure yet what the best solution is.

Blocking the event loop like so to prevent more computations or fetches from running doesn't seem like the best long-term strategy though? Especially once disk access becomes async, this won't work. Related:

Maybe we'd want to temporarily pause the worker (once pausing includes pausing gather_deps) until all the evictions have processed?

Also, though distributed can clearly be better about preventing workers from getting into this high-memory state (where the OS is probably flailing so much that nothing can run?), it's always possible that user code could do this on its own. So we should be more robust to it. That includes #6148 and #6159 (comment), but also looking into why the nanny doesn't/can't kill the worker in this situation.

Perhaps Nannies should use ulimits on the child processes, instead of relying on the application-level periodic callback for termination?
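
(A minimal sketch of the ulimit idea, assuming nothing beyond the standard library and the WorkerPlugin hook; this is not an existing distributed feature. RLIMIT_AS caps virtual address space, which is only a rough proxy for resident memory, and the 4 GiB default is an arbitrary placeholder.)

import resource
from dask.distributed import WorkerPlugin

class HardMemoryCap(WorkerPlugin):
    # Once the cap is hit, allocations fail with MemoryError inside the worker
    # instead of pushing the whole machine into the thrashing regime.
    name = "hard-memory-cap"

    def __init__(self, limit_bytes=4 * 2**30):  # placeholder: 4 GiB
        self.limit_bytes = limit_bytes

    def setup(self, worker):
        # setup() runs inside the worker process, so the limit applies to it.
        resource.setrlimit(resource.RLIMIT_AS, (self.limit_bytes, self.limit_bytes))

client.register_worker_plugin(HardMemoryCap())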

@mrocklin (Member) commented Apr 22, 2022 via email

@mrocklin (Member)

I would be curious how this would react if we weren't writing to EBS volumes, but were instead writing to SSDs.

@ntabris can you provide an example coiled command to get a machine with SSDs? Then maybe @gjoseph92 could you try with those? Do you have any reason to think that they might behave differently? I'm curious why this comes up on Coiled but not as often on other systems.

@ntabris (Contributor) commented Apr 22, 2022

Something like ClusterBeta(n_workers=2, worker_vm_types=["i3.large"]) (or i3.xlarge) should work. Those each have a large NVMe drive that will be used for dask temp storage. i3.large is 2 vCPU, 15.25 GiB memory; i3.xlarge is 4 vCPU, 30.5 GiB.

@mrocklin (Member) commented Apr 22, 2022 via email

@gjoseph92 (Collaborator)

Thanks! (Note that testing this will require tweaking the size of the dataframes to ensure data is still being spilled; don't assume that NVMe resolves the problem just because the script works on an i3.large. As @jrbourbeau found, getting this OOM hang to occur is pretty dependent on machine and data size.)

@gjoseph92 (Collaborator)

@fjetter for posterity & future searches, could you rename this to "Computation deadlocks due to worker rapidly running out of memory instead of spilling" or something like that, now that we've found a root cause?

Also, to summarize:

@mrocklin changed the title from "Computation deadlocks" to "Computation deadlocks due to worker rapidly running out of memory instead of spilling" on Apr 25, 2022
@mrocklin (Member)

> @fjetter for posterity & future searches, could you rename this to "Computation deadlocks due to worker rapidly running out of memory instead of spilling" or something like that, now that we've found a root cause?

Done

@gjoseph92 (Collaborator) commented Apr 25, 2022

An update here that others might find interesting: the lock-up is likely caused by swapping, but not in the way you'd think. It points to #6177 being a very worthwhile thing to do.

#4345 (comment) got me curious: yeah, why isn't the kernel's OOMKiller getting involved here and just killing the Python process that's using too much memory?

I spun up a t3.medium node with Coiled (what we were using here) and SSHed in. First interesting thing: free reports 0B of swap available. So the heaps of our Python processes aren't actually going to get swapped out.

Let's see what happens when we use up some memory (based on https://unix.stackexchange.com/a/254976). free is reporting we have 3.2Gi available between free RAM and purgeable cache pages:

ubuntu@ip-10-0-9-34:~$ cat <( </dev/zero head -c 3GiB) <(sleep 10) | tail
# 3GiB goes fine, unsurprisingly.

ubuntu@ip-10-0-9-34:~$ cat <( </dev/zero head -c 4GiB) <(sleep 10) | tail
Killed
# 4GiB is more than the total physically available space. So this also goes well.

ubuntu@ip-10-0-9-34:~$ cat <( </dev/zero head -c 3200MiB) <(sleep 10) | tail
# This has now been unresponsive for 12min!
# My SSH connection is still up, but tmux panes are unresponsive.
# Clearly this is very locked up, because the 10-sec timeout isn't even working

It turns out the OOMKiller is very conservative and doesn't kill things even when system performance is extremely degraded, so long as there are any pages it can reclaim. And even without a swap partition, the kernel can and will still swap pages to disk if they were on disk to begin with—such as executable code. (See https://askubuntu.com/a/432827 for an explanation.) Really, this just means the kernel is willing to remove almost everything from the disk cache, so executing new instructions in the Python interpreter or any other executables means constantly reloading pages from disk. It's all still running... just really... really... slowly.

Basically, getting into the "almost all physical memory is used, but not quite all of it" terrain leads to very bad behavior. People complain/rant about this a bit: https://unix.stackexchange.com/questions/373312/oom-killer-doesnt-work-properly-leads-to-a-frozen-os, https://serverfault.com/questions/390623/how-do-i-prevent-linux-from-freezing-when-out-of-memory, etc. There doesn't seem to be a good answer for how to avoid this in linux generically.

But we don't have to worry about the generic case. For dask, we can reasonably expect that the worker process tree is the only thing allocating significant amounts of memory. If we just set hard memory limits at the OS level (#6177) and don't let dask go above 95% of physically available memory (our current default), we should be able to terminate before we get into this thrashing scenario.

And if we don't set hard limits at the OS level (we currently don't), we should have no reason to believe that we'd be able to successfully prevent or recover from this via our application-level memory monitor. Prevention is the only option, because once we get into this state, no userland processes can run effectively.
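
(For reference, a sketch rather than part of the original comment: the application-level fractions mentioned above are ordinary Dask config values, shown here with their documented defaults. All of them are enforced in userland by the worker and nanny, which is why they cannot help once the box is already thrashing.)

import dask

dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling to disk
    "distributed.worker.memory.spill": 0.70,      # spill more aggressively
    "distributed.worker.memory.pause": 0.80,      # stop running new tasks
    "distributed.worker.memory.terminate": 0.95,  # nanny terminates the worker
})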

@mrocklin (Member)

Whoa, nice find. The cat-sleep example is a super-clear demonstration.

I'm open to thinking about cgroups at the Dask level. If that doesn't work for some reason, we could probably also consider handling this on the Coiled side. A couple of thoughts:

  1. @ntabris would it make sense to direct the OS to treat EBS as swap? (or is this a bad idea / too painful)
  2. We could run the dask-spec process with cgroup limits as well (might as well do this regardless?)
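
(A rough sketch of point 2, not an existing Dask or Coiled feature: placing an already-running worker process under a cgroup v2 memory cap, assuming a unified cgroup hierarchy mounted at /sys/fs/cgroup and sufficient privileges to write there.)

from pathlib import Path

def cap_memory_cgroup(pid, limit_bytes, name="dask-worker-cap"):
    cg = Path("/sys/fs/cgroup") / name
    cg.mkdir(exist_ok=True)
    # Hard ceiling: the kernel OOM-kills the group instead of letting the whole
    # box grind to a halt reclaiming pages.
    (cg / "memory.max").write_text(str(limit_bytes))
    # Disallow swap for the group so the cap is a real ceiling, not a slow one.
    (cg / "memory.swap.max").write_text("0")
    # Move the worker (and any future children) into the group.
    (cg / "cgroup.procs").write_text(str(pid))

# cap_memory_cgroup(worker_pid, 4 * 2**30)  # hypothetical pid, 4 GiB cap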

@gjoseph92 (Collaborator)

> We could run the dask-spec process with cgroup limits as well (might as well do this regardless?)

Coiled actually used to do this (via Docker), but it was removed a while ago so that if you got heterogeneous node sizes, you'd fully use the memory on all of them (why this had to be removed is a bit of a Coiled implementation detail though).

But I think Dask should do this. We already offer a memory limit parameter; this would just be an implementation fix to make it work correctly.
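
(A small illustration, not from the original comment: the existing per-worker parameter. Today it is enforced by the nanny's periodic memory check and the spill/pause/terminate fractions rather than by the operating system, which is the gap being discussed; the 4 GiB figure is arbitrary.)

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, memory_limit="4GiB")  # limit applies per worker
client = Client(cluster)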

@ntabris (Contributor) commented Apr 26, 2022

> would it make sense to direct the OS to treat EBS as swap? (or is this a bad idea / too painful)

I haven't tried this, but it sounds like it's not recommended. In my mind, one big reason to be in the cloud is that it's really easy to scale up if you (say) need more memory. Is there a reason someone would prefer slow disk swap rather than an instance with more memory? (Maybe if they only needed swap space very occasionally? Or didn't realize they'd need more memory but don't want their workers to die?)

@fjetter (Member, Author) commented Apr 26, 2022

Great find, @gjoseph92.

Just to summarize what I understand: we saw this issue appear in the 2022.4.? release but not in 2022.1.1, we bisected, and we found #6189. The working theory is that #6189 indeed allows us to spill data faster, which helps us avoid the swap scenario.

@crusaderky (Collaborator)

I ran the stress test.
Main git tip deadlocks in less than 5 minutes.
#6189 + #6195 have been running for 30 minutes and seem to be stable.

As a side note, that's a lot of unmanaged memory. Screenshot from #6189 + #6195:

(screenshot omitted: Screenshot from 2022-04-27 11-11-01)
Do we understand what it is? Is it because of the shuffle?

@gjoseph92 (Collaborator)

> that's a lot of unmanaged memory

Maybe #5971? I'd be curious what it looks like setting the env var manually on the cluster (easy with Coiled). I'd be unsurprised to hear it's something else though.

@avriiil (Contributor) commented Jun 13, 2022

I'm running into a similar issue here where a dask-geopandas computation is 'deadlocking' without error messages. A worker is running out of memory and the deadlocking task is getting shipped around workers. More info here: geopandas/dask-geopandas#197

@mrocklin (Member) commented Oct 11, 2022 via email
