From 8e7c98bd3994489e63ddd36c178303fc1883e725 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 12:30:36 +0200 Subject: [PATCH 1/3] Support receiving stimulus_id's in Scheduler.reschedule --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ad37548766..a8292fd3af 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -6697,7 +6697,7 @@ async def get_story(self, keys=()): transition_story = story - def reschedule(self, key=None, worker=None): + def reschedule(self, key=None, worker=None, stimulus_id=None): """Reschedule a task Things may have shifted and this task may now be better suited to run @@ -6715,7 +6715,7 @@ def reschedule(self, key=None, worker=None): return if worker and ts.processing_on.address != worker: return - self.transitions({key: "released"}, f"reschedule-{time()}") + self.transitions({key: "released"}, stimulus_id or f"reschedule-{time()}") ##################### # Utility functions # From 09eb798b6ca22648bfe4ed2faabf593d936726b7 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 17:35:43 +0200 Subject: [PATCH 2/3] Address review comments --- distributed/scheduler.py | 4 ++-- distributed/tests/test_scheduler.py | 6 +++--- distributed/tests/test_steal.py | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a8292fd3af..c0babc8174 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -6697,7 +6697,7 @@ async def get_story(self, keys=()): transition_story = story - def reschedule(self, key=None, worker=None, stimulus_id=None): + def reschedule(self, key: str, worker: str, stimulus_id: str): """Reschedule a task Things may have shifted and this task may now be better suited to run @@ -6715,7 +6715,7 @@ def reschedule(self, key=None, worker=None, stimulus_id=None): return if worker and ts.processing_on.address != worker: return - self.transitions({key: "released"}, stimulus_id or f"reschedule-{time()}") + self.transitions({key: "released"}, stimulus_id) ##################### # Utility functions # diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 784448c588..9e0f18bcf1 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1481,7 +1481,7 @@ async def test_reschedule(c, s, a, b): await asyncio.sleep(0.001) for future in x: - s.reschedule(key=future.key) + s.reschedule(future.key, None, "test") # Worker b gets more of the original tasks await wait(x) @@ -1492,7 +1492,7 @@ async def test_reschedule(c, s, a, b): @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2) async def test_reschedule_warns(c, s, a, b): with captured_logger(logging.getLogger("distributed.scheduler")) as sched: - s.reschedule(key="__this-key-does-not-exist__") + s.reschedule("__this-key-does-not-exist__", None, "test") assert "not found on the scheduler" in sched.getvalue() assert "Aborting reschedule" in sched.getvalue() @@ -3301,7 +3301,7 @@ async def test_set_restrictions(c, s, a, b): await f s.set_restrictions(worker={f.key: a.address}) assert s.tasks[f.key].worker_restrictions == {a.address} - s.reschedule(f) + s.reschedule(f, None, "test") await f diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 9fc2420d6b..2c8835a134 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1109,7 +1109,7 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers): steal.move_task_request(victim_ts, wsA, wsB) - s.reschedule(victim_key) + s.reschedule(victim_key, None, "test") await c.gather(futs1) del futs1 @@ -1175,7 +1175,7 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers): steal.move_task_request(victim_ts, wsA, wsB) s.set_restrictions(worker={victim_key: [wsB.address]}) - s.reschedule(victim_key) + s.reschedule(victim_key, None, "test") assert wsB == victim_ts.processing_on # move_task_request is not responsible for respecting worker restrictions steal.move_task_request(victim_ts, wsB, wsC) From f2f3a39f619d53add6219ea37486ae65d52b58c9 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 17:40:08 +0200 Subject: [PATCH 3/3] Make Scheduler.reschule(stimulus_id) a mandatory kwarg --- distributed/scheduler.py | 2 +- distributed/tests/test_scheduler.py | 6 +++--- distributed/tests/test_steal.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c0babc8174..cc00d815ce 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -6697,7 +6697,7 @@ async def get_story(self, keys=()): transition_story = story - def reschedule(self, key: str, worker: str, stimulus_id: str): + def reschedule(self, key: str, worker=None, *, stimulus_id: str): """Reschedule a task Things may have shifted and this task may now be better suited to run diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 9e0f18bcf1..e709e624e0 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1481,7 +1481,7 @@ async def test_reschedule(c, s, a, b): await asyncio.sleep(0.001) for future in x: - s.reschedule(future.key, None, "test") + s.reschedule(future.key, stimulus_id="test") # Worker b gets more of the original tasks await wait(x) @@ -1492,7 +1492,7 @@ async def test_reschedule(c, s, a, b): @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2) async def test_reschedule_warns(c, s, a, b): with captured_logger(logging.getLogger("distributed.scheduler")) as sched: - s.reschedule("__this-key-does-not-exist__", None, "test") + s.reschedule("__this-key-does-not-exist__", stimulus_id="test") assert "not found on the scheduler" in sched.getvalue() assert "Aborting reschedule" in sched.getvalue() @@ -3301,7 +3301,7 @@ async def test_set_restrictions(c, s, a, b): await f s.set_restrictions(worker={f.key: a.address}) assert s.tasks[f.key].worker_restrictions == {a.address} - s.reschedule(f, None, "test") + s.reschedule(f, stimulus_id="test") await f diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 2c8835a134..fa61b0403d 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1109,7 +1109,7 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers): steal.move_task_request(victim_ts, wsA, wsB) - s.reschedule(victim_key, None, "test") + s.reschedule(victim_key, stimulus_id="test") await c.gather(futs1) del futs1 @@ -1175,7 +1175,7 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers): steal.move_task_request(victim_ts, wsA, wsB) s.set_restrictions(worker={victim_key: [wsB.address]}) - s.reschedule(victim_key, None, "test") + s.reschedule(victim_key, stimulus_id="test") assert wsB == victim_ts.processing_on # move_task_request is not responsible for respecting worker restrictions steal.move_task_request(victim_ts, wsB, wsC)