Make root-ish definition static #7531

Open · wants to merge 5 commits into main · Changes from 4 commits
67 changes: 47 additions & 20 deletions distributed/scheduler.py
@@ -1096,6 +1096,28 @@ def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict[str, Any]:
"""
return recursive_to_dict(self, exclude=exclude, members=True)

@property
def rootish(self):
"""
Whether this ``TaskGroup`` is root or root-like.

Root-ish tasks are part of a group that is typically at or near the root
of the graph, and we expect that group to be responsible for the majority
of data production.

Similar fan-out-like patterns can also be found in intermediate graph
layers.

Most scheduler heuristics should use
``Scheduler._is_rootish_no_restrictions`` if they need to guarantee that a
task doesn't have any restrictions and can be run anywhere.
"""
return (
len(self) >= 5
and len(self.dependencies) < 5
and sum(map(len, self.dependencies)) < 5
)
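Illustrative only, not part of the diff: a rough sketch of how the three conditions in the new static check behave, using plain numbers instead of the real TaskGroup/TaskState objects. The function name and arguments are hypothetical.

# Toy stand-in: a group is described by its task count and the sizes of its
# dependency groups.
def looks_rootish(n_tasks: int, dep_group_sizes: list[int]) -> bool:
    return (
        n_tasks >= 5
        and len(dep_group_sizes) < 5
        and sum(dep_group_sizes) < 5
    )

assert looks_rootish(100, [])         # large producer group, no dependencies
assert looks_rootish(50, [1, 1])      # a few tiny, widely shared dependencies
assert not looks_rootish(3, [])       # too small a group
assert not looks_rootish(50, [10])    # dependency group too large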


class TaskState:
"""A simple object holding information about a task.
@@ -2039,6 +2061,7 @@ def decide_worker_rootish_queuing_disabled(
"""
if self.validate:
# See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
assert self._is_rootish_no_restrictions(ts)
assert math.isinf(self.WORKER_SATURATION)

pool = self.idle.values() if self.idle else self.running
@@ -2052,6 +2075,7 @@
and tg.last_worker_tasks_left
and lws.status == Status.running
and self.workers.get(lws.address) is lws
and len(tg) > self.total_nthreads * 2
@fjetter (Member Author):

Putting this condition here also makes it much easier to motivate.

Below, we're calculating the number of tasks of the given task group that we can safely assign to a worker: (len(tg) / self.total_nthreads) * ws.nthreads

Therefore, last_worker_tasks_left > 2 * ws.nthreads is roughly equivalent to tasks_assigned / thread > 2.

This means we only schedule using co-assignment if we can assign at least two tasks per thread. Otherwise, we abort to protect our ability to scale out and use the ordinary worker_objective to distribute tasks.

@fjetter (Member Author):

Whether or not this "at least two tasks per thread" logic is a good choice is an entirely different matter, of course.
However, it gives some meaning to an otherwise arbitrary choice.

):
ws = lws
else:
@@ -2074,7 +2098,16 @@ def decide_worker_rootish_queuing_disabled(

return ws
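To make the cutoff discussed in the review comments above concrete, here is a small arithmetic sketch. All numbers are invented, and the plain variables stand in for the scheduler attributes of the same name.

# Hypothetical sizes, purely for illustration.
len_tg = 1000          # tasks in the task group
total_nthreads = 400   # threads across the whole cluster
ws_nthreads = 8        # threads on the candidate worker

# Fair share of the group for this worker, as in the comment above:
tasks_for_worker = (len_tg / total_nthreads) * ws_nthreads  # 20.0, i.e. 2.5 per thread

# The new cutoff: only attempt co-assignment when every thread would get at
# least ~2 tasks; otherwise fall back to the ordinary worker_objective.
use_co_assignment = len_tg > total_nthreads * 2             # True, since 1000 > 800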

def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
def worker_objective_rootish_queuing(self, ws, ts):
# FIXME: This is basically the ordinary worker_objective but with task
# counts instead of occupancy.
comm_bytes = sum(
dts.get_nbytes() for dts in ts.dependencies if ws not in dts.who_has
)
# See test_nbytes_determines_worker
return (len(ws.processing) / ws.nthreads, comm_bytes, ws.nbytes)
Comment on lines +2102 to +2109
@fjetter (Member Author), Feb 9, 2023:

@gjoseph92 I remember you very strongly opposing adding an objective function that respects comm_bytes. I vaguely remember a point about widely shared dependencies, but I don't see an issue with this.

We need an objective like this to ensure that very basic scheduling policies are respected, e.g. test_nbytes_determines_worker, where a dependent task is supposed to be scheduled on the worker holding most of its dependencies.

This should only truly matter if there are multiple workers available with an equal ratio of processing tasks per thread. My intuition tells me this objective function should still make sense.
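As a toy illustration of how this tuple objective breaks ties (the worker stats below are made up; the real code compares WorkerState objects via the method above):

# Objective tuples: (processing tasks per thread, bytes that would have to be
# transferred, total bytes already stored). All numbers are invented.
candidates = {
    "worker-a": (0.5, 0, 2_000_000),        # already holds the dependencies
    "worker-b": (0.5, 1_500_000, 500_000),  # equally busy, but needs a transfer
}
# Tuples compare element-wise, so with equal busyness the worker that avoids
# the transfer wins -- the behaviour test_nbytes_determines_worker relies on.
best = min(candidates, key=candidates.get)
assert best == "worker-a"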

@fjetter (Member Author):

xref #7280

@gjoseph92 (Collaborator):

> test_nbytes_determines_worker where a dependent task is supposed to be scheduled on a worker holding most of its dependencies

tl;dr then that task shouldn't have been queued in the first place, and the root-ish heuristic is at fault, not the objective function. For a task to be queued, it must be equally good to run on any worker.

I do oppose an objective function that considers data transfer for root-ish tasks. It contradicts the definition of root-ish-ness: that the location of the tasks' dependencies are not important for scheduling purposes.

When there's only one thread open in the cluster, on a random worker, the next task on the queue must be equally "good" to run on that random worker as on any other worker in the cluster. If it would have been "better" if the thread had opened on a different particular worker—because that worker had more dependencies in memory, say, or because it had some resource—then the task should not have been on the queue in the first place. (This is why putting tasks with restrictions on the queue breaks the core assumption of queuing: #7526.)

This is important because we choose the pool of candidate workers for queued tasks based only on how busy they are. That works because we can guarantee there are no other factors that should play into worker selection for these tasks.

The root-ish-ness heuristic is meant to provide that guarantee, by identifying task groups where:

  1. The tasks will end up being assigned to every worker in the cluster (len > nthreads)
  2. Therefore, any common dependencies the tasks have will end up being replicated onto every worker in the cluster (deps len < 5)

The key is the second point: because the dependencies will be replicated to all workers, we don't need to select workers that avoid data transfer—in the long term, the data transfer is unavoidable. Indeed, we even know of some cases (that don't meet the root-ish heuristic, to be clear) where trying to avoid a transfer leads to worse scheduling choices: #6570.

So the reason I oppose an objective function that considers data transfer for root-ish tasks is because it implies that one worker might be preferable over another based on something besides busyness, which violates these core assumptions.

But to be clear: I only oppose this on the grounds of logical consistency. I don't expect an objective function that considers transfer will actually make a measurable difference in most cases. (In most cases, the size of the candidate pool is 1 anyway.) But since it contradicts this key assumption of queuing, I'd want to come up with logical proof of why it's safe—and why it's preferable—especially since we know this type of weighting leads to bad decisions in other cases (#6570).

I haven't bothered to put in the effort to make the proof, since I haven't seen what the gain would be. I'm happy to be shown that it's both safe and better, though—again, I just haven't seen a reason to.

See also:

@fjetter (Member Author):

First of all, I think this is a moot point once I introduce some cutoff as suggested in the other comment. It would've been fine to just point out this error instead of writing such a long comment.

> It contradicts the definition of root-ish-ness: that the location of the tasks' dependencies are not important for scheduling purposes.

I disagree with this definition. The reason why we're having a special case for these tasks is because we assume they are data producers. The natural assumption is that they are at the bottom of our graph and are the most frequent source of data overproduction.
While I see some value in considering intermediate layers root-ish, I believe the same argument holds.
I don't think these perspectives are orthogonal, but your definition is not the whole truth.

If something like #6570 is truly the complexity driver here, we should consider alternative approaches (inlining, better weights, scheduler hints, etc.).

> If it would have been "better" if the thread had opened on a different particular worker—because that worker had more dependencies in memory, say, or because it had some resource—then the task should not have been on the queue in the first place.

I also don't agree with this sentiment. I see a point for suppressing/ignoring dependencies, but especially for cases where worker-saturation > 1 I can see how nuance matters. Total randomness is not necessarily desired.

@gjoseph92 (Collaborator), Feb 10, 2023:

Sorry for the long comment. We've talked about this a few times so I wanted to have it all written out.

> The reason why we're having a special case for these tasks is because we assume they are data producers

Yes, sorry, you're right. Root-ish-ness doesn't inherently care that the tasks can be scheduled without regard for their dependencies. It's just queuing that currently assumes this property.

It's unfortunate that these things are so intertwined right now. What began as a heuristic for co-assignment is now used as a heuristic for the current implementation of queuing. And it was always a weird heuristic.

The better way to phrase it might be that "whatever we use to decide whether to queue a task should guarantee the assumptions that queuing makes". Currently we've been using root-ish-ness, which does guarantee those assumptions. But there are probably other ways we could do this. And/or we can change the implementation of queuing to have different assumptions. We just shouldn't change only one without the other.

EDIT: switching to len(tg) > 5 does change the guarantees about what goes into the queue, but I think using worker_objective_rootish_queuing would then adjust what queuing is expecting in a compatible way; see other comment.


def decide_worker_rootish_queuing_enabled(self, ts) -> WorkerState | None:
"""Pick a worker for a runnable root-ish task, if not all are busy.

Picks the least-busy worker out of the ``idle`` workers (idle workers have fewer
@@ -2114,7 +2147,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
# NOTE: this will lead to worst-case scheduling with regards to co-assignment.
ws = min(
self.idle_task_count,
key=lambda ws: len(ws.processing) / ws.nthreads,
key=partial(self.worker_objective_rootish_queuing, ts=ts),
)
if self.validate:
assert not _worker_full(ws, self.WORKER_SATURATION), (
Expand Down Expand Up @@ -2206,7 +2239,7 @@ def transition_waiting_processing(self, key: str, stimulus_id: str) -> RecsMsgs:
"""
ts = self.tasks[key]

if self.is_rootish(ts):
if self._is_rootish_no_restrictions(ts):
# NOTE: having two root-ish methods is temporary. When the feature flag is
# removed, there should only be one, which combines co-assignment and
# queuing. Eventually, special-casing root tasks might be removed entirely,
@@ -2215,7 +2248,7 @@ def transition_waiting_processing(self, key: str, stimulus_id: str) -> RecsMsgs:
if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
return {ts.key: "no-worker"}, {}, {}
else:
if not (ws := self.decide_worker_rootish_queuing_enabled()):
if not (ws := self.decide_worker_rootish_queuing_enabled(ts)):
return {ts.key: "queued"}, {}, {}
else:
if not (ws := self.decide_worker_non_rootish(ts)):
Expand Down Expand Up @@ -2688,7 +2721,7 @@ def transition_queued_processing(self, key: str, stimulus_id: str) -> RecsMsgs:
assert not ts.actor, f"Actors can't be queued: {ts}"
assert ts in self.queued

if ws := self.decide_worker_rootish_queuing_enabled():
if ws := self.decide_worker_rootish_queuing_enabled(ts):
self.queued.discard(ts)
worker_msgs = self._add_to_processing(ts, ws)
# If no worker, task just stays `queued`
Expand Down Expand Up @@ -2812,22 +2845,16 @@ def story(self, *keys_or_tasks_or_stimuli: str | TaskState) -> list[Transition]:
# Assigning Tasks to Workers #
##############################

def is_rootish(self, ts: TaskState) -> bool:
"""
Whether ``ts`` is a root or root-like task.

Root-ish tasks are part of a group that's much larger than the cluster,
and have few or no dependencies.
"""
if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
def _is_rootish_no_restrictions(self, ts: TaskState) -> bool:
"""See also ``TaskGroup.rootish``"""
if (
ts.resource_restrictions
or ts.worker_restrictions
or ts.host_restrictions
or ts.actor
):
return False
tg = ts.group
# TODO short-circuit to True if `not ts.dependencies`?
@fjetter (Member Author):

If this all works out well, I would actually like to raise this method to a TaskGroup property instead. We can implement a similar short-circuit there, although I'm not sure it is actually worth it.

@gjoseph92 (Collaborator):

I think the short-circuit still makes sense with what you have here. With len(tg) > 5, a task with no dependencies would be non-rootish if it's in a group of 1.

This would let us close
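For illustration, one possible shape of the short-circuit being discussed — hypothetical, not part of this diff; everything besides the early return mirrors the property added above:

@property
def rootish(self):
    # Hypothetical early return: a group with no dependencies at all is a
    # pure data producer, so treat it as root-ish regardless of its size.
    if not self.dependencies:
        return True
    # Otherwise, the static check from this PR applies unchanged.
    return (
        len(self) >= 5
        and len(self.dependencies) < 5
        and sum(map(len, self.dependencies)) < 5
    )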

return (
len(tg) > self.total_nthreads * 2
and len(tg.dependencies) < 5
and sum(map(len, tg.dependencies)) < 5
)
return ts.group.rootish

def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None:
"""Update the status of the idle and saturated state
19 changes: 12 additions & 7 deletions distributed/tests/test_client.py
@@ -1331,13 +1331,13 @@ async def test_get_nbytes(c, s, a, b):
assert s.get_nbytes(summary=False) == {x.key: sizeof(1), y.key: sizeof(2)}


@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
@gen_cluster([("", 1), ("", 2)], client=True)
async def test_nbytes_determines_worker(c, s, a, b):
x = c.submit(identity, 1, workers=[a.ip])
y = c.submit(identity, tuple(range(100)), workers=[b.ip])
x = c.submit(identity, 1, workers=[a.address], key="x")
y = c.submit(identity, tuple(range(100)), workers=[b.address], key="y")
await c.gather([x, y])

assert x.key in list(a.data.keys())
assert y.key in list(b.data.keys())
z = c.submit(lambda x, y: None, x, y)
await z
assert s.tasks[z.key].who_has == {s.workers[b.address]}
@@ -4013,7 +4013,7 @@ async def test_scatter_compute_store_lose(c, s, a, b):
await asyncio.sleep(0.01)


@gen_cluster(client=True)
# FIXME there is a subtle race condition depending on how fast a worker is
# closed. If it is closed very quickly, the transitions never issue a
# cancelled-key report to the client and we're stuck in the x.status loop. This
# is more likely to happen if tasks are queued since y never makes it to the
# threadpool, delaying its shutdown.
@gen_cluster(client=True, config={"distributed.scheduler.worker-saturation": "inf"})
async def test_scatter_compute_store_lose_processing(c, s, a, b):
"""
Create irreplaceable data on one machine,
@@ -4030,7 +4035,7 @@ async def test_scatter_compute_store_lose_processing(c, s, a, b):
await a.close()

while x.status == "finished":
await asyncio.sleep(0.01)
await asyncio.sleep(0.5)

assert y.status == "cancelled"
assert z.status == "cancelled"
4 changes: 2 additions & 2 deletions distributed/tests/test_client_executor.py
@@ -217,12 +217,12 @@ def test_retries(client):
future = e.submit(varying(args))
assert future.result() == 42

with client.get_executor(retries=4) as e:
with client.get_executor(retries=3) as e:
future = e.submit(varying(args))
result = future.result()
assert result == 42

with client.get_executor(retries=2) as e:
with client.get_executor(retries=1) as e:
future = e.submit(varying(args))
with pytest.raises(ZeroDivisionError, match="two"):
res = future.result()
33 changes: 21 additions & 12 deletions distributed/tests/test_priorities.py
@@ -69,12 +69,17 @@ async def block_worker(
await asyncio.sleep(0.01)

if pause:
assert len(s.unrunnable) == ntasks_on_worker
assert len(s.unrunnable) + len(s.queued) == ntasks_on_worker
assert not w.state.tasks
w.status = Status.running
else:
while len(w.state.tasks) < ntasks_on_worker:
await asyncio.sleep(0.01)
# TODO: What can we assert / wait for when tasks are being queued?
# This "queue on worker" case is likely just not possible.
# Possibly, this file should be extended with non-rootish cases to
# assert this logic instead

# while len(w.state.tasks) < ntasks_on_worker:
# await asyncio.sleep(0.01)
await ev.set()
await clog
del clog
@@ -96,7 +101,11 @@ def gen_blockable_cluster(test_func):
gen_cluster(
client=True,
nthreads=[("", 1)],
config={"distributed.worker.memory.pause": False},
config={
"distributed.worker.memory.pause": False,
# A lot of this test logic otherwise won't add up
"distributed.scheduler.worker-saturation": 1.0,
},
)(test_func)
)

@@ -106,12 +115,12 @@ async def test_submit(c, s, a, pause):
async with block_worker(c, s, a, pause):
low = c.submit(inc, 1, key="low", priority=-1)
ev = Event()
clog = c.submit(lambda ev: ev.wait(), ev, key="clog")
clog = c.submit(lambda ev: ev.set(), ev=ev, key="clog")
high = c.submit(inc, 2, key="high", priority=1)

await wait(high)
assert all(ws.processing for ws in s.workers.values())
assert s.tasks[low.key].state == "processing"
# assert all(ws.processing for ws in s.workers.values())
# assert s.tasks[low.key].state in ("processing", "queued")
await ev.set()
await wait(low)

@@ -126,7 +135,7 @@ async def test_map(c, s, a, pause):

await wait(high)
assert all(ws.processing for ws in s.workers.values())
assert all(s.tasks[fut.key].state == "processing" for fut in low)
assert all(s.tasks[fut.key].state in ("processing", "queued") for fut in low)
await ev.set()
await clog
await wait(low)
@@ -142,7 +151,7 @@ async def test_compute(c, s, a, pause):

await wait(high)
assert all(ws.processing for ws in s.workers.values())
assert s.tasks[low.key].state == "processing"
assert s.tasks[low.key].state in ("processing", "queued")
await ev.set()
await clog
await wait(low)
@@ -158,7 +167,7 @@ async def test_persist(c, s, a, pause):

await wait(high)
assert all(ws.processing for ws in s.workers.values())
assert s.tasks[low.key].state == "processing"
assert s.tasks[low.key].state in ("processing", "queued")
await ev.set()
await wait(clog)
await wait(low)
@@ -177,7 +186,7 @@ async def test_annotate_compute(c, s, a, pause):
low, clog, high = c.compute([low, clog, high], optimize_graph=False)

await wait(high)
assert s.tasks[low.key].state == "processing"
assert s.tasks[low.key].state in ("processing", "queued")
await ev.set()
await wait(clog)
await wait(low)
@@ -196,7 +205,7 @@ async def test_annotate_persist(c, s, a, pause):
low, clog, high = c.persist([low, clog, high], optimize_graph=False)

await wait(high)
assert s.tasks[low.key].state == "processing"
assert s.tasks[low.key].state in ("processing", "queued")
await ev.set()
await wait(clog)
await wait(low)