From c8ed3f6708213007ba34ecb12db981731671d7bf Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 15 Jun 2021 16:14:07 +0200 Subject: [PATCH 1/3] Improve work stealing for scaling situations --- distributed/stealing.py | 48 ++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index b352040bff..95f7b48962 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -1,5 +1,7 @@ import logging +import random from collections import defaultdict, deque +from functools import partial from math import log2 from time import time @@ -113,8 +115,6 @@ def steal_time_ratio(self, ts): For example a result of zero implies a task without dependencies. level: The location within a stealable list to place this value """ - if not ts.dependencies: # no dependencies fast path - return 0, 0 split = ts.prefix.name if split in fast_tasks: @@ -122,18 +122,14 @@ def steal_time_ratio(self, ts): ws = ts.processing_on compute_time = ws.processing[ts] - if compute_time < 0.005: # 5ms, just give up - return None, None nbytes = ts.get_nbytes_deps() transfer_time = nbytes / self.scheduler.bandwidth + LATENCY cost_multiplier = transfer_time / compute_time - if cost_multiplier > 100: - return None, None level = int(round(log2(cost_multiplier) + 6)) - if level < 1: - level = 1 + + level = min(len(self.cost_multipliers) - 1, level) return cost_multiplier, level @@ -344,7 +340,10 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): thieves = idle if not thieves: break - thief = thieves[i % len(thieves)] + + thief = self._maybe_pick_thief(ts, thieves) + if not thief: + thief = thieves[i % len(thieves)] duration = sat.processing.get(ts) if duration is None: @@ -380,7 +379,10 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): thieves = idle if not thieves: continue - thief = thieves[i % len(thieves)] + thief = self._maybe_pick_thief(ts, thieves) + if not thief: + thief = thieves[i % len(thieves)] + duration = sat.processing[ts] maybe_move_task( @@ -394,6 +396,32 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): if s.digests: s.digests["steal-duration"].add(stop - start) + def _maybe_pick_thief(self, ts, thieves): + """Try to be smart about picking a thief given some options. We're + trying to pick a thief which has dependencies for a given task, if + possible and will pick the one which works best for us given the + Scheduler.worker_objective + + If no idle worker with dependencies is found, this returns None. + """ + if ts._dependencies: + who_has = set() + for dep in ts._dependencies: + who_has.update(dep.who_has) + + thieves_with_data = who_has & set(thieves) + + # If there are potential thieves with dependencies we + # should prefer them and pick the one which works best. + # Otherwise just random/round robin + if thieves_with_data: + if len(thieves_with_data) > 10: + thieves_with_data = random.sample(thieves_with_data, 10) + return min( + thieves_with_data, + key=partial(self.scheduler.worker_objective, ts), + ) + def restart(self, scheduler): for stealable in self.stealable.values(): for s in stealable: From f42faec181b01ff3e7bff87e05ee69b25307a215 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 21 Jun 2021 17:21:20 +0200 Subject: [PATCH 2/3] Fix stealing dashboard --- distributed/dashboard/components/scheduler.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index b00bcfb94b..4ef0c6b5a7 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -1144,7 +1144,11 @@ class StealingTimeSeries(DashboardComponent): def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.source = ColumnDataSource( - {"time": [time(), time() + 1], "idle": [0, 0.1], "saturated": [0, 0.1]} + { + "time": [time() * 1000, time() * 1000 + 1], + "idle": [0, 0], + "saturated": [0, 0], + } ) x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0) @@ -1152,7 +1156,6 @@ def __init__(self, scheduler, **kwargs): self.root = figure( title="Idle and Saturated Workers Over Time", x_axis_type="datetime", - y_range=[-0.1, len(scheduler.workers) + 0.1], height=150, tools="", x_range=x_range, @@ -1204,7 +1207,6 @@ def __init__(self, scheduler, **kwargs): self.root = figure( title="Stealing Events", x_axis_type="datetime", - y_axis_type="log", height=250, tools="", x_range=x_range, @@ -1214,12 +1216,12 @@ def __init__(self, scheduler, **kwargs): self.root.circle( source=self.source, x="time", - y="cost_factor", + y="level", color="color", size="radius", alpha=0.5, ) - self.root.yaxis.axis_label = "Cost Multiplier" + self.root.yaxis.axis_label = "Level" hover = HoverTool() hover.tooltips = "Level: @level, Duration: @duration, Count: @count, Cost factor: @cost_factor" @@ -1262,9 +1264,11 @@ def convert(self, msgs): def update(self): with log_errors(): log = self.scheduler.get_events(topic="stealing") - n = self.steal.count - self.last + current = len(self.scheduler.events["stealing"]) + n = current - self.last + log = [log[-i][1] for i in range(1, n + 1) if isinstance(log[-i][1], list)] - self.last = self.steal.count + self.last = current if log: new = pipe( From 6d9545decad44b00f9ca83e046f46744356f7a7e Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 21 Jun 2021 19:16:52 +0200 Subject: [PATCH 3/3] Add shortpath for no-dependency tasks --- distributed/stealing.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index 95f7b48962..3a7d39d72f 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -115,7 +115,8 @@ def steal_time_ratio(self, ts): For example a result of zero implies a task without dependencies. level: The location within a stealable list to place this value """ - + if not ts.dependencies: # no dependencies fast path + return 0, 0 split = ts.prefix.name if split in fast_tasks: return None, None @@ -128,7 +129,8 @@ def steal_time_ratio(self, ts): cost_multiplier = transfer_time / compute_time level = int(round(log2(cost_multiplier) + 6)) - + if level < 1: + level = 1 level = min(len(self.cost_multipliers) - 1, level) return cost_multiplier, level @@ -282,6 +284,7 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): if occ_idl + cost_multiplier * duration <= occ_sat - duration / 2: self.move_task_request(ts, sat, idl) log.append( + # The format of this message is tightly coupled to the dashboard ( start, level, @@ -409,14 +412,14 @@ def _maybe_pick_thief(self, ts, thieves): for dep in ts._dependencies: who_has.update(dep.who_has) - thieves_with_data = who_has & set(thieves) + thieves_with_data = list(who_has & set(thieves)) # If there are potential thieves with dependencies we # should prefer them and pick the one which works best. # Otherwise just random/round robin if thieves_with_data: if len(thieves_with_data) > 10: - thieves_with_data = random.sample(thieves_with_data, 10) + thieves_with_data = random.choices(thieves_with_data, k=10) return min( thieves_with_data, key=partial(self.scheduler.worker_objective, ts),