From 7f454fd34c83d01e276601ffef9bcd4cc2354544 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Jun 2021 12:20:32 -0600 Subject: [PATCH 01/23] WIP co-assign related root-ish tasks --- distributed/scheduler.py | 42 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a8571e8b627..db6a9a68281 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -950,6 +950,8 @@ class TaskGroup: _start: double _stop: double _all_durations: object + _last_worker: WorkerState + _last_worker_tasks_left: int # TODO Py_ssize_t? def __init__(self, name: str): self._name = name @@ -964,6 +966,8 @@ def __init__(self, name: str): self._start = 0.0 self._stop = 0.0 self._all_durations = defaultdict(float) + self._last_worker = None + self._last_worker_tasks_left = 0 @property def name(self): @@ -1009,6 +1013,18 @@ def start(self): def stop(self): return self._stop + @property + def last_worker(self): + return self._last_worker + + @property + def last_worker_tasks_left(self): + return self._last_worker_tasks_left + + @last_worker_tasks_left.setter + def last_worker_tasks_left(self, n: int): + self._last_worker_tasks_left = n + @ccall def add(self, o): ts: TaskState = o @@ -7517,7 +7533,33 @@ def decide_worker( for ws in candidates: break else: + group: TaskGroup = ts._group + ws = group._last_worker + + total_nthreads = sum( + wws._nthreads for wws in candidates + ) # TODO get `self._total_threads` from scheduler? Though that doesn't account for worker restrictions. + + group_tasks_per_worker = len(group) / total_nthreads + + # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks + # don't require data transfer. Assumes `decide_worker` is being called in priority order. 
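A minimal standalone sketch of the quota arithmetic behind the checks below (numbers are made up; the divisor is the total thread count even though the variable is named per-worker, which a later commit in this series renames and scales by each worker's thread count):

    import math

    # Hypothetical cluster: 4 candidate workers x 2 threads each, 100 tasks in the group.
    total_nthreads = 4 * 2
    group_size = 100

    group_tasks_per_worker = group_size / total_nthreads  # 12.5, i.e. "group larger than cluster"
    quota = math.floor(group_tasks_per_worker)            # 12 consecutive tasks pinned per worker

    assert group_tasks_per_worker > 1 and quota == 12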
+ if ( + ws is not None # there is a previous worker + and group._last_worker_tasks_left > 0 # previous worker not fully assigned + and ts._dependents # task has dependents + and group_tasks_per_worker > 1 # group is larger than cluster + and ( # is a root-like task (task group depends on very few tasks) + sum(map(len, group._dependencies)) < 5 # TODO what number + ) + ): + group._last_worker_tasks_left -= 1 + return ws + ws = min(candidates, key=objective) + group._last_worker = ws + group._last_worker_tasks_left = math.floor(group_tasks_per_worker) + return ws From 8e8f7f1d6e4ee590608adf3d406abdf9ca9a76f1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Jun 2021 14:05:23 -0600 Subject: [PATCH 02/23] Handle fastpath decide_worker case --- distributed/scheduler.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index db6a9a68281..4d950d23478 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2353,7 +2353,11 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ts.state = "no-worker" return ws - if ts._dependencies or valid_workers is not None: + if ( + ts._dependencies + or valid_workers is not None + or ts._group._last_worker is not None + ): ws = decide_worker( ts, self._workers_dv.values(), @@ -2361,6 +2365,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: partial(self.worker_objective, ts), ) else: + # Fastpath when there are no related tasks or restrictions worker_pool = self._idle or self._workers worker_pool_dv = cast(dict, worker_pool) wp_vals = worker_pool.values() @@ -2382,6 +2387,12 @@ def decide_worker(self, ts: TaskState) -> WorkerState: else: # dumb but fast in large case ws = wp_vals[self._n_tasks % n_workers] + # TODO repeated logic from `decide_worker` + ts._group._last_worker = ws + ts._group._last_worker_tasks_left = math.floor( + len(ts._group) / self._total_nthreads + ) + if self._validate: assert ws is None or isinstance(ws, WorkerState), ( type(ws), From b12d4900704b4099fb047e7d56d35d70d5abba64 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Jun 2021 14:33:25 -0600 Subject: [PATCH 03/23] matt's occupancy-based method Unfortunately `ts._prefix._duration_average` == -1 for all the root tasks we care about, so this won't work. --- distributed/scheduler.py | 46 ++++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4d950d23478..40b50fd203b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2388,10 +2388,11 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = wp_vals[self._n_tasks % n_workers] # TODO repeated logic from `decide_worker` + print("fastpath") ts._group._last_worker = ws - ts._group._last_worker_tasks_left = math.floor( - len(ts._group) / self._total_nthreads - ) + # ts._group._last_worker_tasks_left = math.floor( + # len(ts._group) / self._total_nthreads + # ) if self._validate: assert ws is None or isinstance(ws, WorkerState), ( @@ -7545,7 +7546,9 @@ def decide_worker( break else: group: TaskGroup = ts._group - ws = group._last_worker + old: WorkerState = group._last_worker + + ws = min(candidates, key=objective) total_nthreads = sum( wws._nthreads for wws in candidates @@ -7556,20 +7559,41 @@ def decide_worker( # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks # don't require data transfer. 
Assumes `decide_worker` is being called in priority order. if ( - ws is not None # there is a previous worker - and group._last_worker_tasks_left > 0 # previous worker not fully assigned + old is not None # there is a previous worker and ts._dependents # task has dependents and group_tasks_per_worker > 1 # group is larger than cluster + # and group._last_worker_tasks_left > 0 # previous worker not fully assigned + and ts._prefix._duration_average + old.occupancy < ws.occupancy + math.ceil(group_tasks_per_worker) * ts._prefix._duration_average and ( # is a root-like task (task group depends on very few tasks) sum(map(len, group._dependencies)) < 5 # TODO what number ) ): - group._last_worker_tasks_left -= 1 - return ws - - ws = min(candidates, key=objective) + # group._last_worker_tasks_left -= 1 + print("using last worker") + return old + + print( + f"{ts.prefix_key}", + f"{ts._prefix._duration_average=}", + f"{old.occupancy=}" if old else None, + f"{ws.occupancy=}", + f"{group_tasks_per_worker=}", + ) + # print( + # f"{old is not None=}", + # f"{ts._dependents=}", + # f"{group_tasks_per_worker > 1=}", + # f"{ts._prefix._duration_average + old.occupancy}", + # f"{ws.occupancy + math.ceil(group_tasks_per_worker) * ts._prefix._duration_average}", + # f"{sum(map(len, group._dependencies))=}", + # f"{group._dependencies=}", + # f"{group=}", + # f"{ts=}", + # ) + + # ws = min(candidates, key=objective) group._last_worker = ws - group._last_worker_tasks_left = math.floor(group_tasks_per_worker) + # group._last_worker_tasks_left = math.floor(group_tasks_per_worker) return ws From 064be2ebc0f7a0af59768adba65603e0b048b3ff Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Jun 2021 14:33:46 -0600 Subject: [PATCH 04/23] Revert "matt's occupancy-based method" This reverts commit e348a7cec5afa73f6a0ca01e53cd3f0fce53052f. --- distributed/scheduler.py | 46 ++++++++++------------------------------ 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 40b50fd203b..4d950d23478 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2388,11 +2388,10 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = wp_vals[self._n_tasks % n_workers] # TODO repeated logic from `decide_worker` - print("fastpath") ts._group._last_worker = ws - # ts._group._last_worker_tasks_left = math.floor( - # len(ts._group) / self._total_nthreads - # ) + ts._group._last_worker_tasks_left = math.floor( + len(ts._group) / self._total_nthreads + ) if self._validate: assert ws is None or isinstance(ws, WorkerState), ( @@ -7546,9 +7545,7 @@ def decide_worker( break else: group: TaskGroup = ts._group - old: WorkerState = group._last_worker - - ws = min(candidates, key=objective) + ws = group._last_worker total_nthreads = sum( wws._nthreads for wws in candidates @@ -7559,41 +7556,20 @@ def decide_worker( # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks # don't require data transfer. Assumes `decide_worker` is being called in priority order. 
if ( - old is not None # there is a previous worker + ws is not None # there is a previous worker + and group._last_worker_tasks_left > 0 # previous worker not fully assigned and ts._dependents # task has dependents and group_tasks_per_worker > 1 # group is larger than cluster - # and group._last_worker_tasks_left > 0 # previous worker not fully assigned - and ts._prefix._duration_average + old.occupancy < ws.occupancy + math.ceil(group_tasks_per_worker) * ts._prefix._duration_average and ( # is a root-like task (task group depends on very few tasks) sum(map(len, group._dependencies)) < 5 # TODO what number ) ): - # group._last_worker_tasks_left -= 1 - print("using last worker") - return old - - print( - f"{ts.prefix_key}", - f"{ts._prefix._duration_average=}", - f"{old.occupancy=}" if old else None, - f"{ws.occupancy=}", - f"{group_tasks_per_worker=}", - ) - # print( - # f"{old is not None=}", - # f"{ts._dependents=}", - # f"{group_tasks_per_worker > 1=}", - # f"{ts._prefix._duration_average + old.occupancy}", - # f"{ws.occupancy + math.ceil(group_tasks_per_worker) * ts._prefix._duration_average}", - # f"{sum(map(len, group._dependencies))=}", - # f"{group._dependencies=}", - # f"{group=}", - # f"{ts=}", - # ) - - # ws = min(candidates, key=objective) + group._last_worker_tasks_left -= 1 + return ws + + ws = min(candidates, key=objective) group._last_worker = ws - # group._last_worker_tasks_left = math.floor(group_tasks_per_worker) + group._last_worker_tasks_left = math.floor(group_tasks_per_worker) return ws From 1484e65d1e2eca31f4047ab6903fe29dcd2114dc Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Jun 2021 16:17:04 -0600 Subject: [PATCH 05/23] don't require dependents --- distributed/scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4d950d23478..e2042e57468 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7558,7 +7558,6 @@ def decide_worker( if ( ws is not None # there is a previous worker and group._last_worker_tasks_left > 0 # previous worker not fully assigned - and ts._dependents # task has dependents and group_tasks_per_worker > 1 # group is larger than cluster and ( # is a root-like task (task group depends on very few tasks) sum(map(len, group._dependencies)) < 5 # TODO what number From 5163e771aad02d1ce48a91f48dd1932c91a3b08c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Jun 2021 16:22:44 -0600 Subject: [PATCH 06/23] pass in total_nthreads --- distributed/scheduler.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e2042e57468..a65e6911f34 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2363,6 +2363,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: self._workers_dv.values(), valid_workers, partial(self.worker_objective, ts), + self._total_nthreads, ) else: # Fastpath when there are no related tasks or restrictions @@ -7498,7 +7499,11 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): @cfunc @exceptval(check=False) def decide_worker( - ts: TaskState, all_workers, valid_workers: set, objective + ts: TaskState, + all_workers, + valid_workers: set, + objective, + total_nthreads: Py_ssize_t, ) -> WorkerState: """ Decide which worker should take task *ts*. 
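The idea of this commit as a small sketch (the worker objects are stand-ins, not the real WorkerState): resolve the earlier TODO by reusing the scheduler's precomputed thread total on the common unrestricted path, and only paying for a sum when restrictions shrink the candidate pool:

    class StubWorker:
        def __init__(self, nthreads):
            self._nthreads = nthreads

    all_workers = [StubWorker(2), StubWorker(2), StubWorker(1)]
    scheduler_total_nthreads = 5  # maintained incrementally by the scheduler (self._total_nthreads)

    def effective_nthreads(valid_workers):
        # Restrictions change which threads are actually reachable, so only then re-sum.
        if valid_workers is not None:
            return sum(w._nthreads for w in valid_workers)
        return scheduler_total_nthreads

    assert effective_nthreads(None) == 5
    assert effective_nthreads({all_workers[0], all_workers[2]}) == 3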
@@ -7534,7 +7539,7 @@ def decide_worker( candidates = valid_workers if not candidates: if ts._loose_restrictions: - ws = decide_worker(ts, all_workers, None, objective) + ws = decide_worker(ts, all_workers, None, objective, total_nthreads) return ws ncandidates: Py_ssize_t = len(candidates) @@ -7547,9 +7552,8 @@ def decide_worker( group: TaskGroup = ts._group ws = group._last_worker - total_nthreads = sum( - wws._nthreads for wws in candidates - ) # TODO get `self._total_threads` from scheduler? Though that doesn't account for worker restrictions. + if valid_workers is not None: + total_nthreads = sum(wws._nthreads for wws in candidates) group_tasks_per_worker = len(group) / total_nthreads From d9df8be9e67525416f2d867aa2cdf01fee8cb278 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Jun 2021 18:29:39 -0600 Subject: [PATCH 07/23] REVERTME debugging message for out-of-order --- distributed/scheduler.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a65e6911f34..3153e484e34 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -952,6 +952,7 @@ class TaskGroup: _all_durations: object _last_worker: WorkerState _last_worker_tasks_left: int # TODO Py_ssize_t? + _last_worker_priority: tuple # TODO remove (debugging only) def __init__(self, name: str): self._name = name @@ -968,6 +969,7 @@ def __init__(self, name: str): self._all_durations = defaultdict(float) self._last_worker = None self._last_worker_tasks_left = 0 + self._last_worker_priority = () @property def name(self): @@ -1025,6 +1027,14 @@ def last_worker_tasks_left(self): def last_worker_tasks_left(self, n: int): self._last_worker_tasks_left = n + @property + def last_worker_priority(self): + return self._last_worker_priority + + @last_worker_priority.setter + def last_worker_priority(self, x: tuple): + self._last_worker_priority = x + @ccall def add(self, o): ts: TaskState = o @@ -2393,6 +2403,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ts._group._last_worker_tasks_left = math.floor( len(ts._group) / self._total_nthreads ) + ts._group._last_worker_priority = ts._priority if self._validate: assert ws is None or isinstance(ws, WorkerState), ( @@ -7568,11 +7579,21 @@ def decide_worker( ) ): group._last_worker_tasks_left -= 1 + if group._last_worker_priority >= ts.priority: + print( + f"decide_worker called out of priority order: {group._last_worker_priority} >= {ts.priority}.\n" + f"{ts=}\n" + f"{group.last_worker=}\n" + f"{group.last_worker_tasks_left=}\n" + f"{group_tasks_per_worker=}\n" + ) + group._last_worker_priority = ts.priority return ws ws = min(candidates, key=objective) group._last_worker = ws group._last_worker_tasks_left = math.floor(group_tasks_per_worker) + group._last_worker_priority = ts.priority return ws From 7b9728f5e579f6942a380fe5a0892ace69ee431a Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 11 Jun 2021 11:43:37 -0600 Subject: [PATCH 08/23] REVERTME print statements --- distributed/scheduler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3153e484e34..a1473475c61 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2399,6 +2399,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = wp_vals[self._n_tasks % n_workers] # TODO repeated logic from `decide_worker` + print(f"nodeps / no last worker fastpah - {ts.group_key}") ts._group._last_worker = ws ts._group._last_worker_tasks_left = math.floor( 
len(ts._group) / self._total_nthreads @@ -7588,8 +7589,10 @@ def decide_worker( f"{group_tasks_per_worker=}\n" ) group._last_worker_priority = ts.priority + print(f"reusing worker - {ts.group_key}") return ws + print(f"picking worker - {ts.group_key}") ws = min(candidates, key=objective) group._last_worker = ws group._last_worker_tasks_left = math.floor(group_tasks_per_worker) From 0fbb75e0fa57b13370c96423a662acd22186214f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 15 Jun 2021 19:09:52 -0600 Subject: [PATCH 09/23] Ignore deps of root-likes. This is working well. When a task is root-like and the previous worker is full, we don't want to use the normal `decide_worker` logic, since that only considers as candidates workers that have the deps of the dask. Since the task only has 1-5 deps, we'd only ever consider the same 1-5 workers. --- distributed/scheduler.py | 116 +++++++++++++++++++++++---------------- 1 file changed, 68 insertions(+), 48 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a1473475c61..1b6d7868ae8 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2399,7 +2399,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = wp_vals[self._n_tasks % n_workers] # TODO repeated logic from `decide_worker` - print(f"nodeps / no last worker fastpah - {ts.group_key}") + print(f"nodeps / no last worker fastpah - {ts.group_key} -> {ws.name}") ts._group._last_worker = ws ts._group._last_worker_tasks_left = math.floor( len(ts._group) / self._total_nthreads @@ -7532,53 +7532,28 @@ def decide_worker( of bytes sent between workers. This is determined by calling the *objective* function. """ - ws: WorkerState = None wws: WorkerState - dts: TaskState - deps: set = ts._dependencies - candidates: set - assert all([dts._who_has for dts in deps]) - if ts._actor: - candidates = set(all_workers) - else: - candidates = {wws for dts in deps for wws in dts._who_has} - if valid_workers is None: - if not candidates: - candidates = set(all_workers) - else: - candidates &= valid_workers - if not candidates: - candidates = valid_workers - if not candidates: - if ts._loose_restrictions: - ws = decide_worker(ts, all_workers, None, objective, total_nthreads) - return ws - ncandidates: Py_ssize_t = len(candidates) - if ncandidates == 0: - pass - elif ncandidates == 1: - for ws in candidates: - break - else: - group: TaskGroup = ts._group - ws = group._last_worker + group: TaskGroup = ts._group + ws: WorkerState = group._last_worker - if valid_workers is not None: - total_nthreads = sum(wws._nthreads for wws in candidates) + if valid_workers is not None: + total_nthreads = sum(wws._nthreads for wws in valid_workers) - group_tasks_per_worker = len(group) / total_nthreads + group_tasks_per_worker = len(group) / total_nthreads + ignore_deps_while_picking: bool = False - # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks - # don't require data transfer. Assumes `decide_worker` is being called in priority order. - if ( - ws is not None # there is a previous worker - and group._last_worker_tasks_left > 0 # previous worker not fully assigned - and group_tasks_per_worker > 1 # group is larger than cluster - and ( # is a root-like task (task group depends on very few tasks) - sum(map(len, group._dependencies)) < 5 # TODO what number - ) - ): + # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks + # don't require data transfer. 
Assumes `decide_worker` is being called in priority order. + if ( + ws is not None # there is a previous worker + and group_tasks_per_worker > 1 # group is larger than cluster + and ( # is a root-like task (task group is large, but depends on very few tasks) + sum(map(len, group._dependencies)) < 5 # TODO what number + ) + ): + if group._last_worker_tasks_left > 0: + # Previous worker not fully assigned group._last_worker_tasks_left -= 1 if group._last_worker_priority >= ts.priority: print( @@ -7589,15 +7564,60 @@ def decide_worker( f"{group_tasks_per_worker=}\n" ) group._last_worker_priority = ts.priority - print(f"reusing worker - {ts.group_key}") + print(f"reusing worker - {ts.group_key} -> {ws.name}") return ws - print(f"picking worker - {ts.group_key}") + # Previous worker is fully assigned, so pick a new worker. + # Since this is a root-like task, we should ignore the placement of its dependencies while selecting workers. + # Every worker is going to end up running this type of task eventually, and any dependencies will have to be + # transferred to all workers, so there's no gain from only considering workers where the dependencies already live. + # Indeed, we _must_ consider all workers, otherwise we would keep picking the same "new" worker(s) every time, + # since there are only N workers to choose from that actually have the dependency (where N <= n_deps). + ignore_deps_while_picking = True + print(f"{ts.group_key} is root-like but {ws.name} is full - picking a new worker") + + # Not a root-like task; pick the best worker among the valid workers + # that hold at least one dependency of this task. + deps: set = ts._dependencies + dts: TaskState + candidates: set + assert all([dts._who_has for dts in deps]) + if ignore_deps_while_picking: + candidates = valid_workers if valid_workers is not None else set(all_workers) + else: + if ts._actor: + candidates = set(all_workers) + else: + candidates = {wws for dts in deps for wws in dts._who_has} + if valid_workers is None: + if not candidates: + candidates = set(all_workers) + else: + candidates &= valid_workers + if not candidates: + candidates = valid_workers + if not candidates: + if ts._loose_restrictions: + ws = decide_worker(ts, all_workers, None, objective, total_nthreads) + return ws + + ncandidates: Py_ssize_t = len(candidates) + if ncandidates == 0: + print(f"no candidates - {ts.group_key}") + pass + elif ncandidates == 1: + # NOTE: this is the ideal case: all the deps are already on the same worker. + # We did a good job in previous `decide_worker`s! + for ws in candidates: + break + print(f"1 candidate - {ts.group_key} -> {ws.name}") + else: ws = min(candidates, key=objective) - group._last_worker = ws - group._last_worker_tasks_left = math.floor(group_tasks_per_worker) - group._last_worker_priority = ts.priority + print(f"picked worker - {ts.group_key} -> {ws.name}") + group._last_worker = ws + group._last_worker_tasks_left = math.floor(group_tasks_per_worker) + group._last_worker_priority = ts.priority return ws From f2da0bc0224c3a4cf1ca5760c4c005b1bc772959 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 16 Jun 2021 16:07:40 -0600 Subject: [PATCH 10/23] Save a few cycles Only compute `total_nthreads` when a new worker is needed, and only compute the number of dependencies once per task group. Overloads the meaning of `_last_worker` to indicate if we've decided in the past whether a TaskGroup is root-ish or not. 
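The "overloading" described above amounts to packing a three-way flag into one attribute; a minimal sketch of the pattern (stub class, illustrative only):

    class StubWorkerState:  # stand-in for distributed's WorkerState
        pass

    NOT_ROOT_ISH = StubWorkerState()  # unique sentinel, never identical to a real worker

    def classify(last_worker):
        if last_worker is None:
            return "unknown"        # group not seen yet
        if last_worker is NOT_ROOT_ISH:
            return "not root-ish"   # inspected once, heuristic said no
        return "root-ish"           # a real worker: the one currently being filled

    assert classify(None) == "unknown"
    assert classify(NOT_ROOT_ISH) == "not root-ish"
    assert classify(StubWorkerState()) == "root-ish"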
--- distributed/scheduler.py | 106 +++++++++++++++++++++++---------------- 1 file changed, 63 insertions(+), 43 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 1b6d7868ae8..5e024750814 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7508,6 +7508,9 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): steal.put_key_in_stealable(ts) +NOT_ROOT_ISH = WorkerState() + + @cfunc @exceptval(check=False) def decide_worker( @@ -7537,54 +7540,67 @@ def decide_worker( group: TaskGroup = ts._group ws: WorkerState = group._last_worker - if valid_workers is not None: - total_nthreads = sum(wws._nthreads for wws in valid_workers) + group_tasks_per_worker: float + if ws is None or (ws is not NOT_ROOT_ISH and group._last_worker_tasks_left) == 0: + # Calculate the ratio of tasks in the task group to number of workers. + # We only need to do this computation when 1) seeing a task group for the first time, + # or 2) this is a root-ish task group, and we've just filled up the worker we were + # sending tasks to and need to pick a new one. + if valid_workers is not None: + total_nthreads = sum(wws._nthreads for wws in valid_workers) - group_tasks_per_worker = len(group) / total_nthreads - ignore_deps_while_picking: bool = False + group_tasks_per_worker = len(group) / total_nthreads + else: + group_tasks_per_worker = float("nan") - # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks - # don't require data transfer. Assumes `decide_worker` is being called in priority order. - if ( - ws is not None # there is a previous worker - and group_tasks_per_worker > 1 # group is larger than cluster - and ( # is a root-like task (task group is large, but depends on very few tasks) - sum(map(len, group._dependencies)) < 5 # TODO what number - ) - ): - if group._last_worker_tasks_left > 0: - # Previous worker not fully assigned - group._last_worker_tasks_left -= 1 - if group._last_worker_priority >= ts.priority: - print( - f"decide_worker called out of priority order: {group._last_worker_priority} >= {ts.priority}.\n" - f"{ts=}\n" - f"{group.last_worker=}\n" - f"{group.last_worker_tasks_left=}\n" - f"{group_tasks_per_worker=}\n" - ) - group._last_worker_priority = ts.priority - print(f"reusing worker - {ts.group_key} -> {ws.name}") - return ws + is_root_ish: bool + if ws is None: + # Very fist task in the group - we haven't determined yet whether it's a root-ish task group + if ( + group_tasks_per_worker > 1 # group is larger than cluster + and ( # is a root-like task (task group is large, but depends on very few tasks) + sum(map(len, group._dependencies)) < 5 # TODO what number + ) + ): + is_root_ish = True + else: + is_root_ish = False + group._last_worker = NOT_ROOT_ISH + else: + # We've seen this task group before and already made the above determination + is_root_ish = ws is not NOT_ROOT_ISH + + if is_root_ish and ws is not None and group._last_worker_tasks_left > 0: + # Root-ish task and previous worker not fully assigned - reuse previous worker. + # (When the previous worker _is_ fully assigned, we fall through here to the pick-a-worker logic.) 
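The comparison on the next added line is how an out-of-priority-order call is detected: task priorities are plain tuples, so the test is just lexicographic ordering (values below are made up):

    last_seen = (0, 1, 5)          # priority recorded for the previous task in the group
    assert (0, 1, 6) > last_seen   # strictly increasing: calls are arriving in priority order
    assert (0, 1, 4) < last_seen   # smaller tuple: out of order, so the debug branch fires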
+ if group._last_worker_priority >= ts.priority: + print( + f"decide_worker called out of priority order: {group._last_worker_priority} >= {ts.priority}.\n" + f"{ts=}\n" + f"{group.last_worker=}\n" + f"{group.last_worker_tasks_left=}\n" + f"{group_tasks_per_worker=}\n" + ) + group._last_worker_priority = ts.priority + group._last_worker_tasks_left -= 1 + print(f"reusing worker - {ts.group_key} -> {ws.name}") + return ws - # Previous worker is fully assigned, so pick a new worker. + # Pick a worker to run this task + deps: set = ts._dependencies + dts: TaskState + candidates: set + assert all([dts._who_has for dts in deps]) + if is_root_ish: + # Previous worker is fully assigned (or unknown), so pick a new worker. # Since this is a root-like task, we should ignore the placement of its dependencies while selecting workers. # Every worker is going to end up running this type of task eventually, and any dependencies will have to be # transferred to all workers, so there's no gain from only considering workers where the dependencies already live. # Indeed, we _must_ consider all workers, otherwise we would keep picking the same "new" worker(s) every time, # since there are only N workers to choose from that actually have the dependency (where N <= n_deps). - ignore_deps_while_picking = True - print(f"{ts.group_key} is root-like but {ws.name} is full - picking a new worker") - - # Not a root-like task; pick the best worker among the valid workers - # that hold at least one dependency of this task. - deps: set = ts._dependencies - dts: TaskState - candidates: set - assert all([dts._who_has for dts in deps]) - if ignore_deps_while_picking: candidates = valid_workers if valid_workers is not None else set(all_workers) else: + # Restrict placement of this task to workers that hold its dependencies if ts._actor: candidates = set(all_workers) else: @@ -7598,13 +7614,15 @@ def decide_worker( candidates = valid_workers if not candidates: if ts._loose_restrictions: - ws = decide_worker(ts, all_workers, None, objective, total_nthreads) + ws = decide_worker( + ts, all_workers, None, objective, total_nthreads + ) return ws ncandidates: Py_ssize_t = len(candidates) if ncandidates == 0: print(f"no candidates - {ts.group_key}") - pass + return None elif ncandidates == 1: # NOTE: this is the ideal case: all the deps are already on the same worker. # We did a good job in previous `decide_worker`s! @@ -7615,9 +7633,11 @@ def decide_worker( ws = min(candidates, key=objective) print(f"picked worker - {ts.group_key} -> {ws.name}") - group._last_worker = ws - group._last_worker_tasks_left = math.floor(group_tasks_per_worker) - group._last_worker_priority = ts.priority + if is_root_ish: + group._last_worker = ws + group._last_worker_tasks_left = math.floor(group_tasks_per_worker) + group._last_worker_priority = ts.priority + return ws From d3db281716044c419752a7f3de0008f67d1cd443 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 16 Jun 2021 16:48:49 -0600 Subject: [PATCH 11/23] WIP docs --- distributed/scheduler.py | 39 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5e024750814..6b17786c72c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7520,9 +7520,46 @@ def decide_worker( objective, total_nthreads: Py_ssize_t, ) -> WorkerState: - """ + r""" Decide which worker should take task *ts*. + There are two modes: root(ish) tasks, and normal tasks. 
+ + Root(ish) tasks + ~~~~~~~~~~~~~~~ + + Root(ish) have no (or very very few) dependencies and fan out widely: + they belong to TaskGroups that contain more tasks than there are workers. + We want neighboring root tasks to run on the same worker, since there's a + good chance those neighbors will be combined in a downstream operation: + + i j + / \ / \ + e f g h + | | | | + a b c d + \ \ / / + X + + In the above case, we want ``a`` and ``b`` to run on the same worker, + and ``c`` and ``d`` to run on the same worker, reducing future + data transfer. We can also ignore the location of ``X``, because + as a common dependency, it will eventually get transferred everywhere. + + Calculaing this directly from the graph would be expensive, so instead + we use task priority as a proxy. We aim to send tasks close in priority + within a `TaskGroup` to the same worker. To do this efficiently, we rely + on the fact that `decide_worker` is generally called in priority order + for root tasks (because `Scheduler.update_graph` creates recommendations + in priority order), and track only the last worker used for a `TaskGroup`, + and how many more tasks can be assigned to it before picking a new one. + + By colocating related root tasks, we ensure that placing thier downstream + normal tasks is set up for success. + + Normal tasks + ~~~~~~~~~~~~ + We choose the worker that has the data on which *ts* depends. If several workers have dependencies then we choose the less-busy worker. From c929c962d26aa98cec4c38ba60b9c889f80f6573 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 16 Jun 2021 17:01:52 -0600 Subject: [PATCH 12/23] comment out prints & handle out of priority order --- distributed/scheduler.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6b17786c72c..eec59c4b9a8 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7610,18 +7610,24 @@ def decide_worker( if is_root_ish and ws is not None and group._last_worker_tasks_left > 0: # Root-ish task and previous worker not fully assigned - reuse previous worker. # (When the previous worker _is_ fully assigned, we fall through here to the pick-a-worker logic.) - if group._last_worker_priority >= ts.priority: - print( - f"decide_worker called out of priority order: {group._last_worker_priority} >= {ts.priority}.\n" - f"{ts=}\n" - f"{group.last_worker=}\n" - f"{group.last_worker_tasks_left=}\n" - f"{group_tasks_per_worker=}\n" - ) - group._last_worker_priority = ts.priority - group._last_worker_tasks_left -= 1 - print(f"reusing worker - {ts.group_key} -> {ws.name}") - return ws + if group._last_worker_priority < ts.priority: + group._last_worker_priority = ts.priority + group._last_worker_tasks_left -= 1 + # print(f"reusing worker - {ts.group_key} -> {ws.name}") + return ws + + # We're not being called in priority order---this is probably not actually a + # root-ish task; disable root task mode for its whole task group. 
+ # print( + # f"decide_worker called out of priority order: {group._last_worker_priority} >= {ts.priority}.\n" + # f"{ts=}\n" + # f"{group.last_worker=}\n" + # f"{group.last_worker_tasks_left=}\n" + # f"{group_tasks_per_worker=}\n" + # ) + group._last_worker = NOT_ROOT_ISH + group._last_worker_tasks_left = 0 + is_root_ish = False # Pick a worker to run this task deps: set = ts._dependencies @@ -7658,17 +7664,17 @@ def decide_worker( ncandidates: Py_ssize_t = len(candidates) if ncandidates == 0: - print(f"no candidates - {ts.group_key}") + # print(f"no candidates - {ts.group_key}") return None elif ncandidates == 1: # NOTE: this is the ideal case: all the deps are already on the same worker. # We did a good job in previous `decide_worker`s! for ws in candidates: break - print(f"1 candidate - {ts.group_key} -> {ws.name}") + # print(f"1 candidate - {ts.group_key} -> {ws.name}") else: ws = min(candidates, key=objective) - print(f"picked worker - {ts.group_key} -> {ws.name}") + # print(f"picked worker - {ts.group_key} -> {ws.name}") if is_root_ish: group._last_worker = ws From f25ed42610c5723de38f549c88d239d26af74de3 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 15:44:43 -0600 Subject: [PATCH 13/23] Revert "Save a few cycles" This reverts commit f2da0bc0224c3a4cf1ca5760c4c005b1bc772959. --- distributed/scheduler.py | 114 ++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 63 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index eec59c4b9a8..e47cc90a862 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7508,9 +7508,6 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): steal.put_key_in_stealable(ts) -NOT_ROOT_ISH = WorkerState() - - @cfunc @exceptval(check=False) def decide_worker( @@ -7577,73 +7574,65 @@ def decide_worker( group: TaskGroup = ts._group ws: WorkerState = group._last_worker - group_tasks_per_worker: float - if ws is None or (ws is not NOT_ROOT_ISH and group._last_worker_tasks_left) == 0: - # Calculate the ratio of tasks in the task group to number of workers. - # We only need to do this computation when 1) seeing a task group for the first time, - # or 2) this is a root-ish task group, and we've just filled up the worker we were - # sending tasks to and need to pick a new one. - if valid_workers is not None: - total_nthreads = sum(wws._nthreads for wws in valid_workers) - - group_tasks_per_worker = len(group) / total_nthreads - else: - group_tasks_per_worker = float("nan") - - is_root_ish: bool - if ws is None: - # Very fist task in the group - we haven't determined yet whether it's a root-ish task group - if ( - group_tasks_per_worker > 1 # group is larger than cluster - and ( # is a root-like task (task group is large, but depends on very few tasks) - sum(map(len, group._dependencies)) < 5 # TODO what number - ) - ): - is_root_ish = True - else: - is_root_ish = False - group._last_worker = NOT_ROOT_ISH - else: - # We've seen this task group before and already made the above determination - is_root_ish = ws is not NOT_ROOT_ISH - - if is_root_ish and ws is not None and group._last_worker_tasks_left > 0: - # Root-ish task and previous worker not fully assigned - reuse previous worker. - # (When the previous worker _is_ fully assigned, we fall through here to the pick-a-worker logic.) 
- if group._last_worker_priority < ts.priority: - group._last_worker_priority = ts.priority + if valid_workers is not None: + total_nthreads = sum(wws._nthreads for wws in valid_workers) + + group_tasks_per_worker = len(group) / total_nthreads + ignore_deps_while_picking: bool = False + + # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks + # don't require data transfer. Assumes `decide_worker` is being called in priority order. + if ( + # there is a previous worker + ws is not None + # `decide_worker` hasn't previously been called out of priority order + and group._last_worker_priority is not None + # group is larger than cluster + and group_tasks_per_worker > 1 + # is a root-like task (task group is large, but depends on very few tasks) + and sum(map(len, group._dependencies)) < 5 # TODO what number + ): + if group._last_worker_tasks_left > 0: + # Previous worker not fully assigned group._last_worker_tasks_left -= 1 - # print(f"reusing worker - {ts.group_key} -> {ws.name}") - return ws - - # We're not being called in priority order---this is probably not actually a - # root-ish task; disable root task mode for its whole task group. + if group._last_worker_priority < ts.priority: + group._last_worker_priority = ts.priority + # print(f"reusing worker - {ts.group_key} -> {ws.name}") + return ws + + # print( + # f"decide_worker called out of priority order: {group._last_worker_priority} >= {ts.priority}.\n" + # f"{ts=}\n" + # f"{group.last_worker=}\n" + # f"{group.last_worker_tasks_left=}\n" + # f"{group_tasks_per_worker=}\n" + # ) + # `decide_worker` called out of priority order---this is probably not actually a root-ish task; + # disable root-ish mode in the future. + group._last_worker = None + group._last_worker_tasks_left = 0 + group._last_worker_priority = None + + # Previous worker is fully assigned, so pick a new worker. + # Since this is a root-like task, we should ignore the placement of its dependencies while selecting workers. + # Every worker is going to end up running this type of task eventually, and any dependencies will have to be + # transferred to all workers, so there's no gain from only considering workers where the dependencies already live. + # Indeed, we _must_ consider all workers, otherwise we would keep picking the same "new" worker(s) every time, + # since there are only N workers to choose from that actually have the dependency (where N <= n_deps). + ignore_deps_while_picking = True # print( - # f"decide_worker called out of priority order: {group._last_worker_priority} >= {ts.priority}.\n" - # f"{ts=}\n" - # f"{group.last_worker=}\n" - # f"{group.last_worker_tasks_left=}\n" - # f"{group_tasks_per_worker=}\n" + # f"{ts.group_key} is root-like but {ws.name} is full - picking a new worker" # ) - group._last_worker = NOT_ROOT_ISH - group._last_worker_tasks_left = 0 - is_root_ish = False - # Pick a worker to run this task + # Not a root-like task; pick the best worker among the valid workers + # that hold at least one dependency of this task. deps: set = ts._dependencies dts: TaskState candidates: set assert all([dts._who_has for dts in deps]) - if is_root_ish: - # Previous worker is fully assigned (or unknown), so pick a new worker. - # Since this is a root-like task, we should ignore the placement of its dependencies while selecting workers. 
- # Every worker is going to end up running this type of task eventually, and any dependencies will have to be - # transferred to all workers, so there's no gain from only considering workers where the dependencies already live. - # Indeed, we _must_ consider all workers, otherwise we would keep picking the same "new" worker(s) every time, - # since there are only N workers to choose from that actually have the dependency (where N <= n_deps). + if ignore_deps_while_picking: candidates = valid_workers if valid_workers is not None else set(all_workers) else: - # Restrict placement of this task to workers that hold its dependencies if ts._actor: candidates = set(all_workers) else: @@ -7665,7 +7654,7 @@ def decide_worker( ncandidates: Py_ssize_t = len(candidates) if ncandidates == 0: # print(f"no candidates - {ts.group_key}") - return None + pass elif ncandidates == 1: # NOTE: this is the ideal case: all the deps are already on the same worker. # We did a good job in previous `decide_worker`s! @@ -7676,11 +7665,10 @@ def decide_worker( ws = min(candidates, key=objective) # print(f"picked worker - {ts.group_key} -> {ws.name}") - if is_root_ish: + if group._last_worker_priority is not None: group._last_worker = ws group._last_worker_tasks_left = math.floor(group_tasks_per_worker) group._last_worker_priority = ts.priority - return ws From f50daf104619f1adcecc8b1411391355330ec19c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 19:04:45 -0600 Subject: [PATCH 14/23] Count the task that was just scheduled --- distributed/scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e47cc90a862..0ac30829629 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2401,8 +2401,8 @@ def decide_worker(self, ts: TaskState) -> WorkerState: # TODO repeated logic from `decide_worker` print(f"nodeps / no last worker fastpah - {ts.group_key} -> {ws.name}") ts._group._last_worker = ws - ts._group._last_worker_tasks_left = math.floor( - len(ts._group) / self._total_nthreads + ts._group._last_worker_tasks_left = ( + math.floor(len(ts._group) / self._total_nthreads) - 1 ) ts._group._last_worker_priority = ts._priority @@ -7667,7 +7667,7 @@ def decide_worker( if group._last_worker_priority is not None: group._last_worker = ws - group._last_worker_tasks_left = math.floor(group_tasks_per_worker) + group._last_worker_tasks_left = math.floor(group_tasks_per_worker) - 1 group._last_worker_priority = ts.priority return ws From 3a7350829613cbdfcea98168a35e19702f9a191d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 19:12:42 -0600 Subject: [PATCH 15/23] Balace fairly across heterogeneous workers --- distributed/scheduler.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 0ac30829629..f8078c67b86 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2401,8 +2401,9 @@ def decide_worker(self, ts: TaskState) -> WorkerState: # TODO repeated logic from `decide_worker` print(f"nodeps / no last worker fastpah - {ts.group_key} -> {ws.name}") ts._group._last_worker = ws + group_tasks_per_thread = len(ts._group) / self._total_nthreads ts._group._last_worker_tasks_left = ( - math.floor(len(ts._group) / self._total_nthreads) - 1 + math.floor(group_tasks_per_thread * ws._nthreads) - 1 ) ts._group._last_worker_priority = ts._priority @@ -7577,7 +7578,7 @@ def decide_worker( if valid_workers is not 
None: total_nthreads = sum(wws._nthreads for wws in valid_workers) - group_tasks_per_worker = len(group) / total_nthreads + group_tasks_per_thread = len(group) / total_nthreads ignore_deps_while_picking: bool = False # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks @@ -7588,7 +7589,7 @@ def decide_worker( # `decide_worker` hasn't previously been called out of priority order and group._last_worker_priority is not None # group is larger than cluster - and group_tasks_per_worker > 1 + and group_tasks_per_thread > 1 # is a root-like task (task group is large, but depends on very few tasks) and sum(map(len, group._dependencies)) < 5 # TODO what number ): @@ -7667,7 +7668,9 @@ def decide_worker( if group._last_worker_priority is not None: group._last_worker = ws - group._last_worker_tasks_left = math.floor(group_tasks_per_worker) - 1 + group._last_worker_tasks_left = ( + math.floor(group_tasks_per_thread * ws._nthreads) - 1 + ) group._last_worker_priority = ts.priority return ws From cfe37f6874dad6c7af088a5b699ab26c16696513 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 18:24:35 -0600 Subject: [PATCH 16/23] Tests TODO test out of priority order --- distributed/tests/test_scheduler.py | 110 +++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index f1aeef606d2..3b40ca5bbed 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -17,7 +17,7 @@ import dask from dask import delayed -from dask.utils import apply +from dask.utils import apply, stringify from distributed import Client, Nanny, Worker, fire_and_forget, wait from distributed.comm import Comm @@ -126,6 +126,114 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c): assert x.key in a.data or x.key in b.data +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 3, + config={"distributed.scheduler.work-stealing": False}, +) +async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c): + await client.submit(slowinc, 10, delay=0.1) # learn that slowinc is slow + root = await client.scatter(1) + assert sum(root.key in worker.data for worker in [a, b, c]) == 1 + + start = time() + tasks = client.map(slowinc, [root] * 6, delay=0.1, pure=False) + await wait(tasks) + elapsed = time() - start + + assert elapsed <= 4 + assert all(root.key in worker.data for worker in [a, b, c]), [ + list(worker.data.keys()) for worker in [a, b, c] + ] + + +@pytest.mark.parametrize("ndeps", [0, 1, 4]) +@pytest.mark.parametrize( + "nthreads", + [ + [("127.0.0.1", 1)] * 5, + [("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)], + ], +) +def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads): + @gen_cluster( + client=True, + nthreads=nthreads, + config={"distributed.scheduler.work-stealing": False}, + ) + async def test(c, s, *workers): + """Ensure that related tasks end up on the same node""" + da = pytest.importorskip("dask.array") + np = pytest.importorskip("numpy") + + if ndeps == 0: + x = da.random.random((100, 100), chunks=(10, 10)) + else: + + def random(**kwargs): + assert len(kwargs) == ndeps + return np.random.random((10, 10)) + + trivial_deps = {f"k{i}": delayed(object()) for i in range(ndeps)} + + # TODO is there a simpler (non-blockwise) way to make this sort of graph? 
+ x = da.blockwise( + random, + "yx", + new_axes={"y": (10,) * 10, "x": (10,) * 10}, + dtype=float, + **trivial_deps, + ) + + xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20)) + await xsum + + # Check that each chunk-row of the array is (mostly) stored on the same worker + primary_worker_key_fractions = [] + secondary_worker_key_fractions = [] + for i, keys in enumerate(x.__dask_keys__()): + # Iterate along rows of the array. + keys = set(stringify(k) for k in keys) + + # No more than 2 workers should have any keys + assert sum(any(k in w.data for k in keys) for w in workers) <= 2 + + # What fraction of the keys for this row does each worker hold? + key_fractions = [ + len(set(w.data).intersection(keys)) / len(keys) for w in workers + ] + key_fractions.sort() + # Primary worker: holds the highest percentage of keys + # Secondary worker: holds the second highest percentage of keys + primary_worker_key_fractions.append(key_fractions[-1]) + secondary_worker_key_fractions.append(key_fractions[-2]) + + # There may be one or two rows that were poorly split across workers, + # but the vast majority of rows should only be on one worker. + assert np.mean(primary_worker_key_fractions) >= 0.9 + assert np.median(primary_worker_key_fractions) == 1.0 + assert np.mean(secondary_worker_key_fractions) <= 0.1 + assert np.median(secondary_worker_key_fractions) == 0.0 + + # Check that there were few transfers + unexpected_transfers = [] + for worker in workers: + for log in worker.incoming_transfer_log: + keys = log["keys"] + # The root-ish tasks should never be transferred + assert not any(k.startswith("random") for k in keys), keys + # `object-` keys (the trivial deps of the root random tasks) should be transferred + if any(not k.startswith("object") for k in keys): + # But not many other things should be + unexpected_transfers.append(list(keys)) + + # A transfer at the very end to move aggregated results is fine (necessary with unbalanced workers in fact), + # but generally there should be very very few transfers. + assert len(unexpected_transfers) <= 2, unexpected_transfers + + test() + + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) async def test_move_data_over_break_restrictions(client, s, a, b, c): [x] = await client.scatter([1], workers=b.address) From 0b5486f4a9f0b7b4cdd39ee275739d0d82947cab Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 21:41:32 -0600 Subject: [PATCH 17/23] Test both axes. I think this is excessive. 
--- distributed/tests/test_scheduler.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 3b40ca5bbed..64b04c20f44 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -17,7 +17,7 @@ import dask from dask import delayed -from dask.utils import apply, stringify +from dask.utils import apply, deepmap, stringify from distributed import Client, Nanny, Worker, fire_and_forget, wait from distributed.comm import Comm @@ -148,6 +148,7 @@ async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c @pytest.mark.parametrize("ndeps", [0, 1, 4]) +@pytest.mark.parametrize("axis", [1, 0]) @pytest.mark.parametrize( "nthreads", [ @@ -155,7 +156,7 @@ async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c [("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)], ], ) -def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads): +def test_decide_worker_coschedule_order_neighbors(ndeps, axis, nthreads): @gen_cluster( client=True, nthreads=nthreads, @@ -185,15 +186,18 @@ def random(**kwargs): **trivial_deps, ) - xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20)) + xx, xsum = dask.persist(x, x.sum(axis=axis, split_every=20)) await xsum # Check that each chunk-row of the array is (mostly) stored on the same worker primary_worker_key_fractions = [] secondary_worker_key_fractions = [] - for i, keys in enumerate(x.__dask_keys__()): + + for keys in np.moveaxis( + np.array(deepmap(stringify, x.__dask_keys__())), axis, 1 + ): # Iterate along rows of the array. - keys = set(stringify(k) for k in keys) + keys = set(keys) # No more than 2 workers should have any keys assert sum(any(k in w.data for k in keys) for w in workers) <= 2 From 2c2bb6866f90df907f6ed52a5dd32946466425c3 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 21:41:43 -0600 Subject: [PATCH 18/23] Revert "Test both axes. I think this is excessive." This reverts commit 0d1e2380b525b0ee7b4e60d9bee62f889ed5520b. 
--- distributed/tests/test_scheduler.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 64b04c20f44..3b40ca5bbed 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -17,7 +17,7 @@ import dask from dask import delayed -from dask.utils import apply, deepmap, stringify +from dask.utils import apply, stringify from distributed import Client, Nanny, Worker, fire_and_forget, wait from distributed.comm import Comm @@ -148,7 +148,6 @@ async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c @pytest.mark.parametrize("ndeps", [0, 1, 4]) -@pytest.mark.parametrize("axis", [1, 0]) @pytest.mark.parametrize( "nthreads", [ @@ -156,7 +155,7 @@ async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c [("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)], ], ) -def test_decide_worker_coschedule_order_neighbors(ndeps, axis, nthreads): +def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads): @gen_cluster( client=True, nthreads=nthreads, @@ -186,18 +185,15 @@ def random(**kwargs): **trivial_deps, ) - xx, xsum = dask.persist(x, x.sum(axis=axis, split_every=20)) + xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20)) await xsum # Check that each chunk-row of the array is (mostly) stored on the same worker primary_worker_key_fractions = [] secondary_worker_key_fractions = [] - - for keys in np.moveaxis( - np.array(deepmap(stringify, x.__dask_keys__())), axis, 1 - ): + for i, keys in enumerate(x.__dask_keys__()): # Iterate along rows of the array. - keys = set(keys) + keys = set(stringify(k) for k in keys) # No more than 2 workers should have any keys assert sum(any(k in w.data for k in keys) for w in workers) <= 2 From 5e58b5a81c93a147c1518ea6247d7115c40bd4a6 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 23 Jun 2021 11:21:53 -0600 Subject: [PATCH 19/23] Handle zero-division Maybe avoiding -1 is excessive; I just wanted it to still work if we changed to a Py_ssize_t --- distributed/scheduler.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f8078c67b86..78cb9b883b1 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2401,10 +2401,14 @@ def decide_worker(self, ts: TaskState) -> WorkerState: # TODO repeated logic from `decide_worker` print(f"nodeps / no last worker fastpah - {ts.group_key} -> {ws.name}") ts._group._last_worker = ws - group_tasks_per_thread = len(ts._group) / self._total_nthreads - ts._group._last_worker_tasks_left = ( - math.floor(group_tasks_per_thread * ws._nthreads) - 1 - ) + if self._total_nthreads > 0: + group_tasks_per_thread = len(ts._group) / self._total_nthreads + ts._group._last_worker_tasks_left = ( + math.floor(group_tasks_per_thread * ws._nthreads) - 1 + ) + else: + # Note: negative would have been fine, except if this ever becomes Py_ssize_t + ts._group._last_worker_tasks_left = 0 ts._group._last_worker_priority = ts._priority if self._validate: @@ -7578,7 +7582,7 @@ def decide_worker( if valid_workers is not None: total_nthreads = sum(wws._nthreads for wws in valid_workers) - group_tasks_per_thread = len(group) / total_nthreads + group_tasks_per_thread = (len(group) / total_nthreads) if total_nthreads > 0 else 0 ignore_deps_while_picking: bool = False # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks @@ 
-7670,6 +7674,8 @@ def decide_worker( group._last_worker = ws group._last_worker_tasks_left = ( math.floor(group_tasks_per_thread * ws._nthreads) - 1 + if group_tasks_per_thread > 0 + else 0 ) group._last_worker_priority = ts.priority return ws From 4132583be8e9dfd94a772bd75b392997355629f7 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 23 Jun 2021 11:23:43 -0600 Subject: [PATCH 20/23] Unset `_last_worker` when we lose a worker Arguably would be better to check that `ws` is in (valid_workers or all_workers); downside is that the common `all_workers` would require an O(n) search, since it's only `dict_values`, not `dict`. --- distributed/scheduler.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 78cb9b883b1..23522caaacb 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4716,6 +4716,9 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True): recommendations[ts._key] = "released" else: # pure data recommendations[ts._key] = "forgotten" + if ts._group._last_worker is ws: + ts._group._last_worker = None + ts._group._last_worker_tasks_left = 0 ws._has_what.clear() self.transitions(recommendations) @@ -6289,8 +6292,9 @@ async def retire_workers( logger.info("Retire workers %s", workers) # Keys orphaned by retiring those workers - keys = {k for w in workers for k in w.has_what} - keys = {ts._key for ts in keys if ts._who_has.issubset(workers)} + tasks = {ts for w in workers for ts in w.has_what} + keys = {ts._key for ts in tasks if ts._who_has.issubset(workers)} + groups = {ts._group for ts in tasks} if keys: other_workers = set(parent._workers_dv.values()) - workers @@ -6305,6 +6309,11 @@ async def retire_workers( lock=False, ) + for group in groups: + if group._last_worker in workers: + group._last_worker = None + group._last_worker_tasks_left = 0 + worker_keys = {ws._address: ws.identity() for ws in workers} if close_workers: await asyncio.gather( From df2cf70536afcd0056ad483a6adc917a5726f897 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 23 Jun 2021 11:33:57 -0600 Subject: [PATCH 21/23] Check last used worker is valid for this task --- distributed/scheduler.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 23522caaacb..2e7e7538a50 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7609,9 +7609,12 @@ def decide_worker( if group._last_worker_tasks_left > 0: # Previous worker not fully assigned group._last_worker_tasks_left -= 1 - if group._last_worker_priority < ts.priority: + if group._last_worker_priority < ts.priority and ( + valid_workers is None or ws in valid_workers + ): group._last_worker_priority = ts.priority # print(f"reusing worker - {ts.group_key} -> {ws.name}") + assert ws in all_workers # TODO just for tests right now; slow! return ws # print( @@ -7621,8 +7624,8 @@ def decide_worker( # f"{group.last_worker_tasks_left=}\n" # f"{group_tasks_per_worker=}\n" # ) - # `decide_worker` called out of priority order---this is probably not actually a root-ish task; - # disable root-ish mode in the future. + # `decide_worker` called out of priority order, or the last used worker is not valid for this task. + # This is probably not actually a root-ish task; disable root-ish mode in the future. 
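A cached `_last_worker` can also go stale when a worker leaves the cluster; the `remove_worker`/`retire_workers` hunks in the previous commit clear it, roughly as in this toy sketch (stub class, not the real scheduler state):

    class StubGroup:
        def __init__(self):
            self.last_worker = None
            self.last_worker_tasks_left = 0

    def forget_worker(groups, departed):
        # Never leave a departed worker cached as the co-assignment target.
        for g in groups:
            if g.last_worker is departed:
                g.last_worker = None
                g.last_worker_tasks_left = 0

    g = StubGroup()
    g.last_worker = departed = object()
    g.last_worker_tasks_left = 7
    forget_worker([g], departed)
    assert g.last_worker is None and g.last_worker_tasks_left == 0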
group._last_worker = None group._last_worker_tasks_left = 0 group._last_worker_priority = None From 19107d80499be26e03e534818731ea308c7545f2 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 23 Jun 2021 11:35:52 -0600 Subject: [PATCH 22/23] comment out print --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2e7e7538a50..3775b3d8847 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2399,7 +2399,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = wp_vals[self._n_tasks % n_workers] # TODO repeated logic from `decide_worker` - print(f"nodeps / no last worker fastpah - {ts.group_key} -> {ws.name}") + # print(f"nodeps / no last worker fastpah - {ts.group_key} -> {ws.name}") ts._group._last_worker = ws if self._total_nthreads > 0: group_tasks_per_thread = len(ts._group) / self._total_nthreads From 8e45244ffd3cd0b9b07a13f5c75b5dc316db119c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 23 Jun 2021 15:44:00 -0600 Subject: [PATCH 23/23] Remove some cruft --- distributed/scheduler.py | 50 +++++++--------------------------------- 1 file changed, 8 insertions(+), 42 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3775b3d8847..3bcae43b9a4 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2398,17 +2398,13 @@ def decide_worker(self, ts: TaskState) -> WorkerState: else: # dumb but fast in large case ws = wp_vals[self._n_tasks % n_workers] - # TODO repeated logic from `decide_worker` - # print(f"nodeps / no last worker fastpah - {ts.group_key} -> {ws.name}") ts._group._last_worker = ws - if self._total_nthreads > 0: - group_tasks_per_thread = len(ts._group) / self._total_nthreads - ts._group._last_worker_tasks_left = ( - math.floor(group_tasks_per_thread * ws._nthreads) - 1 - ) - else: - # Note: negative would have been fine, except if this ever becomes Py_ssize_t - ts._group._last_worker_tasks_left = 0 + group_tasks_per_thread = ( + len(ts._group) / self._total_nthreads if self._total_nthreads > 0 else 0 + ) + ts._group._last_worker_tasks_left = ( + math.floor(group_tasks_per_thread * ws._nthreads) - 1 + ) ts._group._last_worker_priority = ts._priority if self._validate: @@ -7594,36 +7590,22 @@ def decide_worker( group_tasks_per_thread = (len(group) / total_nthreads) if total_nthreads > 0 else 0 ignore_deps_while_picking: bool = False - # Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks - # don't require data transfer. Assumes `decide_worker` is being called in priority order. + # Try to schedule sibling root-like tasks on the same workers. if ( - # there is a previous worker ws is not None - # `decide_worker` hasn't previously been called out of priority order and group._last_worker_priority is not None - # group is larger than cluster + # ^ `decide_worker` hasn't previously been called out of priority order and group_tasks_per_thread > 1 - # is a root-like task (task group is large, but depends on very few tasks) and sum(map(len, group._dependencies)) < 5 # TODO what number ): if group._last_worker_tasks_left > 0: - # Previous worker not fully assigned group._last_worker_tasks_left -= 1 if group._last_worker_priority < ts.priority and ( valid_workers is None or ws in valid_workers ): group._last_worker_priority = ts.priority - # print(f"reusing worker - {ts.group_key} -> {ws.name}") - assert ws in all_workers # TODO just for tests right now; slow! 
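The assert dropped above was flagged as slow because `all_workers` is passed in as a `dict_values` view, so membership is a linear scan rather than a hash lookup; a quick illustration (sizes are arbitrary):

    workers = {f"worker-{i}": object() for i in range(10_000)}
    all_workers = workers.values()      # the view decide_worker actually receives
    some_ws = workers["worker-9999"]

    assert some_ws in all_workers       # works, but scans up to 10_000 entries
    assert "worker-9999" in workers     # the dict itself gives an O(1) lookup by key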
return ws - # print( - # f"decide_worker called out of priority order: {group._last_worker_priority} >= {ts.priority}.\n" - # f"{ts=}\n" - # f"{group.last_worker=}\n" - # f"{group.last_worker_tasks_left=}\n" - # f"{group_tasks_per_worker=}\n" - # ) # `decide_worker` called out of priority order, or the last used worker is not valid for this task. # This is probably not actually a root-ish task; disable root-ish mode in the future. group._last_worker = None @@ -7631,18 +7613,8 @@ def decide_worker( group._last_worker_priority = None # Previous worker is fully assigned, so pick a new worker. - # Since this is a root-like task, we should ignore the placement of its dependencies while selecting workers. - # Every worker is going to end up running this type of task eventually, and any dependencies will have to be - # transferred to all workers, so there's no gain from only considering workers where the dependencies already live. - # Indeed, we _must_ consider all workers, otherwise we would keep picking the same "new" worker(s) every time, - # since there are only N workers to choose from that actually have the dependency (where N <= n_deps). ignore_deps_while_picking = True - # print( - # f"{ts.group_key} is root-like but {ws.name} is full - picking a new worker" - # ) - # Not a root-like task; pick the best worker among the valid workers - # that hold at least one dependency of this task. deps: set = ts._dependencies dts: TaskState candidates: set @@ -7670,24 +7642,18 @@ def decide_worker( ncandidates: Py_ssize_t = len(candidates) if ncandidates == 0: - # print(f"no candidates - {ts.group_key}") pass elif ncandidates == 1: # NOTE: this is the ideal case: all the deps are already on the same worker. - # We did a good job in previous `decide_worker`s! for ws in candidates: break - # print(f"1 candidate - {ts.group_key} -> {ws.name}") else: ws = min(candidates, key=objective) - # print(f"picked worker - {ts.group_key} -> {ws.name}") if group._last_worker_priority is not None: group._last_worker = ws group._last_worker_tasks_left = ( math.floor(group_tasks_per_thread * ws._nthreads) - 1 - if group_tasks_per_thread > 0 - else 0 ) group._last_worker_priority = ts.priority return ws
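Pulling the series together, a condensed standalone sketch of the heuristic as it stands after the final commit (real scheduler state, Cython typing, worker restrictions, and the out-of-priority-order fallback are all elided, and the classes are stand-ins rather than the real WorkerState/TaskGroup):

    import math

    class StubWorker:
        def __init__(self, name, nthreads):
            self.name = name
            self.nthreads = nthreads
            self.occupancy = 0.0

    class StubGroup:
        """Stand-in for TaskGroup: remembers the last worker used and its remaining quota."""

        def __init__(self, ntasks, ndeps):
            self.ntasks = ntasks   # len(group)
            self.ndeps = ndeps     # sum(map(len, group._dependencies))
            self.last_worker = None
            self.last_worker_tasks_left = 0
            self.last_worker_priority = ()

    def decide_worker_sketch(priority, group, workers, total_nthreads, objective):
        tasks_per_thread = group.ntasks / total_nthreads if total_nthreads else 0
        root_ish = tasks_per_thread > 1 and group.ndeps < 5  # "larger than cluster", few deps

        ws = group.last_worker
        if (
            root_ish
            and ws is not None
            and group.last_worker_tasks_left > 0
            and group.last_worker_priority < priority  # still being called in priority order
        ):
            # Reuse the previous worker until its share of the group is used up.
            group.last_worker_priority = priority
            group.last_worker_tasks_left -= 1
            return ws

        # Otherwise pick by the objective. (For root-ish tasks the real code also widens
        # the candidate pool here, ignoring where the few tiny dependencies live.)
        ws = min(workers, key=objective)
        if root_ish:
            group.last_worker = ws
            group.last_worker_tasks_left = math.floor(tasks_per_thread * ws.nthreads) - 1
            group.last_worker_priority = priority
        return ws

    # 16 sibling root tasks on 2 workers x 2 threads: the first half land on one worker and
    # the second half on the other, so neighbouring reductions need no data transfer.
    workers = [StubWorker("a", 2), StubWorker("b", 2)]
    group = StubGroup(ntasks=16, ndeps=1)
    assignments = []
    for i in range(16):
        ws = decide_worker_sketch((i,), group, workers, 4, objective=lambda w: w.occupancy)
        ws.occupancy += 1
        assignments.append(ws.name)
    assert assignments == ["a"] * 8 + ["b"] * 8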