KeyError: ('fetch', 'memory') in Dask 2021.7.2 (execution sometimes hangs) #5152

Closed
bsesar opened this issue Aug 2, 2021 · 7 comments · Fixed by #5157

Comments

bsesar commented Aug 2, 2021

What happened: When running the code below, Dask reports the following error:

distributed.utils - ERROR - ('fetch', 'memory')
Traceback (most recent call last):
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\utils.py", line 638, in log_errors
    yield
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\worker.py", line 2435, in gather_dep
    self.transition(ts, "memory", value=data[d])
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\worker.py", line 1716, in transition
    func = self._transitions[start, finish]
KeyError: ('fetch', 'memory')
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x0000024984112040>>, <Task finished name='Task-24082' coro=<Worker.gather_dep() done, defined at C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\worker.py:2291> exception=KeyError(('fetch', 'memory'))>)
Traceback (most recent call last):
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
    future.result()
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\worker.py", line 2435, in gather_dep
    self.transition(ts, "memory", value=data[d])
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\worker.py", line 1716, in transition
    func = self._transitions[start, finish]
KeyError: ('fetch', 'memory')

Sometimes the execution hangs with one task left, and sometimes it finishes. Changing the number of workers seems to affect the outcome (i.e., whether the code finishes or hangs).
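
For reference, here is a minimal sketch (not part of the original run; the specific values are arbitrary) of how the worker count can be varied when creating the local client:

from dask.distributed import Client

# n_workers, threads_per_worker and memory_limit are standard Client/LocalCluster
# keywords; varying them helps check whether the hang depends on the worker count.
client = Client(n_workers=4, threads_per_worker=2, memory_limit="4GB")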

In addition to the above error, I also get the following errors:

Traceback (most recent call last):
  File "C:\Users\--\Anaconda3\envs\work2\lib\asyncio\tasks.py", line 465, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\comm\core.py", line 322, in connect
    await asyncio.wait_for(comm.write(local_info), time_left())
  File "C:\Users\--\Anaconda3\envs\work2\lib\asyncio\tasks.py", line 467, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\worker.py", line 2341, in gather_dep
    response = await get_data_from_worker(
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\worker.py", line 3674, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\utils_comm.py", line 385, in retry_operation
    return await retry(
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\utils_comm.py", line 370, in retry
    return await coro()
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\worker.py", line 3651, in _get_data
    comm = await rpc.connect(worker)
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\core.py", line 1051, in connect
    raise exc
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\core.py", line 1035, in connect
    comm = await fut
  File "C:\Users\--\Anaconda3\envs\work2\lib\site-packages\distributed\comm\core.py", line 326, in connect
    raise OSError(
OSError: Timed out during handshake while connecting to tcp://127.0.0.1:49177 after 30 s
distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:49177
Traceback (most recent call last):
  File "C:\Users\--\Anaconda3\envs\work2\lib\asyncio\tasks.py", line 465, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

distributed.utils_perf - WARNING - full garbage collections took 23% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 23% CPU time recently (threshold: 10%)
distributed.worker - WARNING - gc.collect() took 9.391s. This is usually a sign that some tasks handle too many Python objects at the same time. Rechunking the work into smaller tasks might help.
distributed.utils_perf - WARNING - full garbage collections took 20% CPU time recently (threshold: 10%)

EDIT: Added some additional errors that happen when running the MCVE code on fake data.

What you expected to happen: I did not expect any errors to be reported.

Minimal Complete Verifiable Example:

from dask.distributed import Client
import dask.dataframe as dd
import numpy as np
import pandas as pd

Nrows = 195626892

# create fake data: ~196 million rows with a random Date, a random float Value,
# and a random unsigned-integer index
df = pd.DataFrame(
    {'Date': np.random.choice(pd.date_range('2020-01-01', '2021-07-16'), Nrows),
     'Value': 100 * np.random.uniform(size=Nrows).astype('f8')},
    index=np.random.randint(1000128, 236822346, size=Nrows).astype('u8')
)
df.index.name = 'Index'

client = Client()

# run Dask: count values per (Index, Date) group, splitting the result into 8 output partitions
ddf = dd.from_pandas(df, npartitions=53)
ddf = ddf.groupby(['Index', 'Date'])['Value'].count(split_out=8)
ddf = ddf.persist()

EDIT: Added MCVE code that will hopefully reproduce the problem.

Anything else we need to know?: In #4721, a user reported that after upgrading to 2021.07.01 they started seeing the same error.

Environment:

  • Dask version: 2021.7.2
  • Python version: 3.8.10
  • Operating System: Windows Server 2016 64-bit
  • Install method (conda, pip, source): conda

bsesar commented Aug 2, 2021

@gforsyth and @fjetter, is the above error indicative of a missing transition?


bsesar commented Aug 2, 2021

Once the MCVE code finishes, the Dask dashboard reports 13 GB of "unmanaged old" memory used by the cluster. Does this mean there is a memory leak somewhere?
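
One hedged way to check (a sketch of mine, assuming the client from the example above is still connected): if the "unmanaged old" memory is just uncollected Python objects rather than a true leak, forcing a garbage collection on every worker should shrink it.

import gc

# Client.run executes the given function on each worker and returns a dict of
# results keyed by worker address; here it simply triggers a full collection.
client.run(gc.collect)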


fjetter commented Aug 2, 2021

Minimal Complete Verifiable Example:

This example does not reproduce an error on my machine. Truth be told, it barely works since my machine is a bit too small for it. I am spending more time spilling than doing anything else.
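
(As an aside, and only a sketch under the assumption of a local cluster: if spilling dominates, the worker memory thresholds can be raised via dask.config before the workers are started; the keys below exist in distributed's default configuration, but the values are arbitrary.)

import dask

# Fractions of the worker memory_limit at which spilling kicks in; set these
# before creating the Client so the workers pick them up.
dask.config.set({
    "distributed.worker.memory.target": 0.80,  # start spilling managed data to disk
    "distributed.worker.memory.spill": 0.90,   # spill based on process memory usage
})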

is the above error indicative of a missing transition?

Yes and no. The KeyError is raised because this transition is not implemented. However, the problem is not the missing implementation but rather that this transition should not be allowed to occur in the first place. It indicates an inconsistency in the worker state machine where a worker is trying to do two things simultaneously for a given task, e.g. compute it and fetch it. Long story short, this inconsistency can cause such a transition and raise this exception. While not pretty, it should be harmless.
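
Purely as an illustration (this is a toy sketch, not distributed's actual code): the worker keeps a mapping of allowed transitions keyed by (start, finish), so looking up a pair that was never registered, such as ("fetch", "memory"), surfaces exactly as the KeyError in the traceback above.

# toy transition table, not the real one in distributed/worker.py
transitions = {
    ("fetch", "flight"): "gather the key from a peer worker",
    ("flight", "memory"): "store the gathered result locally",
    ("waiting", "ready"): "dependencies done, queue for execution",
}

start, finish = "fetch", "memory"
try:
    action = transitions[start, finish]
except KeyError as exc:
    print("unhandled transition:", exc)  # unhandled transition: ('fetch', 'memory')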

The title of your issue says "execution sometimes hangs", which is likely a separate problem.

Does this mean there is a memory leak somewhere?

It might be the case that some workers never release their tasks properly. There is likely not a "true" memory leak, though.
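
A hedged way to see whether that is happening (my sketch; client is the client from the example above) is to ask each worker how many keys it is still holding:

# Client.run injects the worker instance when the function takes a
# `dask_worker` keyword argument; Worker.data is its local key -> value store.
def held_keys(dask_worker):
    return len(dask_worker.data)

client.run(held_keys)  # -> {worker_address: number_of_keys_held, ...}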


fjetter commented Aug 2, 2021

FWIW, I was able to find an issue that can cause a deadlock associated with this exception. I'm working on a patch.

pentschev commented

FYI, I've also been seeing this issue intermittently in CuPy-backed workflows, so I'll make sure to test #5157.

pentschev commented

So far I haven't seen this issue anymore after #5157; it seems to have been resolved in my case. I'll report back should I experience it again. Thanks @fjetter for the quick fix!


DahnJ commented Oct 7, 2021

I have experienced this as well, tested on 2021.07.01 and 2021.09.01. My pipeline also sometimes hangs; I'm not sure if that is related.

I am not planning to share a reproducible example, as my workflow is too complex to narrow the problem down easily, but I thought I'd still share the fact that I seem to get this error even after updating.

Error

distributed.utils - ERROR - ('fetch', 'memory')
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.9/site-packages/distributed/utils.py", line 638, in log_errors
    yield
  File "/root/miniconda3/lib/python3.9/site-packages/distributed/worker.py", line 2411, in gather_dep
    self.transition(ts, "memory", value=data[d])
  File "/root/miniconda3/lib/python3.9/site-packages/distributed/worker.py", line 1692, in transition
    func = self._transitions[start, finish]
KeyError: ('fetch', 'memory')
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f60aa93dcd0>>, <Task finished name='Task-1600' coro=<Worker.gather_dep() done, defined at /root/miniconda3/lib/python3.9/site-packages/distributed/worker.py:2267> exception=KeyError(('fetch', 'memory'))>)
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/root/miniconda3/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/root/miniconda3/lib/python3.9/site-packages/distributed/worker.py", line 2411, in gather_dep
    self.transition(ts, "memory", value=data[d])
  File "/root/miniconda3/lib/python3.9/site-packages/distributed/worker.py", line 1692, in transition
    func = self._transitions[start, finish]
KeyError: ('fetch', 'memory')
