From 9ad1608680224584e9b3b42ea47e742f37ea6075 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Mon, 31 Oct 2022 11:34:28 +0000
Subject: [PATCH] Mostly cosmetic tweaks

---
 distributed/scheduler.py            | 35 ++++++++++++++---------------
 distributed/tests/test_scheduler.py |  9 +++-----
 2 files changed, 20 insertions(+), 24 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 76d60f46ac..98a4174775 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3288,7 +3288,7 @@ def bulk_schedule_after_adding_worker(self, ws: WorkerState) -> Recs:
 
         Returns priority-ordered recommendations.
         """
-        maybe_runnable = list(_next_queued_when_slot_maybe_opened(self, ws))[::-1]
+        maybe_runnable = list(_next_queued_tasks_for_worker(self, ws))[::-1]
 
         # Schedule any restricted tasks onto the new worker, if the worker can run them
         for ts in self.unrunnable:
@@ -5326,8 +5326,7 @@ def handle_long_running(
         self.check_idle_saturated(ws)
 
         recommendations = {
-            qts.key: "processing"
-            for qts in _next_queued_when_slot_maybe_opened(self, ws)
+            qts.key: "processing" for qts in _next_queued_tasks_for_worker(self, ws)
         }
         if self.validate:
             assert len(recommendations) <= 1, (ws, recommendations)
@@ -7882,7 +7881,7 @@ def _exit_processing_common(
     state.check_idle_saturated(ws)
     state.release_resources(ts, ws)
 
-    for qts in _next_queued_when_slot_maybe_opened(state, ws):
+    for qts in _next_queued_tasks_for_worker(state, ws):
         if state.validate:
             assert qts.key not in recommendations, recommendations[qts.key]
         recommendations[qts.key] = "processing"
@@ -7890,22 +7889,22 @@ def _exit_processing_common(
     return ws
 
 
-def _next_queued_when_slot_maybe_opened(
+def _next_queued_tasks_for_worker(
     state: SchedulerState, ws: WorkerState
 ) -> Iterator[TaskState]:
-    "Queued tasks to run, in priority order, if a slot may have opened up on this worker."
-    if state.queued and ws.status == Status.running:
-        # NOTE: this is called most frequently because a single task has completed, so
-        # there are <= 1 task slots available on the worker. `peekn` has fastpahs for
-        # these cases N<=0 and N==1.
-        for qts in state.queued.peekn(
-            _task_slots_available(ws, state.WORKER_SATURATION)
-        ):
-            if state.validate:
-                assert qts.state == "queued", qts.state
-                assert not qts.processing_on
-                assert not qts.waiting_on
-            yield qts
+    """Queued tasks to run, in priority order, on all open slots on a worker"""
+    if not state.queued or ws.status != Status.running:
+        return
+
+    # NOTE: this is called most frequently because a single task has completed, so there
+    # are <= 1 task slots available on the worker.
+    # `peekn` has fast paths for the cases N<=0 and N==1.
+    for qts in state.queued.peekn(_task_slots_available(ws, state.WORKER_SATURATION)):
+        if state.validate:
+            assert qts.state == "queued", qts.state
+            assert not qts.processing_on
+            assert not qts.waiting_on
+        yield qts
 
 
 def _add_to_memory(
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 391cd554f1..db231c317d 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -479,10 +479,7 @@ async def test_queued_remove_add_worker(c, s, a, b):
     await wait(fs)
 
 
-@gen_cluster(
-    client=True,
-    nthreads=[("", 1)],
-)
+@gen_cluster(client=True, nthreads=[("", 1)])
 async def test_secede_opens_slot(c, s, a):
     first = Event()
     second = Event()
@@ -493,10 +490,10 @@ def func(first, second):
         second.wait()
 
     fs = c.map(func, [first] * 5, [second] * 5)
-    await async_wait_for(lambda: a.state.executing, 5)
+    await async_wait_for(lambda: a.state.executing, timeout=5)
     await first.set()
 
-    await async_wait_for(lambda: len(a.state.tasks) == len(fs), 5)
+    await async_wait_for(lambda: len(a.state.long_running) == len(fs), timeout=5)
     await second.set()
     await c.gather(fs)
 
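Note (illustration, not applied by the patch): the refactored `_next_queued_tasks_for_worker`
replaces a nested `if` with a guard clause, so the generator body reads at a single
indentation level and the `peekn(_task_slots_available(...))` call fits on one line.
Below is a minimal standalone sketch of that control flow under simplified assumptions:
`Task`, `Queue`, `Worker`, and `task_slots_available` here are hypothetical stand-ins
invented for this sketch, not the real `HeapSet`, `WorkerState`, or
`_task_slots_available` from distributed, which are considerably richer.

    import heapq
    from collections.abc import Iterator
    from dataclasses import dataclass, field


    @dataclass(order=True)
    class Task:
        priority: int  # lower value = higher priority, matching dask's ordering
        key: str = field(compare=False)


    class Queue:
        """Illustrative stand-in for distributed's HeapSet of queued tasks."""

        def __init__(self, tasks: list[Task]) -> None:
            self._heap = list(tasks)
            heapq.heapify(self._heap)

        def __bool__(self) -> bool:
            return bool(self._heap)

        def peekn(self, n: int) -> Iterator[Task]:
            # Yield the n highest-priority tasks without removing them; n <= 0
            # yields nothing (the real peekn fast-paths N <= 0 and N == 1).
            yield from heapq.nsmallest(max(n, 0), self._heap)


    @dataclass
    class Worker:
        running: bool
        nthreads: int
        nprocessing: int


    def task_slots_available(ws: Worker) -> int:
        # Simplified: open slots = threads minus tasks already processing.
        # The real _task_slots_available also applies WORKER_SATURATION.
        return ws.nthreads - ws.nprocessing


    def next_queued_tasks_for_worker(queued: Queue, ws: Worker) -> Iterator[Task]:
        """Queued tasks to run, in priority order, on all open slots on a worker."""
        # Guard clause mirroring the refactored control flow in the patch
        if not queued or not ws.running:
            return
        yield from queued.peekn(task_slots_available(ws))


    queued = Queue([Task(2, "z"), Task(0, "x"), Task(1, "y")])
    ws = Worker(running=True, nthreads=2, nprocessing=1)  # one open slot
    print([t.key for t in next_queued_tasks_for_worker(queued, ws)])  # -> ['x']

With `nprocessing=2` there are no open slots, so `peekn(0)` hits its N <= 0 fast path
and the generator yields nothing; with `running=False` the guard returns before
touching the queue at all, the same short-circuit the patched function performs via
`ws.status != Status.running`.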