-
-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add back Worker.transition_fetch_missing #6112
Conversation
@fjetter I'd like your review on this if possible. |
end="2000-01-10", | ||
) | ||
s = df.shuffle("id", shuffle="tasks") | ||
await c.compute(s.size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kudos to @gjoseph92 and @nils-braun for the test
|
||
@pytest.mark.slow | ||
@gen_cluster(client=True, Worker=BreakingWorker) | ||
async def test_broken_comm(c, s, a, b): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am somewhat ok with this test, since it does reliably trigger the behavior. But I think @fjetter was hoping to see a more minimized case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with that desire. I encourage folks to work on that. I think that this suffices.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not reliably trigger the condition for me. I do hit it but it is not deterministic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can increase the data volume and it will become more and more likely. I don't have a deterministic test. I think that it would be good to have. I think that this suffices though.
@@ -1929,6 +1930,14 @@ def transition_flight_missing( | |||
ts.done = False | |||
return {}, [] | |||
|
|||
def transition_fetch_missing( | |||
self, ts: TaskState, *, stimulus_id: str | |||
) -> RecsInstrs: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to see some assertions about ts
in here. Likely at least
assert not ts.done # ??? really not sure about this. I find `done` confusingly named.
if self.validate:
assert ts.state == "fetch"
assert not ts.who_has
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is in validate_task_fetch
already, which gets called at the end of every transitions call. I think that we're safe here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean validate_task_missing
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both validation methods assert the correct (I think) who_has state
def validate_task_fetch(self, ts):
assert ts.key not in self.data
assert self.address not in ts.who_has
assert not ts.done
assert ts in self.data_needed
assert ts.who_has
for w in ts.who_has:
assert ts.key in self.has_what[w]
assert ts in self.pending_data_per_worker[w]
def validate_task_missing(self, ts):
assert ts.key not in self.data
assert not ts.who_has
assert not ts.done
assert not any(ts.key in has_what for has_what in self.has_what.values())
assert ts in self._missing_dep_flight
So after this transition is called, the validate_task_missing method will be called, and verify that not ts.who_has
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hrm, it looks like we are asserting the incoming state by default though.
This seems a little odd to me given how few lines of code there are in between getting the state and choosing this method (below for convenience)
start = ts.state
func = self._transitions_table.get((start, cast(str, finish)))
I'd be happy to add it for now as convention is you like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like it's inconsistently done. I'm going to pass on this one unless you feel strongly about it. If you do speak up and I'll add it. I'm probably hesitating here just because it feels weird to keep doing something just because we've been doing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I strongly suggest not to rely on these validate calls. They are helpful but do not replace testing. They raise an exception. The exception is lost in tornado and the only thing we see is an error log. Sometimes that causes the tests to get stuck but it's not reliable.
I haven't seen these problems recently but it's been a big problem a few months ago
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I strongly suggest not to rely on these validate calls. They are helpful but do not replace testing.
We should talk about this in more depth if you want to rely less on them. Validation testing has, historically, been invaluable in maintaining correctness and stability.
This is still testing. The differnce now is that the tests themselves trigger certain behaviors, and assertions are checked in a more systematic way. It is, I think, a better way of verifying state than explicitly checking state in every test. This would be, I think, an inefficient way of writing our tests.
The exception is lost in tornado and the only thing we see is an error log. Sometimes that causes the tests to get stuck but it's not reliable
If tests are passing even when these are not then that's certainly an issue and we should address it quickly. You might not be saying this though. If these aren't as ergonomic as we'd like then let's see if we can make them more ergonomic.
Alternatively, if we have a good alternative for the validation methods then I'm happy to engage. I would be -1 to putting explicit state testing at this into all of the tests though. I'm curious to learn what other alternatives there might be. Could I ask you to raise an issue with thoughts and we can move the conversation there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, they are useful and I don't want to get rid of it. I'm just saying that we cannot blindly rely on them at this point in time. The way our concurrency model with tornado works is that AssertionError
are just lost and logged. Sometimes that causes a worker to deadlock which then is a good thing because the test times and fails. However, it depends on where this assert is exactly called and relying on this "implicit deadlock" is not great.
To counter this, I proposed #4735 a while ago which proposes to log an exception and close the worker if a transition error occurs. I believe this would be a drastic behaviour but still a sane one, even for production. If anything goes wrong during state transitions, we should throw the worker away and rely on the scheduler to clean up the mess.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only run in validation mode in tests anyway, so I'm totally fine with it.
distributed/worker.py
Outdated
@@ -2671,6 +2680,10 @@ def ensure_communicating(self) -> None: | |||
if ts.state != "fetch": | |||
continue | |||
|
|||
if not ts.who_has: | |||
self.transition(ts, "missing", stimulus_id=stimulus_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like having this safety net. I still think it would also be appropriate to add
diff --git a/distributed/worker.py b/distributed/worker.py
index 7a062876..5e72f007 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2991,6 +2991,9 @@ class Worker(ServerNode):
for d in has_what:
ts = self.tasks[d]
ts.who_has.remove(worker)
+ if not ts.who_has:
+ # TODO send `missing-data` to scheduler?
+ recommendations[ts] = "missing"
except Exception as e:
logger.exception(e)
In this case where we know we're making a task missing, it seems better to immediately transition it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll take a look and try to incorporate this shortly. Thank you for the suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the suggestion. I've verified that this independently fixes the problem and pushed up this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like these "safety nets". We had "safety nets" all over the place that not only covered up actually severe problems but also made everything much more complicated than necessary, harder to debug, harder to understand, etc.
This is essentially uncovered, dead code. If we ever hit this line something went wrong. If something goes wrong, we should raise and not try to guess what might be a good resolution.
I prefer the fix in gather_dep over this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm happy to remove this. Should I add in a if self.validate:
check here? That would have caught things previously.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the validate instead of the transition would be just fine. I do believe this is the only way one could even trigger the transition_fetch_missing transition and I believe we should get rid of it as well
@@ -2999,6 +3012,8 @@ async def gather_dep( | |||
for d in has_what: | |||
ts = self.tasks[d] | |||
ts.who_has.remove(worker) | |||
if not ts.who_has: | |||
recommendations[ts] = "missing" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logging here might be helpful for future debugging. Probably shouldn't call it missing-dep
, so it can be differentiated from the finally
case, but something in the same spirit.
It also might be worth a comment on why we don't send missing-data
to the scheduler in this case, but do send it in the other case of a missing dep. (Because in this case, we don't know whether the dep is actually missing from the worker—we're just preemptively assuming it is because we're assuming the whole worker has died, but we don't want to send the scheduler wrong information based on that assumption.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added logging. I haven't added the comment. I felt that I didn't understand the reasoning here sufficiently well to argue for one way or the other.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two things were this code path becomes relevant
- Worker is actually dead. The way we implement this handler here acts as if the worker was dead. We basically purge the entire info we know about this worker because we don't expect him to come back.
The scheduler will detect the dead worker eventually and reschedule the task. By not sending the missing-data
signal we open ourselves to a minor but harmless race condition where the scheduler distributes "faulty" who_has to other workers for a brief time such that multiple workers may run into this OSError. That's unfortunate from a performance perspective, particularly if they configure very high connect timeouts. The "correct" way would be to send the signal but it shouldn't matter a lot in the end.
- There was a network blip. We should actually not do anything in case of a network blip but just retry (w/ backoff). Sending a missing-data in this case might actually be very harmful since the scheduler removes this worker then from its who_has and the worker will never receive the "free-keys" signal, i.e. we'd acquire zombie tasks.
We currently cannot distinguish 1. and 2. so we need to find a middle ground. Purging data is safe because we can reacquire this information. Sending missing-data
in the wrong situation has the potential for being unsafe so we should not do it and live with this very minor race condition mentioned above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that the fetch->missing
transition has been removed, I don't understand how this will work. Not all tasks in has_what
are guaranteed to be in flight.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We currently cannot distinguish 1. and 2. so we need to find a middle ground. Purging data is safe because we can reacquire this information. Sending missing-data in the wrong situation has the potential for being unsafe so we should not do it and live with this very minor race condition mentioned above
This explanation is basically what I was looking for in the comment.
(this relies on internal state, I'm not too worried about it)
@fjetter if you have time tomorrow this could use your reivew. If you're ok with it (or even ok with mild reservations) I encourage you to hit the green button. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to not introduce the fetch->missing transition since it is not supposed to be there. Whenever this transition would be hit, our state was already corrupted.
The proper fix is the exception handler in gather_dep. What bothers me is that I'm pretty sure I already fixed this exact problem before. (the finally clause only iterates over fetched tasks, not all tasks). I'll try to find the PR and want to figure out why this is no tested
@@ -1929,6 +1930,14 @@ def transition_flight_missing( | |||
ts.done = False | |||
return {}, [] | |||
|
|||
def transition_fetch_missing( | |||
self, ts: TaskState, *, stimulus_id: str | |||
) -> RecsInstrs: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I strongly suggest not to rely on these validate calls. They are helpful but do not replace testing. They raise an exception. The exception is lost in tornado and the only thing we see is an error log. Sometimes that causes the tests to get stuck but it's not reliable.
I haven't seen these problems recently but it's been a big problem a few months ago
distributed/worker.py
Outdated
@@ -2671,6 +2680,10 @@ def ensure_communicating(self) -> None: | |||
if ts.state != "fetch": | |||
continue | |||
|
|||
if not ts.who_has: | |||
self.transition(ts, "missing", stimulus_id=stimulus_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like these "safety nets". We had "safety nets" all over the place that not only covered up actually severe problems but also made everything much more complicated than necessary, harder to debug, harder to understand, etc.
This is essentially uncovered, dead code. If we ever hit this line something went wrong. If something goes wrong, we should raise and not try to guess what might be a good resolution.
I prefer the fix in gather_dep over this
|
||
@pytest.mark.slow | ||
@gen_cluster(client=True, Worker=BreakingWorker) | ||
async def test_broken_comm(c, s, a, b): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not reliably trigger the condition for me. I do hit it but it is not deterministic
@@ -2999,6 +3012,8 @@ async def gather_dep( | |||
for d in has_what: | |||
ts = self.tasks[d] | |||
ts.who_has.remove(worker) | |||
if not ts.who_has: | |||
recommendations[ts] = "missing" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two things were this code path becomes relevant
- Worker is actually dead. The way we implement this handler here acts as if the worker was dead. We basically purge the entire info we know about this worker because we don't expect him to come back.
The scheduler will detect the dead worker eventually and reschedule the task. By not sending the missing-data
signal we open ourselves to a minor but harmless race condition where the scheduler distributes "faulty" who_has to other workers for a brief time such that multiple workers may run into this OSError. That's unfortunate from a performance perspective, particularly if they configure very high connect timeouts. The "correct" way would be to send the signal but it shouldn't matter a lot in the end.
- There was a network blip. We should actually not do anything in case of a network blip but just retry (w/ backoff). Sending a missing-data in this case might actually be very harmful since the scheduler removes this worker then from its who_has and the worker will never receive the "free-keys" signal, i.e. we'd acquire zombie tasks.
We currently cannot distinguish 1. and 2. so we need to find a middle ground. Purging data is safe because we can reacquire this information. Sending missing-data
in the wrong situation has the potential for being unsafe so we should not do it and live with this very minor race condition mentioned above
I spoke with @fjetter , made his requested changes. I plan to merge after tests pass. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to not introduce the fetch->missing transition since it is not supposed to be there.
@fjetter last time we talked, I thought you were in favor of doing this. What changed?
FWIW I also don't like having the transition, because I don't think we should be doing what we currently do in the OSError handler. (I don't think OSError is the appropriate signal to do what we're doing.) But if we're not going to remove that behavior, then we do need the transition, because the behavior there is that we're making tasks missing
which may be in fetch
.
distributed/worker.py
Outdated
|
||
if not ts.who_has: | ||
recommendations[ts] = "missing" | ||
logger.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, please not logger.info
, but self.log.append
. I want to see this in stories.
@@ -2999,6 +3012,8 @@ async def gather_dep( | |||
for d in has_what: | |||
ts = self.tasks[d] | |||
ts.who_has.remove(worker) | |||
if not ts.who_has: | |||
recommendations[ts] = "missing" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that the fetch->missing
transition has been removed, I don't understand how this will work. Not all tasks in has_what
are guaranteed to be in flight.
CI seems to agree wth @gjoseph92 Traceback (most recent call last):
File "d:\a\distributed\distributed\distributed\utils.py", line 693, in log_errors
yield
File "d:\a\distributed\distributed\distributed\worker.py", line 3065, in gather_dep
self.transitions(recommendations, stimulus_id=stimulus_id)
File "d:\a\distributed\distributed\distributed\worker.py", line 2582, in transitions
a_recs, a_instructions = self._transition(
File "d:\a\distributed\distributed\distributed\worker.py", line 2518, in _transition
raise InvalidTransition(
distributed.worker_state_machine.InvalidTransition: Impossible transition from fetch to missing for ('split-simple-shuffle-9a64a54870435a4b5560de9df9d2576e', 2, 6)
2022-04-13 15:06:03,744 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001ADCE761850>>, <Task finished name='Task-114657' coro=<Worker.gather_dep() done, defined at d:\a\distributed\distributed\distributed\worker.py:2932> exception=InvalidTransition("Impossible transition from fetch to missing for ('split-simple-shuffle-9a64a54870435a4b5560de9df9d2576e', 2, 6)")>) @fjetter as discussed, I plan to revert and keep the transition in. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both of you were right. Of course, the tasks we forgot to iterate over before are typically in state fetch and we need this transition.
I believe my reasoning about not having a missing-data
message is still correct.
Planning to merge once CI passes |
Fixes #5951
In #5653
we removed the fetch -> missing transition
This caused deadlocks.
Now we add it back in.
pre-commit run --all-files