
[DNM] queued<->no-worker transitions #7267

Draft
wants to merge 7 commits into main
Conversation

gjoseph92 (Collaborator)

Yet another approach to #7259. See #7259 (comment).

  • Adds an overall decide_worker_or_next_state function, which picks the appropriate decide_worker function for the task (based on root-ish-ness and the worker-saturation setting) and either returns a worker or the state the task should transition to if no worker is available (see the sketch after this list)
  • Adds transitions between the queued and no-worker states. These can only occur when we try to schedule a task in one of those states, no workers are available, and the root-ish-ness status has flipped.
  • Tests added / passed
  • Passes pre-commit run --all-files
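
For orientation, here is a minimal sketch of the dispatch described in the first bullet. This is an illustration only: the types and the decide_worker_rootish / decide_worker_non_rootish bodies are stand-ins rather than the scheduler's real implementations, and only decide_worker_or_next_state is a name introduced by this PR.

import math
from typing import Optional


class WorkerState: ...   # stand-in for the scheduler's real WorkerState
class TaskState: ...     # stand-in for the scheduler's real TaskState


class SchedulerStateSketch:
    WORKER_SATURATION: float = 1.0  # float("inf") disables queuing

    def is_rootish(self, ts: TaskState) -> bool:
        return False  # placeholder for the real root-ish heuristic

    def decide_worker_rootish(self, ts: TaskState) -> Optional[WorkerState]:
        return None  # placeholder: pick a worker while respecting worker-saturation

    def decide_worker_non_rootish(self, ts: TaskState) -> Optional[WorkerState]:
        return None  # placeholder: pick any running worker, greedily

    def decide_worker_or_next_state(self, ts: TaskState):
        """Return a worker for ``ts``, or the state to transition to if none fits."""
        if self.is_rootish(ts) and math.isfinite(self.WORKER_SATURATION):
            # Root-ish task with queuing enabled: respect worker-saturation.
            ws = self.decide_worker_rootish(ts)
            return ws if ws is not None else "queued"
        # Everything else is scheduled greedily on any running worker.
        ws = self.decide_worker_non_rootish(ts)
        return ws if ws is not None else "no-worker"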

gjoseph92 (Collaborator, Author) commented Nov 8, 2022

AFAICT, queued->no_worker is an impossible transition.

We'd need the TaskGroup or cluster size to change so a task no longer looks root-ish (easy). Then, we'd need an event to occur which tries to schedule a task off Scheduler.queued, but finds that there are no running workers.

(If there are running workers, then the task will just be scheduled immediately thanks to decide_worker_non_rootish. I want to test the task getting moved from the queued set to the unrunnable set.)

I can't think of an event which would try to schedule tasks off the queue when there were no running workers.

All selection of queued tasks happens through _next_queued_tasks_for_worker (just added in #7224). That function returns immediately if the worker isn't running:

def _next_queued_tasks_for_worker(
    state: SchedulerState, ws: WorkerState
) -> Iterator[TaskState]:
    """Queued tasks to run, in priority order, on all open slots on a worker"""
    if not state.queued or ws.status != Status.running:
        return

The events that cause a task to be scheduled off the queue are:

  1. A task completes
  2. A task secedes
  3. A worker joins
  4. A worker's status changes to running

But again, in all these cases, we only pick from the queue if the worker is running. And if at least one worker is running (and the task has no restrictions), decide_worker_non_rootish will not return None, so the task will not be sent to no-worker.
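
That invariant can be sketched as follows. This is a self-contained paraphrase of the argument, not the real decide_worker_non_rootish; valid_workers here is a hypothetical stand-in for restriction handling.

from typing import Optional, Set


def decide_worker_non_rootish_sketch(
    running_workers: Set[str], valid_workers: Optional[Set[str]] = None
) -> Optional[str]:
    """Return a worker address, or None if the task would go to no-worker."""
    if not running_workers:
        return None  # no running workers: the only way an unrestricted task falls through
    pool = running_workers if valid_workers is None else running_workers & valid_workers
    if not pool:
        return None  # only reachable when restrictions exclude every running worker
    # An unrestricted task always reaches this point: some running worker is chosen.
    return min(pool)  # arbitrary deterministic pick for the sketch

So as long as queued tasks are only ever offered to running workers, an unrestricted task popped from the queue always gets a worker and cannot fall through to no-worker.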

github-actions bot (Contributor) commented Nov 8, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0 · 15 suites ±0 · 6h 29m 46s ⏱️ +21m 38s
3 172 tests +3: 3 087 ✔️ passed +3, 83 💤 skipped -1, 2 ❌ failed +1
23 472 runs +24: 22 567 ✔️ passed +24, 903 💤 skipped -1, 2 ❌ failed +1

For more details on these failures, see this check.

Results for commit 04201ff, compared against base commit 69a5709.

gjoseph92 (Collaborator, Author)

This, plus test_queued_oversaturates_after_group_shrinks, points out the flaws of this approach.

Here, when the root-ish-ness of a task changes, we're not eagerly moving tasks between the unrunnable and queued sets. All this PR does is let us use a different decide_worker function on the task when it's chosen to be scheduled. However, when the task is chosen for scheduling depends heavily on whether it sits in unrunnable vs queued.

Specifically, if tasks are in queued and then become non-root-ish, they'll still be in queued. And we only pop tasks out of queued when another task completes.

That means the tasks will still mostly be scheduled as though they're queued root-ish tasks. They won't all be submitted to workers up front, the way non-root-ish tasks should be. The only difference is that when another task completes and exactly one task gets picked from the queue, it will always be scheduled, whereas a "proper" queued task might stay on the queue if there was a downstream task to run.
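
As a toy illustration of that difference (hypothetical names only, not code from this PR): tasks that turned non-root-ish while sitting in queued still wait for a completion event to open a slot, but each task popped is then scheduled unconditionally.

from collections import deque


def on_task_completed(queued: "deque[str]", open_slots: int) -> "list[str]":
    """Pop at most ``open_slots`` tasks when a completion frees capacity."""
    popped = [queued.popleft() for _ in range(min(open_slots, len(queued)))]
    # With this PR, a popped task that no longer looks root-ish goes through the
    # non-root-ish decide_worker and is always scheduled, instead of possibly
    # staying behind downstream work the way a "proper" queued task might.
    return popped


queue = deque(["inc-0", "inc-1", "inc-2", "inc-3", "inc-4"])
print(on_task_completed(queue, open_slots=1))  # only one task leaves per completion
print(list(queue))  # the rest stay queued rather than being submitted up front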

This effectively adds two more implicit scheduling cases: "queued but not root-ish", and "in unrunnable but root-ish".

I'm currently leaning away from this approach and think that caching root-ish-ness (#7262) is still the most consistent and maintainable option. Plus, it's the closest to the end state we want (where root-ish-ness is a static property).

fjetter (Member) left a comment

> I'm currently leaning away from this approach and think caching root-ish-ness […]

We discussed that caching is out of the question. It would create conditions that are incredibly difficult to debug.


Only one of the asserts that were added here fails on master. If this is the only failure case we're concerned about, I suggest dropping the issue entirely. Queuing a little bit too much is a conservative error we can afford in a rare situation.
What I understood from the initial problem report is that there are potential deadlocks. If that's true, I'd like them to be reproduced first before we discuss solutions. All of these tests run fine for me on main.

Comment on lines +563 to +566
fs = [c.submit(inc, root, key=f"inc-{i}") for i in range(s.total_nthreads * 2 + 1)]
# ^ `c.submit` in a for-loop so the first tasks don't look root-ish (`TaskGroup` too
# small), then the last one does. So N-1 tasks will go to `no-worker`, and the last
# to `queued`. `is_rootish` is just messed up like that.
Member

This should verify the assumption about when the tasks are classified as root-ish; it is otherwise incredibly opaque and brittle.
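
For context, the classification in question follows roughly this heuristic (a paraphrase of the scheduler's is_rootish; treat the exact thresholds as approximate):

def looks_rootish(group_size: int, total_nthreads: int,
                  n_dependency_groups: int, n_dependencies: int) -> bool:
    """A task looks root-ish when its TaskGroup is large relative to the
    cluster and has few, small dependency groups (and no restrictions)."""
    return (
        group_size > total_nthreads * 2
        and n_dependency_groups < 5
        and n_dependencies < 5
    )


# Why submitting `total_nthreads * 2 + 1` tasks one at a time flips the answer
# only for the last submission (example with total_nthreads == 1):
assert not looks_rootish(2, total_nthreads=1, n_dependency_groups=1, n_dependencies=1)
assert looks_rootish(3, total_nthreads=1, n_dependency_groups=1, n_dependencies=1)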

Comment on lines +578 to +616
@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.work-stealing": False},
)
async def test_queued_rootish_changes_scale_up(c, s, a):
    "Tasks are initially root-ish. After cluster scales, they aren't."

    root = c.submit(inc, 1, key="root")

    event = Event()
    clog = c.submit(event.wait, key="clog")
    await wait_for_state(clog.key, "processing", s)

    fs = c.map(inc, [root] * 5, key=[f"inc-{i}" for i in range(5)])

    await async_wait_for(lambda: len(s.tasks) > len(fs), 5)

    if not s.is_rootish(s.tasks[fs[0].key]):
        pytest.fail(
            "Test assumptions have changed; task is not root-ish. Test may no longer be relevant."
        )
    if math.isfinite(s.WORKER_SATURATION):
        assert s.queued

    async with AsyncExitStack() as stack:
        for _ in range(3):
            await stack.enter_async_context(Worker(s.address, nthreads=2))

        if s.is_rootish(s.tasks[fs[0].key]):
            pytest.fail(
                "Test assumptions have changed; task is still root-ish. Test may no longer be relevant."
            )

        await event.set()
        await clog

        # Just verify it doesn't deadlock
        await c.gather(fs)
Member

This test works fine on main.

# KEY ASSERTION:
# the next task on the queue got scheduled, exceeding worker-saturation, because
# even though it was in `queued`, it was no longer root-ish.
assert len(s.workers[a.address].processing) == a.state.nthreads + 1
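
For reference, the arithmetic behind "exceeding worker-saturation" (assuming the default worker-saturation of 1.0; the limit formula is a paraphrase, not quoted from the scheduler):

import math

nthreads, saturation = 2, 1.0                      # example values
limit = max(math.ceil(nthreads * saturation), 1)   # per-worker queuing limit
assert limit == nthreads
# The assertion above checks for exactly one task beyond that limit:
assert nthreads + 1 == limit + 1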
Member

Of all the new tests, this assert is the only one that fails on main.

Member

This assertion states that a task that is no longer classified as root-ish is still queued, even though its classification has changed.
Is this the only error case we are concerned about? If so, I suggest just dropping the issue. If there is something else, the tests should reproduce the issue first.
