diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index eb5828bf71..e6200b17aa 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -1504,8 +1504,12 @@ class SchedulerState:
     #: Workers that are currently in running state
     running: set[WorkerState]
     #: Workers that are currently in running state and not fully utilized
+    #: Definition based on occupancy
     #: (actually a SortedDict, but the sortedcontainers package isn't annotated)
     idle: dict[str, WorkerState]
+    #: Similar to `idle`
+    #: Definition based on assigned tasks
+    idle_task_count: set[WorkerState]
     #: Workers that are fully utilized. May include non-running workers.
     saturated: set[WorkerState]
     total_nthreads: int
@@ -1612,6 +1616,7 @@ def __init__(
         self.extensions = {}
         self.host_info = host_info
         self.idle = SortedDict()
+        self.idle_task_count = set()
         self.n_tasks = 0
         self.resources = resources
         self.saturated = set()
@@ -2146,13 +2151,16 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
             # (and actually pass in the task).
             assert not math.isinf(self.WORKER_SATURATION)

-        if not self.idle:
+        if not self.idle_task_count:
             # All workers busy? Task gets/stays queued.
             return None

         # Just pick the least busy worker.
         # NOTE: this will lead to worst-case scheduling with regards to co-assignment.
-        ws = min(self.idle.values(), key=lambda ws: len(ws.processing) / ws.nthreads)
+        ws = min(
+            self.idle_task_count,
+            key=lambda ws: len(ws.processing) / ws.nthreads,
+        )
         if self.validate:
             assert not _worker_full(ws, self.WORKER_SATURATION), (
                 ws,
@@ -2791,7 +2799,7 @@ def transition_waiting_queued(self, key, stimulus_id):
         worker_msgs: dict = {}

         if self.validate:
-            assert not self.idle, (ts, self.idle)
+            assert not self.idle_task_count, (ts, self.idle_task_count)
             _validate_ready(self, ts)

         ts.state = "queued"
@@ -3062,25 +3070,23 @@ def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0):

         idle = self.idle
         saturated = self.saturated
-        if (
-            self.is_unoccupied(ws, occ, p)
-            if math.isinf(self.WORKER_SATURATION)
-            else not _worker_full(ws, self.WORKER_SATURATION)
-        ):
+        saturated.discard(ws)
+        if self.is_unoccupied(ws, occ, p):
             if ws.status == Status.running:
                 idle[ws.address] = ws
-            saturated.discard(ws)
         else:
             idle.pop(ws.address, None)
-
             nc = ws.nthreads
             if p > nc:
                 pending = occ * (p - nc) / (p * nc)
                 if 0.4 < pending > 1.9 * (self.total_occupancy / self.total_nthreads):
                     saturated.add(ws)
-                    return

-        saturated.discard(ws)
+        if not _worker_full(ws, self.WORKER_SATURATION):
+            if ws.status == Status.running:
+                self.idle_task_count.add(ws)
+        else:
+            self.idle_task_count.discard(ws)

     def is_unoccupied(
         self, ws: WorkerState, occupancy: float, nprocessing: int
@@ -4746,6 +4752,7 @@ async def remove_worker(
         del self.stream_comms[address]
         del self.aliases[ws.name]
         self.idle.pop(ws.address, None)
+        self.idle_task_count.discard(ws)
         self.saturated.discard(ws)
         del self.workers[address]
         ws.status = Status.closed
@@ -5333,6 +5340,7 @@ def handle_worker_status_change(
         else:
             self.running.discard(ws)
             self.idle.pop(ws.address, None)
+            self.idle_task_count.discard(ws)

     async def handle_request_refresh_who_has(
         self, keys: Iterable[str], worker: str, stimulus_id: str
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 48ad99db77..e5028f8746 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -398,6 +398,7 @@ async def test_queued_paused_new_worker(c, s, a, b):
         await asyncio.sleep(0.01)

     assert not s.idle
+    assert not s.idle_task_count
     assert not s.running

     async with Worker(s.address, nthreads=2) as w:
@@ -446,6 +447,7 @@ async def test_queued_paused_unpaused(c, s, a, b, queue):

     assert not s.running
     assert not s.idle
+    assert not s.idle_task_count

     # un-pause
     a.status = Status.running
@@ -455,7 +457,8 @@ async def test_queued_paused_unpaused(c, s, a, b, queue):

     if queue:
         assert not s.idle  # workers should have been (or already were) filled
-        # If queuing is disabled, all workers might already be saturated when they un-pause.
+    # If queuing is disabled, all workers might already be saturated when they un-pause.
+    assert not s.idle_task_count

     await wait(final)
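Reviewer note on the semantics this diff separates: `idle` keeps its occupancy-based meaning (little estimated work relative to thread count), while the new `idle_task_count` classifies workers purely by assigned-task count via `_worker_full`. The two can disagree: with `WORKER_SATURATION > 1.0`, a worker chewing on a few long tasks drops out of `idle` yet still has task slots free, so it stays in `idle_task_count` and remains a valid target for queued root-ish tasks. The sketch below illustrates that divergence; it is standalone and hedged — `WorkerSketch`, `worker_full_sketch`, `is_unoccupied_sketch`, `pick_least_busy`, and the thresholds are simplified stand-ins for `WorkerState`, `_worker_full`, `is_unoccupied`, and `decide_worker_rootish_queuing_enabled`, not the actual implementations.

```python
from __future__ import annotations

import math
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerSketch:
    """Hypothetical stand-in for WorkerState with only the fields needed here."""

    address: str
    nthreads: int
    n_processing: int  # tasks currently assigned to this worker
    occupancy: float  # estimated seconds of assigned work


def worker_full_sketch(ws: WorkerSketch, saturation: float) -> bool:
    # Task-count definition (cf. _worker_full): full once the worker holds
    # at least ceil(nthreads * WORKER_SATURATION) tasks.
    return ws.n_processing >= max(math.ceil(ws.nthreads * saturation), 1)


def is_unoccupied_sketch(ws: WorkerSketch) -> bool:
    # Occupancy definition (loosely mirroring is_unoccupied): a spare thread
    # or under half a second of estimated work counts as idle.
    return ws.n_processing < ws.nthreads or ws.occupancy < 0.5


def pick_least_busy(idle_task_count: set[WorkerSketch]) -> WorkerSketch | None:
    # Mirrors the selection in decide_worker_rootish_queuing_enabled:
    # fewest tasks per thread wins; None means the task gets/stays queued.
    if not idle_task_count:
        return None
    return min(idle_task_count, key=lambda ws: ws.n_processing / ws.nthreads)


SATURATION = 1.5  # stand-in for WORKER_SATURATION

# Two threads, two long tasks: busy by occupancy (leaves `idle`), but only
# 2 of ceil(2 * 1.5) = 3 task slots are used, so it stays in `idle_task_count`.
long_tasks = WorkerSketch("tcp://10.0.0.1:1234", 2, 2, occupancy=60.0)
assert not is_unoccupied_sketch(long_tasks)
assert not worker_full_sketch(long_tasks, SATURATION)

# Three tasks fill every slot: full by task count, so in neither set.
full = WorkerSketch("tcp://10.0.0.2:1234", 2, 3, occupancy=90.0)
assert worker_full_sketch(full, SATURATION)

assert pick_least_busy({long_tasks}) is long_tasks
assert pick_least_busy(set()) is None  # all workers full -> task stays queued
```

That divergence appears to be the point of switching `decide_worker_rootish_queuing_enabled` from `self.idle` to `self.idle_task_count`: under the old occupancy-based set, a worker like `long_tasks` was invisible to the queuing path even though it had open task slots. The validate-time `_worker_full` assertions retained in `decide_worker_rootish_queuing_enabled` and `transition_waiting_queued` pin down the new invariant that every member of `idle_task_count` can accept another task.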