
Remove cancelled, resumed, and long-running states #6844

Closed
wants to merge 1 commit from the no_cancelled branch

Conversation


@crusaderky crusaderky commented Aug 6, 2022

This is an even more aggressive redesign than #6699 and #6716.

Remove the long-running state. A seceded task is distinguished from an executing one exclusively by being in the WorkerState.long_running set instead of the executing set.

Remove the cancelled state. Instead, a cancelled task simply transitions to other states, while remaining in the executing, long_running, or in_flight_tasks sets. A task will not transition to forgotten for as long as it is in one of the three above sets.
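
For illustration, a minimal sketch of the resulting invariant (the helper names below are hypothetical, not part of this PR):

```python
# Illustrative sketch only, not the actual code in this PR.
def is_referenced_by_coroutine(ws, ts) -> bool:
    """True while an Execute or GatherDep coroutine still holds this task."""
    return ts in ws.executing or ts in ws.long_running or ts in ws.in_flight_tasks


def may_transition_to_forgotten(ws, ts) -> bool:
    # A released task is forgotten only once no coroutine references it anymore.
    return ts.state == "released" and not is_referenced_by_coroutine(ws, ts)
```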

Remove the resumed state. Instead,

  • If a task is recommended to transition to ready, but it is already in either the executing or the long_running sets, it transitions back to executing instead
  • If a task is recommended to transition to fetch, but it is already in the in_flight_tasks set, it transitions back to flight instead

All end events for Execute and GatherDep:

  • work as normal exclusively if the task is still in executing or flight state respectively
  • recommend a transition to forgotten if the task is in released state
  • otherwise, they do nothing besides cleaning up and kicking off the next ensure_computing/ensure_communicating.

At any given moment, there may be both an Execute and a GatherDep instruction for the same task, running at the same time.
If the Execute instruction finishes while the task is in flight, it will be a no-op, and vice versa. This means we no longer have to worry about mismatched end events.
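
A hedged sketch of these rules for the Execute case (illustrative names and simplified return values, not the actual worker_state_machine API):

```python
# Illustrative only: what handling an Execute end event could look like.
def handle_execute_done(ws, ts, success: bool):
    if ts in ws.executing or ts in ws.long_running:
        # Normal completion: regular transition to memory or error.
        return {ts: "memory" if success else "error"}
    if ts.state == "released":
        # The task was cancelled while running: clean up and forget it.
        return {ts: "forgotten"}
    # Otherwise the task was resumed elsewhere (e.g. it is now in flight):
    # do nothing beyond bookkeeping; ensure_computing / ensure_communicating
    # are kicked off next.
    return {}
```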

Remove the previous, next, and done TaskState attributes.

TODO

  • Ensure all tickets above meet DoD
  • Failing tests in distributed/tests/test_cancelled_state.py

@crusaderky crusaderky self-assigned this Aug 6, 2022

crusaderky commented Aug 6, 2022

@fjetter as discussed. The implementation should be complete (save for bugs). An early review would be appreciated.
Note that, unlike #6716, it does not address #6705 and should not require any scheduler-side changes.

@crusaderky crusaderky requested a review from fjetter August 6, 2022 17:44

github-actions bot commented Aug 6, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0   15 suites ±0   6h 57m 7s ⏱️ −8m 48s
3 021 tests +21:  2 919 ✔️ +11,  82 💤 −7,  20 ❌ +17
22 351 runs +105:  21 262 ✔️ +70,  967 💤 −84,  122 ❌ +119

For more details on these failures, see this check.

Results for commit c5acdcd. ± Comparison against base commit 1d0701b.

♻️ This comment has been updated with latest results.

if ts.state == "flight":
    ts.done = True
    tasks.add(ts)
elif ts.state == "released":
Member

How can a task be released at this point? The point of having a cancelled state in the past was to avoid this since it can lead to a couple of inconsistencies. I'm concerned that we're opening ourselves to the same problems again.

@crusaderky crusaderky Aug 8, 2022

A task that is currently being served by GatherDep or Execute can have any state, except forgotten.
The key code is in:

  • transition_executing_released (cancel executing)
  • transition_flight_released (cancel flight)
  • transition_waiting_ready (resume executing)
  • transition_generic_fetch (resume flight)
  • transition_released_forgotten (do not forget if currently in GatherDep or Execute)

@fjetter fjetter left a comment

IMO, the complexity reduction in this PR is a bit misleading. You are decreasing the number of TaskStateStates and reducing LOC, which at first glance looks like an objective reduction of complexity. However, the devil's in the details.

The reduction of TaskStateStates causes the remaining states to gain a higher degree of degeneracy. For instance, on main, TSS flight clearly encodes a task that is currently in the process of being gathered from a remote peer. Further, the scheduler's intention and the future flow of this task are clearly determined based on the result of the remote fetch. There is little to no ambiguity in any follow-up decision, e.g. "gather dep failed -> always do X".
On main, a change of intended flow is encoded by transitioning the task to a different TSS, e.g. cancelled. For this argument, let's stick to a "happy path" / "forward progress path", e.g. waiting->executing->cancelled-{ExecuteDoneEvent}->?
Given that the state is cancelled, every ExecuteDoneEvent has a very well-defined outcome. The control flow is unambiguous, which leads to relatively easy code with few control branches, i.e. it trades higher system complexity for low local code complexity.

Now, looking at a non-trivial control flow, e.g. waiting->executing->cancelled-{AcquireReplicasEvent|ComputeTaskEvent}->?, this is no longer well defined, since the state cancelled in reality describes two different situations, i.e. cancelled_from_executing and cancelled_from_flight, and depending on which substate we're dealing with, the outcome would be very different. This substate is encoded in the TaskState._previous attribute.
This situation is very similar to the resumed state which, strictly speaking, should break up into resume_to_fetch and resume_to_waiting substates, depending on the ts._next attribute. The implementation around these states may be buggy, but conceptually I'm not convinced that we should remove them. The reason I never broke cancelled/resumed up further was mostly that I considered this a feasible compromise to reduce the number of required transition functions. That compromise is what caused the transition functions we have to be overly complex and buggy.

To my best understanding, this PR does not fundamentally change the control flow of a task but merely chooses a different way to describe it. After all, we still can't cancel a gather_dep or an execute (the latter could be cancelled, but it would not allow us to fundamentally simplify the problem, so I'll ignore any "abort thread" proposals for the sake of the argument).

This PR now proposes to remove the cancelled state which will ultimately require us to encode the information I described above in a different way. Specifically, this PR proposes to transition a task directly back to released and remember the information that a task is still executing by putting a task back to the executing dict, i.e. this PR increases the degeneracy of both the released state and the executing dict.
In other words, on main, the semantic meaning of the executing dict was simple and unique. It included "all tasks in state executing". With this PR, it would encode "all tasks in state executing and some tasks in state released". Similarly, released would no longer be a neutral state but a state that means "neutral or still executing or still fetching".
These small and subtle changes of semantics require us to always check all the conditions to infer what the actual state is. Every time we interact with a released task, we'll need to check whether it is in a neutral state, still executing, etc.
This PR exhibits this increase of complexity in various places. Nice examples of this are _execute_done_common and _gather_dep_done_common, which previously were basically no-ops. The former doesn't even exist on main. Now we've got switch statements. This is how the finally clause of gather_dep started and evolved, and we invested an awful lot of time to get rid of that.

I would even go as far as to claim that there are bugs because we're not dealing with this degeneracy properly. For instance, let's assume a task was in flight, got cancelled and then asked to be computed again. IIUC, we're nowhere dealing with the fact that the task is still in flight but are transitioning the task straight to executing, i.e. we could have a task simultaneously in flight and in executing. That's an entire class of inconsistency problems that originally caused the first "wave" of deadlocks.

At first glance, this PR reverts a lot of hard effort in making the state machine more explicit. The verbosity of the current code was intentional to a certain degree. The need for this explicitness was already motivated back in #4413 where this entire refactoring started.


Re: long-running

I don't see a need to change anything about long-running. In fact, I consider merging this with executing quite misleading. Long-running tasks are not handled well right now, and this change might address some of these artifacts, but they are not working well regardless since thread rejoining is not implemented.
I believe it makes sense to distinguish these two states from an instrumentation POV alone.

@crusaderky

> The reduction of TaskStateStates causes the remaining states to gain a higher degree of degeneracy.

I think this is a wild exaggeration. It causes exactly three transitions to gain a small code branch:

  • transition_waiting_ready (resume executing)
  • transition_generic_fetch (resume flight)
  • transition_released_forgotten (do not forget if currently in GatherDep or Execute)

> For instance, on main, TSS flight clearly encodes a task that is currently in the process of being gathered from a remote peer. Further, the scheduler's intention and the future flow of this task are clearly determined based on the result of the remote fetch. There is little to no ambiguity in any follow-up decision, e.g. "gather dep failed -> always do X".

This has not changed.

> The control flow is unambiguous, which leads to relatively easy code with few control branches

Just have a look at _transition_from_resumed. It alone contains a wealth of subtle issues that are very hard to spot casually (#6693).

The control flow in main is, to say the least, chaotic.
In main, the scheduler can think that a task is executing while it's actually in flight. When the task completes, the scheduler can receive unexpected termination messages and has to deal with them. Unsurprisingly, it does not always do so in the corner cases, as evidenced by the deadlock in #6689.

Also in main, the worker state needs to deal with a GatherDep or Execute finishing while the state is just about anything. This has caused many issues in the past.

> the resumed state which, strictly speaking, should break up into resume_to_fetch and resume_to_waiting substates, depending on the ts._next attribute.

This is the theory, and it's all well and good. Except that the implementation doesn't do that, and there are points where ts._next can be missing or other weird stuff; I can find them on request. The reason for this general bugginess is that it is just so ridiculously hard to wrap one's head around the cancelled/resumed state. I myself, after spending many weeks refactoring the state machine, did not have a solid grasp on it, and only now can I say that I fully understand it.

> To my best understanding, this PR does not fundamentally change the control flow of a task but merely chooses a different way to describe it.

Not true.
In main, when a resumed task completes (successfully or otherwise), you have several edge cases that you don't have when a regular task does.

More philosophically: we have a (somewhat) inescapable problem, which is that Execute and GatherDep can't just be cancelled.

In main, the way to cope with it is to enter four special states, cancelled(flight), cancelled(executing), resumed(flight->waiting), resumed(executing->fetch), plus buggy intruders like resumed(executing->missing) just to deal with it, and a wealth of very, very special transitions for when each of these 4 states finishes.

In this PR, we simply say that the Execute and GatherDep asyncio tasks can just stay there, unattended, until we need to do one of three things:

  • transition_waiting_ready (resume executing). Note that this is only a matter of courtesy towards the user code, as in some cases it may break when two identical tasks run at the same time in the same worker.
  • transition_generic_fetch (resume flight). I strongly suspect we could get rid of this.
  • transition_released_forgotten (do not forget if currently in GatherDep or Execute). This is only needed for a few things - namely, to preserve TaskState.resource_restrictions - and we could work around it easily by simply carrying the information through the Execute instruction to the ExecuteSuccessEvent, just like total_nbytes is carried from GatherDep to GatherDepDoneEvent (see the sketch after this list). This means that when GatherDepDoneEvent and ExecuteDoneEvent are triggered, the task may not exist at all, and I don't see a problem with that.
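
For illustration only, a sketch of that workaround under assumed names (the real Execute instruction and ExecuteSuccessEvent classes may carry different fields):

```python
# Hypothetical sketch: echo resource_restrictions on the instruction/event
# pair so the TaskState does not need to survive until the event is handled.
from dataclasses import dataclass, field


@dataclass
class Execute:
    key: str
    resource_restrictions: dict = field(default_factory=dict)


@dataclass
class ExecuteSuccessEvent:
    key: str
    # Echoed back from the instruction, like total_nbytes on GatherDepDoneEvent,
    # so resources can be released even if the task has already been forgotten.
    resource_restrictions: dict = field(default_factory=dict)
```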

> After all, we still can't cancel a gather_dep or an execute (the latter could be cancelled but it would not allow us to fundamentally simplify the problem so I'll ignore any "abort thread" proposals for the sake of the argument)

Agreed, such proposals are interesting but complicated to implement and definitely out of scope.

> This PR now proposes to remove the cancelled state which will ultimately require us to encode the information I described above in a different way. Specifically, this PR proposes to transition a task directly back to released and remember the information that a task is still executing by putting a task back to the executing dict, i.e. this PR increases the degeneracy of both the released state and the executing dict. In other words, on main, the semantic meaning of the executing dict was simple and unique. It included "all tasks in state executing".

This is false.
In main, the executing set includes

  • state=executing
  • state=cancelled, previous=executing
  • state=resumed, previous=executing, next=fetch
  • plus buggy states such as state=resumed, previous=executing, next=missing

While the long_running set includes

  • state=long-running
  • state=cancelled, previous=long-running
  • state=resumed, previous=long-running, next=fetch
  • state=resumed, previous=long-running, next=missing

and the in_flight_tasks set includes

  • state=flight
  • state=cancelled, previous=flight
  • state=resumed, previous=flight, next=waiting

Also interesting to note that, given the above, the information encoded in the prev and next attributes is redundant and could be fully derived from which of the three sets the task belongs to.
This PR, among other things, removes such redundancy.
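
For example, a hypothetical helper (not code from main) could reconstruct previous purely from set membership:

```python
# Illustrative only: TaskState.previous, as used on main, is derivable from
# which runtime set the task currently sits in.
def infer_previous(ws, ts):
    if ts in ws.in_flight_tasks:
        return "flight"
    if ts in ws.long_running:
        return "long-running"
    if ts in ws.executing:
        return "executing"
    return None
```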

> Every time we interact with a released task, we'll need to check whether it is in a neutral state, still executing, etc.

False. There are very, very few places where this happens, because they're the places which would directly interact with the currently running asyncio tasks:

  • transition_waiting_ready
  • transition_generic_fetch (we probably could do without)
  • transition_released_forgotten (we could do without, as discussed above)
  • _handle_compute_task (do not overwrite resource_restrictions; we could clean it up making it work like total_nbytes, as discussed above).

> This PR exhibits this increase of complexity in various places. Nice examples of this are _execute_done_common and _gather_dep_done_common, which previously were basically no-ops. The former doesn't even exist on main. Now we've got switch statements. This is how the finally clause of gather_dep started and evolved, and we invested an awful lot of time to get rid of that.

You're talking about a very different time, when the event handlers were spaghettified together with the actual code running the instructions. I personally do not see any issue, today, in moving business logic away from the _transition_* methods and into the _handle_event ones.

> I would even go as far as to claim that there are bugs because we're not dealing with this degeneracy properly. For instance, let's assume a task was in flight, got cancelled and then asked to be computed again. IIUC, we're nowhere dealing with the fact that the task is still in flight but are transitioning the task straight to executing

Yes, this is the main feature of this PR.

> i.e. we could have a task simultaneously in flight and in executing. That's an entire class of inconsistency problems that originally caused the first "wave" of deadlocks.

Could you spell out what these inconsistency problems are? Again, you are talking about a time when gather_dep was changing the state machine itself. It can't anymore - only a selection of exit events can and, with this PR, they do so exclusively in two states, flight and released (and we could remove the latter use case, as explained above).

Could you come up with a list of examples of how a PR from a junior contributor could subtly cause this to become a problem, without any of the current tests tripping over it very explicitly? I can't come up with any.

> At first glance, this PR reverts a lot of hard effort in making the state machine more explicit.

I disagree. It simply states that an abandoned GatherDep or Execute instruction should be a no-op for as much as possible.

> The verbosity of the current code was intentional to a certain degree.

This PR has no intention of reducing verbosity. However, I already spent many, many weeks dealing with issues that were specifically hidden in the transitions from cancelled and from resumed, particularly their intersections with other edge cases, e.g. #6685.

> I don't see a need to change anything about long-running.

I personally lost count of how many PRs I've already written trying to fix the long-running state, all of which were caused by a change at some point in transition_executing_* while forgetting to replicate and test the same in _transition_long_running_*, or by checking for state == executing instead of state in (executing, long-running).

> Long-running tasks are not handled well right now

Yes, and the reason is the one above.

> this change might address some of these artifacts

This change fixes all the problems of having a double state executing/long-running which must behave in the same way. With this PR, the one and only place where long-running is treated differently from executing is a single line in _ensure_computing, where it counts the tasks already running.
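
For illustration, a hedged sketch of what that single remaining distinction might look like (names are illustrative, not the actual _ensure_computing code):

```python
# Illustrative only: seceded (long-running) tasks have left the thread pool,
# so only the executing set counts towards thread occupancy.
def has_free_execution_slot(ws) -> bool:
    return len(ws.executing) < ws.nthreads
```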

> but they are not working well regardless since thread rejoining is not implemented.

This is a completely separate issue and it should be treated as out of scope.
Fixing it will be no more and no less difficult in main than in this PR.


crusaderky commented Aug 8, 2022

...ok, I found an issue that may scupper the whole design.
In the first part of Worker.execute, there are many accesses to ts which require the task to not have changed - crucially, ts.dependencies.

In this event stream:

  • ComputeTaskEvent(key=x, who_has={y: [...], z: [...]})
  • FreeKeysEvent(keys=[x])

the task may not find z by the time it tries to read it from data.
This will never happen in the current PR, as the coroutine will exit with AlreadyCancelledEvent.

However, this is a big problem:
In this event stream:

  • ComputeTaskEvent(key=x, who_has={y: [...], z: [...]})
  • FreeKeysEvent(keys=[x])
  • ComputeTaskEvent(key=x, who_has={y: [...]})

then you are expected to resume the task - but z may not be there anymore, and run_spec most likely changed too.

I already encountered the same problem with resource_restrictions, and I could use the same logic for run_spec. dependencies is a lot more problematic.

I'm unsure if there's a clean way to deal with this that keeps Worker.execute as dumb as it is now (which is a huge feature) and is robust against subtle race conditions that are very hard to reproduce - a bunch of sleep(0) calls in the unit tests are needed to reproduce them.
I'll give it some further brainstorming and, if I fail, fall back to #6699.


crusaderky commented Aug 10, 2022

I fixed the race condition I described in my previous post.
The fix is... fancy, and testing it requires some finesse. At first sight, this detracts from the key point of this PR, which is to make the code simpler and intrinsically more robust.

However, while I was dealing with this I encountered, in main, three other very subtle issues in the very same Worker.execute preamble, which require just as much finesse in writing proper test cases for:

As discussed during standup, I will now pause this PR, work on the issues above, write the necessary complicated tests for them, and come back here.


fjetter commented Aug 11, 2022

#6869 is an example of why I am concerned about letting multiple coroutines run for the same task, i.e. gather_dep and execute.

This can cause overlap, and our event handlers need to be rock solid to make sure arbitrary overlaps are handled properly, which I am currently not sufficiently confident about.

Again, I'm advocating for dropping this PR in favor of #6699 for the time being.

@crusaderky

We've got another issue automatically fixed by this PR: #6877.

@crusaderky

> #6869 is an example of why I am concerned about letting multiple coroutines run for the same task, i.e. gather_dep and execute.

I can't understand how this PR would make #6869 any worse.

@crusaderky crusaderky force-pushed the no_cancelled branch 2 times, most recently from 10e9962 to 6ae138a on August 12, 2022 08:35
@crusaderky

I've removed the done attribute 🤘


crusaderky commented Aug 15, 2022

I've found a roadblock that I didn't consider before.
When you transition from executing to released, all dependencies are immediately released as well (unless there are other dependants). However, they will remain in memory because they are referenced by the asyncio task running the user code. If, in the meantime, another task (or, somewhat less likely, the same task) lands with the same dependencies, then the dependencies will be fetched again over the network, leading to data duplication.

We could mitigate this through the weakref cache in SpillBuffer.__set__. Preventing the data transfer completely would be a lot more complicated and necessarily involve the worker state.
Even more complicated, the client may submit a new task with the same key but different dependencies while the previous execution is still running. I'm unsure how to behave there. (In main, the new run_spec and dependencies are completely disregarded.)

I give up. I'll revert to #6699.
I still want to go on with the removal of the 'long-running' state though and I'd like to open a PR just for it.
@fjetter you were against that too - can we have a high-bandwidth chat about it?


fjetter commented Aug 15, 2022

> I still want to go on with the removal of the 'long-running' state though and I'd like to open a PR just for it.
> @fjetter you were against that too - can we have a high-bandwidth chat about it?

Sure, we can have a conversation about this. Here's the gist of it up front (no need to reply if we find some time to briefly talk about it).

TL;DR: I don't mind reusing the implementation and am open to making the transition methods deal with both long-running and executing at the same time to avoid code duplication. We're already dealing with a similar situation with the ready/constrained states, and I believe we can handle it similarly.
The primary reason why I prefer keeping it is instrumentation: I think the difference between executing and long-running is significant enough to warrant a distinction on monitoring dashboards (bokeh and prometheus).
There is a secondary technical concern outlined in the description of #6607, where I argue that we should remove the rejoin/secede functionality and in fact drop our custom threadpool entirely. If we were to go down that route, the distinction between long-running and executing would be helpful (though not necessary).
