Skip to content

Commit

Permalink
Worker call to Scheduler.reschedule will no longer fail
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed May 13, 2022
1 parent 50d2911 commit 3f20835
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
10 changes: 6 additions & 4 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6590,7 +6590,9 @@ async def get_story(self, keys=()):

transition_story = story

def reschedule(self, key=None, worker=None):
def reschedule(
self, key: str, worker: str | None = None, *, stimulus_id: str
) -> None:
"""Reschedule a task
Things may have shifted and this task may now be better suited to run
Expand All @@ -6600,15 +6602,15 @@ def reschedule(self, key=None, worker=None):
ts = self.tasks[key]
except KeyError:
logger.warning(
"Attempting to reschedule task {}, which was not "
"found on the scheduler. Aborting reschedule.".format(key)
f"Attempting to reschedule task {key}, which was not "
"found on the scheduler. Aborting reschedule."
)
return
if ts.state != "processing":
return
if worker and ts.processing_on.address != worker:
return
self.transitions({key: "released"}, f"reschedule-{time()}")
self.transitions({key: "released"}, stimulus_id)

#####################
# Utility functions #
Expand Down
2 changes: 1 addition & 1 deletion distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ async def move_task_confirm(self, *, key, state, stimulus_id, worker=None):
*_log_msg,
)
)
self.scheduler.reschedule(key)
self.scheduler.reschedule(key, stimulus_id=stimulus_id)
# Victim had already started execution
elif state in _WORKER_STATE_REJECT:
self.log(("already-computing", *_log_msg))
Expand Down
6 changes: 3 additions & 3 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1482,7 +1482,7 @@ async def test_reschedule(c, s, a, b):
await asyncio.sleep(0.001)

for future in x:
s.reschedule(key=future.key)
s.reschedule(future.key, stimulus_id="test")

# Worker b gets more of the original tasks
await wait(x)
Expand All @@ -1493,7 +1493,7 @@ async def test_reschedule(c, s, a, b):
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
async def test_reschedule_warns(c, s, a, b):
with captured_logger(logging.getLogger("distributed.scheduler")) as sched:
s.reschedule(key="__this-key-does-not-exist__")
s.reschedule("__this-key-does-not-exist__", stimulus_id="test")

assert "not found on the scheduler" in sched.getvalue()
assert "Aborting reschedule" in sched.getvalue()
Expand Down Expand Up @@ -3358,7 +3358,7 @@ async def test_set_restrictions(c, s, a, b):
await f
s.set_restrictions(worker={f.key: a.address})
assert s.tasks[f.key].worker_restrictions == {a.address}
s.reschedule(f)
s.reschedule(f, stimulus_id="test")
await f


Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers):

steal.move_task_request(victim_ts, wsA, wsB)

s.reschedule(victim_key)
s.reschedule(victim_key, stimulus_id="test")
await c.gather(futs1)

del futs1
Expand Down Expand Up @@ -1175,7 +1175,7 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers):
steal.move_task_request(victim_ts, wsA, wsB)

s.set_restrictions(worker={victim_key: [wsB.address]})
s.reschedule(victim_key)
s.reschedule(victim_key, stimulus_id="test")
assert wsB == victim_ts.processing_on
# move_task_request is not responsible for respecting worker restrictions
steal.move_task_request(victim_ts, wsB, wsC)
Expand Down

0 comments on commit 3f20835

Please sign in to comment.