
KeyError: ('ready', 'memory') #4721

Closed
mdering opened this issue Apr 20, 2021 · 8 comments

Comments

mdering commented Apr 20, 2021

What happened:

ERROR - 2021-04-20 08:43:37,220 - distributed.worker.execute.l2800 - ('ready', 'memory')
Traceback (most recent call last):
  File "\miniconda3\lib\site-packages\distributed\worker.py", line 2765, in execute
    self.transition(ts, "memory", value=value)
  File "\miniconda3\lib\site-packages\distributed\worker.py", line 1584, in transition
    func = self._transitions[start, finish]
KeyError: ('ready', 'memory')

We see this KeyError occasionally during execution of our application. I checked through the worker code and there's already a function defined for this transition; it's just not in the _transitions table. Is this an oversight?

def transition_ready_memory(self, ts, value=None):
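
(For context: the dispatch pattern visible in the traceback looks transitions up by a (start, finish) tuple, so any pair missing from the table raises exactly this KeyError. Below is a minimal, purely illustrative sketch; the class and method bodies are stand-ins, not distributed's actual worker internals.)

# Illustrative sketch only; names mirror the traceback, not the real code.

class ToyTaskState:
    def __init__(self, key, state="ready"):
        self.key = key
        self.state = state

class ToyWorker:
    def __init__(self):
        # Only registered (start, finish) pairs can be dispatched.
        self._transitions = {
            ("ready", "executing"): self.transition_ready_executing,
        }

    def transition(self, ts, finish, **kwargs):
        func = self._transitions[ts.state, finish]  # KeyError if pair missing
        func(ts, **kwargs)
        ts.state = finish

    def transition_ready_executing(self, ts):
        pass

    # Defined but never registered in _transitions above -- analogous to
    # transition_ready_memory not being in the worker's table.
    def transition_ready_memory(self, ts, value=None):
        pass

try:
    ToyWorker().transition(ToyTaskState("x"), "memory", value=42)
except KeyError as e:
    print("KeyError:", e)  # -> KeyError: ('ready', 'memory')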

What you expected to happen:
I would expect this to not happen

Minimal Complete Verifiable Example:

I don't have one; we just see it sometimes. I'm opening this issue to find out whether this is an oversight or not.

Anything else we need to know?:

Environment:

  • Dask version: 2021.04.0
  • Python version: 3.7
  • Operating System: Windows
  • Install method (conda, pip, source): conda

fjetter (Member) commented Apr 20, 2021

Thanks for the report. We've seen issues around this lately; there is a bigger issue open at #4413.

For this particular instance, I believe it is indeed an oversight. You don't happen to have a minimal example that reproduces this error, do you?

gforsyth (Contributor) commented:

Looks like I missed adding it to the _transitions table -- thanks for flagging it, @mdering.

mdering commented Apr 20, 2021

I'm sorry, I don't. We have a largish application and we just see this in the logs sometimes; the error message doesn't even tell me which future it is, so I can't deduce anything about it from that either. I appreciate the quick responses!

gforsyth added a commit to gforsyth/distributed that referenced this issue Apr 21, 2021
This should remove the error message in dask#4721 but is more of a band-aid
than a fix.  Ready->memory transitions shouldn't happen, but since they
do occasionally crop up, we might as well dispatch them appropriately
until the worker state machine is ready.
fjetter pushed a commit that referenced this issue Apr 22, 2021
* Add "ready->memory" to transitions in worker

This should remove the error message in #4721 but is more of a band-aid
than a fix.  Ready->memory transitions shouldn't happen, but since they
do occasionally crop up, we might as well dispatch them appropriately
until the worker state machine is ready.

* Default to `no_value`

* Update distributed/worker.py

Co-authored-by: James Bourbeau <[email protected]>

Co-authored-by: James Bourbeau <[email protected]>
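
(In terms of a toy sketch, the band-aid described in the commit message amounts to registering the already-existing handler for the ("ready", "memory") pair, and the "Default to `no_value`" follow-up suggests a sentinel default so that a result that is genuinely None is not confused with "no value supplied". Illustrative only; not the literal patch.)

# Illustrative sketch of the band-aid, not the literal patch.
no_value = object()  # hypothetical sentinel standing in for distributed's own

class ToyWorker:
    def __init__(self):
        self.data = {}
        self._transitions = {
            # band-aid: dispatch the occasional ready->memory transition
            ("ready", "memory"): self.transition_ready_memory,
        }

    def transition(self, key, start, finish, **kwargs):
        func = self._transitions[start, finish]  # no longer a KeyError
        func(key, **kwargs)

    def transition_ready_memory(self, key, value=no_value):
        if value is not no_value:
            self.data[key] = value  # a result of None is still stored

w = ToyWorker()
w.transition("x", "ready", "memory", value=None)
print(w.data)  # -> {'x': None}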

mdering commented May 6, 2021

Hi everyone, I was finally able to reproduce this. Our experience with dask had led us to notice that some workers would jam up inexplicably, which would prevent our application from proceeding. These workers would be given tasks, but the tasks would never start or complete. That led me to use a method based on calling reschedule on the scheduler whenever a key took too long to progress. This has mostly worked, though the workers eventually end up in strange states. I may raise a separate issue about that based on the feedback I get here.
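
(Roughly, that workaround looks like the sketch below -- the helper name and the 60-second threshold are arbitrary, but reschedule() on the scheduler is the same call used in the example that follows.)

# Rough sketch of the "reschedule keys that take too long" workaround.
# Helper names and the 60-second default are arbitrary, not from our app.
import time

def reschedule_stuck(client, futures, timeout=60):
    deadline = time.time() + timeout
    while time.time() < deadline:
        if all(f.status == "finished" for f in futures):
            return
        time.sleep(1)

    stuck = [f.key for f in futures if f.status != "finished"]

    def _reschedule(dask_scheduler=None, keys=None):
        for k in keys:
            try:
                dask_scheduler.reschedule(key=k)
            except Exception:
                import traceback
                traceback.print_exc()

    if stuck:
        client.run_on_scheduler(_reschedule, keys=stuck)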

Anyway, on to the example:

import distributed
import time
client = distributed.Client("127.0.0.1:8786") # i have a scheduler with 4 workers here locally
def square(x):
    return x ** 2
def neg(x):
    return -x
def test_wait(x):
    for i in range(20):
        time.sleep(1)
    return x
def reschedule_keys(dask_scheduler=None, keys=None):
    for k in keys:
        try:
            dask_scheduler.reschedule(key=k)
        except Exception:
            import traceback
            traceback.print_exc()
for i in range(20):
    A = client.map(test_wait, range(10))
    B = client.map(square, A)
    C = client.map(neg, B)
    time.sleep(2)
    client.run_on_scheduler(reschedule_keys, keys=[a.key for a in A])
    processing = client.processing()
    stacks = client.call_stack()
    for w in processing.keys():
        if not set(stacks[w]).issubset(set(processing[w])):
            print(stacks[w].keys(), processing[w])

This gives me two sets of errors in the worker logs, seen below.

Traceback (most recent call last):
  File "/anaconda3/envs/dra/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/anaconda3/envs/dra/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/anaconda3/envs/dra/lib/python3.7/site-packages/distributed/worker.py", line 957, in heartbeat
    for key in self.active_threads.values()
  File "/anaconda3/envs/dra/lib/python3.7/site-packages/distributed/worker.py", line 958, in <dictcomp>
    if key in self.tasks
TypeError: unsupported operand type(s) for -: 'float' and 'NoneType'
ERROR - 2021-05-06 10:15:48 - distributed.worker.execute.l2800 - ('ready', 'memory')
Traceback (most recent call last):
  File "//anaconda3/envs/dra/lib/python3.7/site-packages/distributed/worker.py", line 2765, in execute
    self.transition(ts, "memory", value=value)
  File "/anaconda3/envs/dra/lib/python3.7/site-packages/distributed/worker.py", line 1584, in transition
    func = self._transitions[start, finish]
KeyError: ('ready', 'memory')

I know this use of reschedule is probably off-label, but I wanted to follow up with a working example so you can take a better look. This script will also show whenever a worker's processing queue and call stack get out of sync, which I believe is the root cause of the behavior where workers accept new tasks but no longer process them. Please let me know if you think that other issue (workers keep accepting tasks but never run them) is worth opening as well.

mdering commented May 7, 2021

Hi @fjetter, I just wanted to make sure you saw this.

fjetter (Member) commented May 7, 2021

Thanks for the example. Unfortunately, it does not reproduce the issue on my machine. That's very likely a matter of speed, since these problems are often extremely timing-sensitive race conditions. That's also why we haven't reliably caught this regression in CI so far.

mdering commented May 7, 2021

Maybe try upping the number of cycles and the size of A, and decreasing the sleep time before the reschedule?

import distributed
import time
client = distributed.Client("127.0.0.1:8786") # i have a scheduler with 4 workers here locally
def square(x):
    return x ** 2
def neg(x):
    return -x
def test_wait(x):
    for i in range(20):
        time.sleep(1)
    return x
def reschedule_keys(dask_scheduler=None, keys=None):
    for k in keys:
        try:
            dask_scheduler.reschedule(key=k)
        except Exception:
            import traceback
            traceback.print_exc()
for i in range(50):
    A = client.map(test_wait, range(100))
    B = client.map(square, A)
    C = client.map(neg, B)
    time.sleep(1)
    client.run_on_scheduler(reschedule_keys, keys=[a.key for a in A])
    processing = client.processing()
    stacks = client.call_stack()
    for w in processing.keys():
        if not set(stacks[w]).issubset(set(processing[w])):
            print(stacks[w].keys(), processing[w])

adonig commented Jul 24, 2021

I'm not sure whether this is related, but after upgrading to 2021.07.01 I observed this error:

distributed.utils - ERROR - ('fetch', 'memory')
Traceback (most recent call last):
  File "venv/lib/python3.9/site-packages/distributed/utils.py", line 638, in log_errors
    yield
  File "venv/lib/python3.9/site-packages/distributed/worker.py", line 2411, in gather_dep
    self.transition(ts, "memory", value=data[d])
  File "venv/lib/python3.9/site-packages/distributed/worker.py", line 1692, in transition
    func = self._transitions[start, finish]
KeyError: ('fetch', 'memory')
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f26cbb88910>>, <Task finished name='Task-642' coro=<Worker.gather_dep() done, defined at venv/lib/python3.9/site-packages/distributed/worker.py:2267> exception=KeyError(('fetch', 'memory'))>)
Traceback (most recent call last):
  File "venv/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "venv/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "venv/lib/python3.9/site-packages/distributed/worker.py", line 2411, in gather_dep
    self.transition(ts, "memory", value=data[d])
  File "venv/lib/python3.9/site-packages/distributed/worker.py", line 1692, in transition
    func = self._transitions[start, finish]
KeyError: ('fetch', 'memory')
