
Add back Worker.transition_fetch_missing #6112

Merged: 8 commits, Apr 13, 2022
1 change: 0 additions & 1 deletion distributed/tests/test_cancelled_state.py
@@ -168,7 +168,6 @@ async def wait_and_raise(*args, **kwargs):

b_story = b.story(fut1.key)
assert any("receive-dep-failed" in msg for msg in b_story)
- assert any("missing-dep" in msg for msg in b_story)
assert any("cancelled" in msg for msg in b_story)
assert any("resumed" in msg for msg in b_story)

21 changes: 21 additions & 0 deletions distributed/tests/test_worker.py
@@ -3389,3 +3389,24 @@ async def test_tick_interval(c, s, a, b):
while s.workers[a.address].metrics["event_loop_interval"] < 0.100:
await asyncio.sleep(0.01)
time.sleep(0.200)


class BreakingWorker(Worker):
broke_once = False

def get_data(self, comm, **kwargs):
if not self.broke_once:
self.broke_once = True
raise OSError("fake error")
return super().get_data(comm, **kwargs)


@pytest.mark.slow
@gen_cluster(client=True, Worker=BreakingWorker)
async def test_broken_comm(c, s, a, b):
Collaborator:

I am somewhat ok with this test, since it does reliably trigger the behavior. But I think @fjetter was hoping to see a more minimized case.

Member Author:

I agree with that desire. I encourage folks to work on that. I think that this suffices.

Member:

This test does not reliably trigger the condition for me. I do hit it but it is not deterministic

Member Author:

I can increase the data volume and it will become more and more likely. I don't have a deterministic test. I think that it would be good to have. I think that this suffices though.

df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-10",
)
s = df.shuffle("id", shuffle="tasks")
await c.compute(s.size)
Member Author:

Kudos to @gjoseph92 and @nils-braun for the test

18 changes: 17 additions & 1 deletion distributed/worker.py
@@ -546,6 +546,7 @@ def __init__(
("executing", "released"): self.transition_executing_released,
("executing", "rescheduled"): self.transition_executing_rescheduled,
("fetch", "flight"): self.transition_fetch_flight,
("fetch", "missing"): self.transition_fetch_missing,
("fetch", "released"): self.transition_generic_released,
("flight", "error"): self.transition_flight_error,
("flight", "fetch"): self.transition_flight_fetch,
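For readers unfamiliar with this pattern, the table above maps (start, finish) state pairs to handler methods, and the PR restores the ("fetch", "missing") entry. A minimal standalone sketch of the dispatch mechanism (class and method names here are hypothetical, not distributed's actual API):

```python
# Minimal sketch of (start, finish) -> handler dispatch, as in the table
# above. TinyStateMachine and its states are illustrative only.
class TinyStateMachine:
    def __init__(self):
        self.state = "fetch"
        self._transitions_table = {
            ("fetch", "flight"): self._transition_fetch_flight,
            ("fetch", "missing"): self._transition_fetch_missing,
        }

    def transition(self, finish):
        # Look up the handler for the (current, requested) state pair.
        handler = self._transitions_table.get((self.state, finish))
        if handler is None:
            raise RuntimeError(f"impossible transition {self.state} -> {finish}")
        handler()

    def _transition_fetch_flight(self):
        self.state = "flight"

    def _transition_fetch_missing(self):
        self.state = "missing"
```

Without a ("fetch", "missing") entry, a task in fetch whose last known holder disappears has no legal path to missing, which is essentially the gap this PR closes.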
@@ -1929,6 +1930,14 @@ def transition_flight_missing(
ts.done = False
return {}, []

def transition_fetch_missing(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
Collaborator:

I'd like to see some assertions about ts in here. Likely at least

assert not ts.done  # ??? really not sure about this. I find `done` confusingly named.
if self.validate:
    assert ts.state == "fetch"
    assert not ts.who_has

Member Author:

This is in validate_task_fetch already, which gets called at the end of every transitions call. I think that we're safe here.

Collaborator:

You mean validate_task_missing?

Member Author:

Both validation methods assert the correct (I think) who_has state

    def validate_task_fetch(self, ts):
        assert ts.key not in self.data
        assert self.address not in ts.who_has
        assert not ts.done
        assert ts in self.data_needed
        assert ts.who_has

        for w in ts.who_has:
            assert ts.key in self.has_what[w]
            assert ts in self.pending_data_per_worker[w]

    def validate_task_missing(self, ts):
        assert ts.key not in self.data
        assert not ts.who_has
        assert not ts.done
        assert not any(ts.key in has_what for has_what in self.has_what.values())
        assert ts in self._missing_dep_flight

So after this transition is called, the validate_task_missing method will be called, and verify that not ts.who_has
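The validate-after-transition flow described here can be sketched in isolation (names are illustrative; the real methods live on Worker and are far richer):

```python
# Sketch: after each transition, the state machine calls the per-state
# validate method, but only when validation is enabled (as in test runs).
# Machine and its attributes are hypothetical stand-ins.
class Machine:
    validate = True

    def __init__(self):
        self.state = "fetch"
        self.who_has = {"worker-a"}

    def transition_fetch_missing(self):
        self.state = "missing"
        self.who_has.clear()

    def transition(self, finish):
        # Dispatch by (current state, requested state), then validate.
        getattr(self, f"transition_{self.state}_{finish}")()
        if self.validate:
            getattr(self, f"validate_task_{self.state}")()

    def validate_task_missing(self):
        assert not self.who_has
```

Under this pattern, a buggy transition_fetch_missing that left who_has populated would trip validate_task_missing immediately, without every test asserting the invariant explicitly.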

Member Author:

Hrm, it looks like we are asserting the incoming state by default though.

This seems a little odd to me given how few lines of code there are in between getting the state and choosing this method (below for convenience)

        start = ts.state
        func = self._transitions_table.get((start, cast(str, finish)))

I'd be happy to add it for now if that's the convention you'd like.

Member Author:

It looks like it's inconsistently done. I'm going to pass on this one unless you feel strongly about it. If you do speak up and I'll add it. I'm probably hesitating here just because it feels weird to keep doing something just because we've been doing it.

Member:

I strongly suggest not to rely on these validate calls. They are helpful but do not replace testing. They raise an exception. The exception is lost in tornado and the only thing we see is an error log. Sometimes that causes the tests to get stuck but it's not reliable.
I haven't seen these problems recently, but it was a big problem a few months ago.

Member Author:

I strongly suggest not to rely on these validate calls. They are helpful but do not replace testing.

We should talk about this in more depth if you want to rely less on them. Validation testing has, historically, been invaluable in maintaining correctness and stability.

This is still testing. The difference now is that the tests themselves trigger certain behaviors, and assertions are checked in a more systematic way. It is, I think, a better way of verifying state than explicitly checking state in every test, which would be an inefficient way of writing our tests.

The exception is lost in tornado and the only thing we see is an error log. Sometimes that causes the tests to get stuck but it's not reliable

If tests are passing even when these assertions fail, then that's certainly an issue and we should address it quickly. You might not be saying this though. If these aren't as ergonomic as we'd like, then let's see if we can make them more ergonomic.

Alternatively, if we have a good alternative for the validation methods then I'm happy to engage. I would be -1 to putting explicit state testing at this into all of the tests though. I'm curious to learn what other alternatives there might be. Could I ask you to raise an issue with thoughts and we can move the conversation there?

Member:

I agree with you, they are useful and I don't want to get rid of them. I'm just saying that we cannot blindly rely on them at this point in time. The way our concurrency model with tornado works, AssertionErrors are just lost and logged. Sometimes that causes a worker to deadlock, which is then a good thing because the test times out and fails. However, it depends on where exactly the assert is called, and relying on this "implicit deadlock" is not great.

To counter this, I proposed #4735 a while ago which proposes to log an exception and close the worker if a transition error occurs. I believe this would be a drastic behaviour but still a sane one, even for production. If anything goes wrong during state transitions, we should throw the worker away and rely on the scheduler to clean up the mess.
Thoughts?
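A hedged sketch of what the behaviour proposed in #4735 could look like (SketchWorker and its methods are hypothetical illustrations, not the actual implementation):

```python
import logging

logger = logging.getLogger(__name__)

# Sketch of the #4735 idea: if a transition handler raises, log the
# exception and close the worker, rather than letting the error vanish
# in the event loop. All names here are illustrative only.
class SketchWorker:
    def __init__(self):
        self.closed = False

    def _handle_transition(self, ts, finish):
        # Stand-in for a buggy transition handler raising an AssertionError.
        raise AssertionError("invalid transition state")

    def transition(self, ts, finish):
        try:
            self._handle_transition(ts, finish)
        except Exception:
            # Surface the error loudly and throw the worker away, relying
            # on the scheduler to clean up and reschedule.
            logger.exception("Error during transition; closing worker")
            self.close()
            raise

    def close(self):
        self.closed = True
```

The key design point is that a transition error becomes a hard, visible failure (the worker shuts down) instead of a silently logged assertion.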

Member Author:

We only run in validation mode in tests anyway, so I'm totally fine with it.

ts.state = "missing"
self._missing_dep_flight.add(ts)
ts.done = False
return {}, []

def transition_released_fetch(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
@@ -2671,6 +2680,9 @@ def ensure_communicating(self) -> None:
if ts.state != "fetch":
continue

if self.validate:
assert ts.who_has

workers = [w for w in ts.who_has if w not in self.in_flight_workers]
if not workers:
assert ts.priority is not None
@@ -2999,7 +3011,11 @@ async def gather_dep(
for d in has_what:
ts = self.tasks[d]
ts.who_has.remove(worker)

if not ts.who_has:
recommendations[ts] = "missing"
Collaborator:

Logging here might be helpful for future debugging. Probably shouldn't call it missing-dep, so it can be differentiated from the finally case, but something in the same spirit.

It also might be worth a comment on why we don't send missing-data to the scheduler in this case, but do send it in the other case of a missing dep. (Because in this case, we don't know whether the dep is actually missing from the worker—we're just preemptively assuming it is because we're assuming the whole worker has died, but we don't want to send the scheduler wrong information based on that assumption.)

Member Author:

I've added logging. I haven't added the comment. I felt that I didn't understand the reasoning here sufficiently well to argue for one way or the other.

Member:

There are two situations where this code path becomes relevant:

1. The worker is actually dead. The way we implement this handler acts as if the worker was dead: we purge all the info we know about this worker because we don't expect it to come back.

The scheduler will detect the dead worker eventually and reschedule the task. By not sending the missing-data signal, we open ourselves to a minor but harmless race condition where the scheduler distributes "faulty" who_has to other workers for a brief time, such that multiple workers may run into this OSError. That's unfortunate from a performance perspective, particularly with very high connect timeouts configured. The "correct" way would be to send the signal, but it shouldn't matter a lot in the end.

2. There was a network blip. In that case we should actually not do anything but just retry (with backoff). Sending a missing-data here might actually be very harmful, since the scheduler would then remove this worker from its who_has and the worker would never receive the "free-keys" signal, i.e. we'd acquire zombie tasks.

We currently cannot distinguish 1. and 2., so we need to find a middle ground. Purging data is safe because we can reacquire this information. Sending missing-data in the wrong situation has the potential for being unsafe, so we should not do it and live with the very minor race condition mentioned above.
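The retry-with-backoff idea for the network-blip case could look roughly like this (a sketch under stated assumptions; fetch_with_backoff, its parameters, and the delays are hypothetical, not distributed's actual retry policy):

```python
import time

# Sketch: retry a flaky fetch with exponential backoff instead of
# immediately declaring the data missing. Parameters are illustrative.
def fetch_with_backoff(fetch, retries=3, base_delay=0.01):
    delay = base_delay
    for attempt in range(retries):
        try:
            return fetch()
        except OSError:
            if attempt == retries - 1:
                raise  # give up: treat as case 1 (worker actually dead)
            time.sleep(delay)
            delay *= 2  # back off before the next attempt
```

A transient blip then resolves itself after a retry or two, while a genuinely dead worker still surfaces as an OSError once the retries are exhausted.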

Collaborator:

Now that the fetch->missing transition has been removed, I don't understand how this will work. Not all tasks in has_what are guaranteed to be in flight.

Collaborator:

We currently cannot distinguish 1. and 2. so we need to find a middle ground. Purging data is safe because we can reacquire this information. Sending missing-data in the wrong situation has the potential for being unsafe so we should not do it and live with this very minor race condition mentioned above

This explanation is basically what I was looking for in the comment.

self.log.append(
("missing-who-has", worker, ts.key, stimulus_id, time())
)
except Exception as e:
logger.exception(e)
if self.batched_stream and LOG_PDB: