Skip to content

Commit

Permalink
Mostly cosmetic tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Oct 31, 2022
1 parent 6e0467b commit 9ad1608
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 24 deletions.
35 changes: 17 additions & 18 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3288,7 +3288,7 @@ def bulk_schedule_after_adding_worker(self, ws: WorkerState) -> Recs:
Returns priority-ordered recommendations.
"""
maybe_runnable = list(_next_queued_when_slot_maybe_opened(self, ws))[::-1]
maybe_runnable = list(_next_queued_tasks_for_worker(self, ws))[::-1]

# Schedule any restricted tasks onto the new worker, if the worker can run them
for ts in self.unrunnable:
Expand Down Expand Up @@ -5326,8 +5326,7 @@ def handle_long_running(
self.check_idle_saturated(ws)

recommendations = {
qts.key: "processing"
for qts in _next_queued_when_slot_maybe_opened(self, ws)
qts.key: "processing" for qts in _next_queued_tasks_for_worker(self, ws)
}
if self.validate:
assert len(recommendations) <= 1, (ws, recommendations)
Expand Down Expand Up @@ -7882,30 +7881,30 @@ def _exit_processing_common(
state.check_idle_saturated(ws)
state.release_resources(ts, ws)

for qts in _next_queued_when_slot_maybe_opened(state, ws):
for qts in _next_queued_tasks_for_worker(state, ws):
if state.validate:
assert qts.key not in recommendations, recommendations[qts.key]
recommendations[qts.key] = "processing"

return ws


def _next_queued_when_slot_maybe_opened(
def _next_queued_tasks_for_worker(
state: SchedulerState, ws: WorkerState
) -> Iterator[TaskState]:
"Queued tasks to run, in priority order, if a slot may have opened up on this worker."
if state.queued and ws.status == Status.running:
# NOTE: this is called most frequently because a single task has completed, so
# there are <= 1 task slots available on the worker. `peekn` has fastpahs for
# these cases N<=0 and N==1.
for qts in state.queued.peekn(
_task_slots_available(ws, state.WORKER_SATURATION)
):
if state.validate:
assert qts.state == "queued", qts.state
assert not qts.processing_on
assert not qts.waiting_on
yield qts
"""Queued tasks to run, in priority order, on all open slots on a worker"""
if not state.queued or ws.status != Status.running:
return

# NOTE: this is called most frequently because a single task has completed, so there
# are <= 1 task slots available on the worker.
# `peekn` has fast paths for the cases N<=0 and N==1.
for qts in state.queued.peekn(_task_slots_available(ws, state.WORKER_SATURATION)):
if state.validate:
assert qts.state == "queued", qts.state
assert not qts.processing_on
assert not qts.waiting_on
yield qts


def _add_to_memory(
Expand Down
9 changes: 3 additions & 6 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,7 @@ async def test_queued_remove_add_worker(c, s, a, b):
await wait(fs)


@gen_cluster(
client=True,
nthreads=[("", 1)],
)
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_secede_opens_slot(c, s, a):
first = Event()
second = Event()
Expand All @@ -493,10 +490,10 @@ def func(first, second):
second.wait()

fs = c.map(func, [first] * 5, [second] * 5)
await async_wait_for(lambda: a.state.executing, 5)
await async_wait_for(lambda: a.state.executing, timeout=5)

await first.set()
await async_wait_for(lambda: len(a.state.tasks) == len(fs), 5)
await async_wait_for(lambda: len(a.state.long_running) == len(fs), timeout=5)

await second.set()
await c.gather(fs)
Expand Down

0 comments on commit 9ad1608

Please sign in to comment.