-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve work stealing for scaling situations #4920
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,11 @@ | |
|
||
import asyncio | ||
import logging | ||
import random | ||
import uuid | ||
from collections import defaultdict, deque | ||
from collections.abc import Container | ||
from functools import partial | ||
from math import log2 | ||
from time import time | ||
|
||
|
@@ -190,27 +192,24 @@ def steal_time_ratio(self, ts): | |
For example a result of zero implies a task without dependencies. | ||
level: The location within a stealable list to place this value | ||
""" | ||
if not ts.dependencies: # no dependencies fast path | ||
return 0, 0 | ||
|
||
split = ts.prefix.name | ||
if split in fast_tasks: | ||
return None, None | ||
|
||
if not ts.dependencies: # no dependencies fast path | ||
return 0, 0 | ||
|
||
ws = ts.processing_on | ||
compute_time = ws.processing[ts] | ||
if compute_time < 0.005: # 5ms, just give up | ||
return None, None | ||
|
||
nbytes = ts.get_nbytes_deps() | ||
transfer_time = nbytes / self.scheduler.bandwidth + LATENCY | ||
cost_multiplier = transfer_time / compute_time | ||
if cost_multiplier > 100: | ||
return None, None | ||
|
||
level = int(round(log2(cost_multiplier) + 6)) | ||
if level < 1: | ||
level = 1 | ||
level = min(len(self.cost_multipliers) - 1, level) | ||
|
||
return cost_multiplier, level | ||
|
||
|
@@ -361,6 +360,7 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): | |
if occ_idl + cost_multiplier * duration <= occ_sat - duration / 2: | ||
self.move_task_request(ts, sat, idl) | ||
log.append( | ||
# The format of this message is tightly coupled to the dashboard | ||
( | ||
start, | ||
level, | ||
|
@@ -417,7 +417,10 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): | |
thieves = _potential_thieves_for(ts, idle) | ||
if not thieves: | ||
break | ||
thief = thieves[i % len(thieves)] | ||
|
||
thief = self._maybe_pick_thief(ts, thieves) | ||
if not thief: | ||
thief = thieves[i % len(thieves)] | ||
|
||
duration = sat.processing.get(ts) | ||
if duration is None: | ||
|
@@ -450,7 +453,10 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): | |
thieves = _potential_thieves_for(ts, idle) | ||
if not thieves: | ||
continue | ||
thief = thieves[i % len(thieves)] | ||
thief = self._maybe_pick_thief(ts, thieves) | ||
if not thief: | ||
thief = thieves[i % len(thieves)] | ||
|
||
duration = sat.processing[ts] | ||
|
||
maybe_move_task( | ||
|
@@ -464,6 +470,32 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): | |
if s.digests: | ||
s.digests["steal-duration"].add(stop - start) | ||
|
||
def _maybe_pick_thief(self, ts, thieves): | ||
"""Try to be smart about picking a thief given some options. We're | ||
trying to pick a thief which has dependencies for a given task, if | ||
possible and will pick the one which works best for us given the | ||
Scheduler.worker_objective | ||
|
||
If no idle worker with dependencies is found, this returns None. | ||
""" | ||
if ts._dependencies: | ||
who_has = set() | ||
for dep in ts._dependencies: | ||
who_has.update(dep.who_has) | ||
|
||
thieves_with_data = list(who_has & set(thieves)) | ||
|
||
# If there are potential thieves with dependencies we | ||
# should prefer them and pick the one which works best. | ||
# Otherwise just random/round robin | ||
if thieves_with_data: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If, say, just 1 idle worker holds dependencies, is it possible for that worker to get slammed with new tasks because of this? What has to happen between picking a worker as a thief and it getting removed from the idle set? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Yes, this may hurt, particularly for sub-topologies like
```mermaid
flowchart BT
A --> B1
A --> B2
A --> B3
```
where A is very small and B* is very compute heavy, i.e. steal_ratio will always be small and tasks are even allowed to be stolen from "public", i.e. non-saturated workers. I believe for other cases, i.e. if tasks are just stolen from saturated workers, this should be OK since decide_worker is much smarter with initial task placement.
A steal request must be confirmed, which updates occupancies and recalculates the idle set. Somewhere on this PR a comment of mine is suggesting that this |
||
if len(thieves_with_data) > 10: | ||
thieves_with_data = random.choices(thieves_with_data, k=10) | ||
return min( | ||
thieves_with_data, | ||
key=partial(self.scheduler.worker_objective, ts), | ||
) | ||
|
||
def restart(self, scheduler): | ||
for stealable in self.stealable.values(): | ||
for s in stealable: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: might be easier to read if this was moved into `_maybe_pick_thief` (and it was no longer `maybe`).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, I figured this is nicer because it felt weird passing `i` to `_maybe_pick_thief`.