Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove cancelled, resumed, and long-running states #6844

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
350 changes: 247 additions & 103 deletions distributed/tests/test_cancelled_state.py

Large diffs are not rendered by default.

32 changes: 31 additions & 1 deletion distributed/tests/test_reschedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from distributed import Event, Reschedule, get_worker, secede, wait
from distributed.utils_test import captured_logger, gen_cluster, slowinc
from distributed.worker_state_machine import (
ComputeTaskEvent,
FreeKeysEvent,
RescheduleEvent,
RescheduleMsg,
Expand Down Expand Up @@ -113,7 +114,7 @@ def test_cancelled_reschedule_worker_state(ws_with_running_task):

instructions = ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="s1"))
assert not instructions
assert ws.tasks["x"].state == "cancelled"
assert ws.tasks["x"].state == "released"
assert ws.available_resources == {"R": 0}

instructions = ws.handle_stimulus(RescheduleEvent(key="x", stimulus_id="s2"))
Expand All @@ -129,3 +130,32 @@ def test_reschedule_releases(ws_with_running_task):
assert instructions == [RescheduleMsg(stimulus_id="s1", key="x")]
assert ws.available_resources == {"R": 1}
assert "x" not in ws.tasks


def test_reschedule_cancelled(ws_with_running_task):
    """A running task is cancelled and then asks to be rescheduled.

    State loop under test:

    executing -> (cancel task) -> released -> rescheduled

    Both stimuli are delivered in a single ``handle_stimulus`` call. The test
    expects no instructions to come back and the task to be gone from the
    worker state afterwards.
    """
    state = ws_with_running_task
    cancel = FreeKeysEvent(keys=["x"], stimulus_id="s1")
    reschedule = RescheduleEvent(key="x", stimulus_id="s2")

    emitted = state.handle_stimulus(cancel, reschedule)

    assert not emitted
    assert "x" not in state.tasks


def test_reschedule_resumed(ws_with_running_task):
    """A cancelled task is requested again and then asks to be rescheduled.

    State loop under test:

    executing -> (cancel task) -> released -> (resume task) -> executing -> rescheduled

    The three stimuli are delivered in a single ``handle_stimulus`` call. The
    only expected instruction is a ``RescheduleMsg`` carrying the stimulus id
    of the reschedule event, and the task must be forgotten afterwards.
    """
    state = ws_with_running_task
    events = [
        FreeKeysEvent(keys=["x"], stimulus_id="s1"),
        ComputeTaskEvent.dummy("x", stimulus_id="s2", resource_restrictions={"R": 1}),
        RescheduleEvent(key="x", stimulus_id="s3"),
    ]

    emitted = state.handle_stimulus(*events)

    expected = [RescheduleMsg(key="x", stimulus_id="s3")]
    assert emitted == expected
    assert "x" not in state.tasks
76 changes: 76 additions & 0 deletions distributed/tests/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from distributed.worker_state_machine import (
ComputeTaskEvent,
Execute,
ExecuteFailureEvent,
ExecuteSuccessEvent,
FreeKeysEvent,
TaskFinishedMsg,
Expand Down Expand Up @@ -508,3 +509,78 @@ async def test_resources_from_python_override_config(c, s, a, b):
info = c.scheduler_info()
for worker in [a, b]:
assert info["workers"][worker.address]["resources"] == {"my_resources": 10}


@pytest.mark.parametrize("done_ev_cls", [ExecuteSuccessEvent, ExecuteFailureEvent])
def test_cancel_with_resources(ws_with_running_task, done_ev_cls):
    """Cancelling a running task must not hand back its resources early.

    Transition loop of a task with resources:

    executing -> (cancel task) -> released -> (complete execution)

    Resource ``R`` is expected to stay taken while the cancelled task is still
    tracked, and to become available again only once the execution-done event
    (success or failure, depending on the parametrization) has been handled.
    """
    state = ws_with_running_task

    # Precondition: "x" is executing and holds the whole of R.
    assert state.tasks["x"].state == "executing"
    assert state.available_resources == {"R": 0}

    # Cancel the task: it is reported as released but R is still not available.
    cancel = FreeKeysEvent(keys=["x"], stimulus_id="s1")
    state.handle_stimulus(cancel)
    assert state.tasks["x"].state == "released"
    assert state.available_resources == {"R": 0}

    # Execution finishes: the task is forgotten and R is available again.
    finished = done_ev_cls.dummy("x", stimulus_id="s2")
    state.handle_stimulus(finished)
    assert "x" not in state.tasks
    assert state.available_resources == {"R": 1}


@pytest.mark.parametrize("done_ev_cls", [ExecuteSuccessEvent, ExecuteFailureEvent])
def test_cancel_then_flight_with_resources(ws_with_running_task, done_ev_cls):
    """A cancelled task that is then fetched from a peer frees R on completion.

    Transition loop of a task with resources:

    executing -> (cancel task) -> released -> fetch -> flight -> (complete execution)

    While the original execution is still outstanding, resource ``R`` must stay
    taken even though the task has moved on to ``flight``. Once the
    execution-done event is handled the task is expected to remain in
    ``flight`` and ``R`` to be available again.
    """
    peer = "127.0.0.1:2"
    state = ws_with_running_task
    assert state.available_resources == {"R": 0}

    # Cancel the running task; R is not given back yet.
    state.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="s1"))
    assert state.tasks["x"].state == "released"
    assert state.available_resources == {"R": 0}

    # A new task "y" needs "x" as a dependency that lives on another worker.
    needs_x = ComputeTaskEvent.dummy("y", who_has={"x": [peer]}, stimulus_id="s2")
    state.handle_stimulus(needs_x)
    assert state.tasks["x"].state == "flight"
    assert state.available_resources == {"R": 0}

    # The original execution completes: state is unchanged, R is available.
    finished = done_ev_cls.dummy("x", stimulus_id="s3")
    state.handle_stimulus(finished)
    assert state.tasks["x"].state == "flight"
    assert state.available_resources == {"R": 1}


@pytest.mark.parametrize(
    "done_ev_cls,done_status",
    [(ExecuteSuccessEvent, "memory"), (ExecuteFailureEvent, "error")],
)
def test_resume_with_different_resources(ws, done_ev_cls, done_status):
    """A task with resources is cancelled and then resumed to the same state, but with
    different resources. This is actually possible in case of manual cancellation from
    the client, followed by resubmit.

    The test expects the amount acquired by the first compute request (0.2) to stay
    held across the cancel and the resubmit (which asks for 0.3), and the full
    amount to be available again once execution is done.

    Each stimulus carries its own stimulus id (s1..s4) so that the four events can
    be told apart.
    """
    ws.total_resources = {"R": 1}
    ws.available_resources = {"R": 1}

    # First submission: acquires 0.2 of R.
    ws.handle_stimulus(
        ComputeTaskEvent.dummy("x", stimulus_id="s1", resource_restrictions={"R": 0.2})
    )
    assert ws.tasks["x"].state == "executing"
    assert ws.available_resources == {"R": 0.8}

    # Cancellation: the task is reported as released, but R is not given back yet.
    ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="s2"))
    assert ws.tasks["x"].state == "released"
    assert ws.available_resources == {"R": 0.8}

    # Resubmission with a different restriction: no instructions are emitted and the
    # available amount is unchanged.
    instructions = ws.handle_stimulus(
        ComputeTaskEvent.dummy("x", stimulus_id="s3", resource_restrictions={"R": 0.3})
    )
    assert not instructions
    assert ws.available_resources == {"R": 0.8}

    # Completion: the task reaches its final status and all of R is available again.
    ws.handle_stimulus(done_ev_cls.dummy(key="x", stimulus_id="s4"))
    assert ws.tasks["x"].state == done_status
    assert ws.available_resources == {"R": 1}
2 changes: 1 addition & 1 deletion distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ def test_steal_worker_state(ws_with_running_task):

ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="s1"))
assert ws.available_resources == {"R": 0}
assert ws.tasks["x"].state == "cancelled"
assert ws.tasks["x"].state == "released"

instructions = ws.handle_stimulus(ExecuteSuccessEvent.dummy("x", stimulus_id="s2"))
assert not instructions
Expand Down
6 changes: 3 additions & 3 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3221,7 +3221,7 @@ async def test_gather_dep_cancelled_rescheduled(c, s):
while fut4.key in b.state.tasks:
await asyncio.sleep(0)

assert b.state.tasks[fut2.key].state == "cancelled"
assert b.state.tasks[fut2.key].state == "released"

b.block_gather_dep.set()
await a.in_get_data.wait()
Expand Down Expand Up @@ -3259,7 +3259,7 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a)
while fut4.key in b.state.tasks:
await asyncio.sleep(0.01)

assert b.state.tasks[fut2.key].state == "cancelled"
assert b.state.tasks[fut2.key].state == "released"

b.block_gather_dep.set()

Expand Down Expand Up @@ -3287,7 +3287,7 @@ async def test_gather_dep_no_longer_in_flight_tasks(c, s, a):
while fut2.key in b.state.tasks:
await asyncio.sleep(0.01)

assert b.state.tasks[fut1.key].state == "cancelled"
assert b.state.tasks[fut1.key].state == "released"

b.block_gather_dep.set()
while fut2.key in b.state.tasks:
Expand Down
67 changes: 24 additions & 43 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,14 @@ def test_TaskState__to_dict():
a short repr and always appear in full directly under Worker.state.tasks.
Uninteresting fields are omitted.
"""
x = TaskState("x", state="memory", done=True)
x = TaskState("x", state="memory")
y = TaskState("y", priority=(0,), dependencies={x})
x.dependents.add(y)
actual = recursive_to_dict([x, y])
assert actual == [
{
"key": "x",
"state": "memory",
"done": True,
"dependents": ["<TaskState 'y' released>"],
},
{
Expand All @@ -123,12 +122,6 @@ def test_TaskState__to_dict():
def test_TaskState_repr():
ts = TaskState("x")
assert str(ts) == "<TaskState 'x' released>"
ts.state = "cancelled"
ts.previous = "flight"
assert str(ts) == "<TaskState 'x' cancelled(flight)>"
ts.state = "resumed"
ts.next = "waiting"
assert str(ts) == "<TaskState 'x' resumed(flight->waiting)>"


def test_WorkerState__to_dict(ws):
Expand Down Expand Up @@ -760,6 +753,7 @@ def test_new_replica_while_all_workers_in_flight(ws):
assert ws.tasks["y"].state == "flight"


# TODO before merge: move to test_cancelled_state.py
@gen_cluster(client=True)
async def test_cancelled_while_in_flight(c, s, a, b):
event = asyncio.Event()
Expand All @@ -769,7 +763,7 @@ async def test_cancelled_while_in_flight(c, s, a, b):
y = c.submit(inc, x, key="y", workers=[a.address])
await wait_for_state("x", "flight", a)
y.release()
await wait_for_state("x", "cancelled", a)
await wait_for_state("x", "released", a)

# Let the comm from b to a return the result
event.set()
Expand Down Expand Up @@ -1112,8 +1106,8 @@ def test_gather_priority(ws):
]


@pytest.mark.parametrize("state", ["executing", "long-running"])
def test_task_acquires_resources(ws, state):
@pytest.mark.parametrize("secede", [False, True])
def test_task_acquires_resources(ws, secede):
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}

Expand All @@ -1122,11 +1116,11 @@ def test_task_acquires_resources(ws, state):
key="x", resource_restrictions={"R": 1}, stimulus_id="compute"
)
)
if state == "long-running":
if secede:
ws.handle_stimulus(
SecedeEvent(key="x", compute_duration=1.0, stimulus_id="secede")
)
assert ws.tasks["x"].state == state
assert ws.tasks["x"].state == "executing"
assert ws.available_resources == {"R": 0}


Expand Down Expand Up @@ -1169,39 +1163,30 @@ def test_task_with_dependencies_acquires_resources(ws):
assert ws.available_resources == {"R": 0}


@pytest.mark.parametrize(
"done_ev_cls,done_status",
[
(ExecuteSuccessEvent, "memory"),
pytest.param(
ExecuteFailureEvent,
"flight",
marks=pytest.mark.xfail(
reason="distributed#6682,distributed#6689,distributed#6693"
),
),
],
)
def test_resumed_task_releases_resources(
ws_with_running_task, done_ev_cls, done_status
):
# TODO before merge: move to test_cancelled_state.py
@pytest.mark.parametrize("done_ev_cls", [ExecuteSuccessEvent, ExecuteFailureEvent])
def test_resumed_task_releases_resources(ws_with_running_task, done_ev_cls):
ws = ws_with_running_task
assert ws.available_resources == {"R": 0}
ws2 = "127.0.0.1:2"

ws.handle_stimulus(FreeKeysEvent("cancel", ["x"]))
assert ws.tasks["x"].state == "cancelled"
instructions = ws.handle_stimulus(FreeKeysEvent("cancel", ["x"]))
assert not instructions
assert ws.tasks["x"].state == "released"
assert ws.available_resources == {"R": 0}

instructions = ws.handle_stimulus(
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="compute")
)
assert not instructions
assert ws.tasks["x"].state == "resumed"
assert instructions == [
GatherDep(worker=ws2, to_gather={"x"}, total_nbytes=1, stimulus_id="compute")
]
assert ws.tasks["x"].state == "flight"
assert ws.available_resources == {"R": 0}

ws.handle_stimulus(done_ev_cls.dummy("x", stimulus_id="s2"))
assert ws.tasks["x"].state == done_status
instructions = ws.handle_stimulus(done_ev_cls.dummy("x", stimulus_id="s2"))
assert not instructions
assert ws.tasks["x"].state == "flight"
assert ws.available_resources == {"R": 1}


Expand All @@ -1219,13 +1204,13 @@ def test_running_task_in_all_running_tasks(ws_with_running_task):
assert ts in ws.all_running_tasks

ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="s1"))
assert ts.state == "cancelled"
assert ts.state == "released"
assert ts in ws.all_running_tasks

ws.handle_stimulus(
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s2")
)
assert ts.state == "resumed"
assert ts.state == "flight"
assert ts in ws.all_running_tasks


Expand All @@ -1249,11 +1234,7 @@ def test_done_task_not_in_all_running_tasks(
"done_ev_cls,done_status",
[
(ExecuteSuccessEvent, "memory"),
pytest.param(
ExecuteFailureEvent,
"flight",
marks=pytest.mark.xfail(reason="distributed#6689"),
),
(ExecuteFailureEvent, "flight"),
],
)
def test_done_resumed_task_not_in_all_running_tasks(
Expand All @@ -1268,7 +1249,7 @@ def test_done_resumed_task_not_in_all_running_tasks(
done_ev_cls.dummy("x", stimulus_id="s3"),
)
ts = ws.tasks["x"]
assert ts.state == done_status
assert ts.state == "flight"
assert ts not in ws.all_running_tasks


Expand Down
10 changes: 7 additions & 3 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2236,11 +2236,15 @@ def __init__(self, *args, **kwargs):

super().__init__(*args, **kwargs)

async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent:
async def execute(
self, key: str, *, priority: tuple[int, ...], stimulus_id: str
) -> StateMachineEvent:
self.in_execute.set()
await self.block_execute.wait()
try:
return await super().execute(key, stimulus_id=stimulus_id)
return await super().execute(
key, priority=priority, stimulus_id=stimulus_id
)
finally:
self.in_execute_exit.set()
await self.block_execute_exit.wait()
Expand Down Expand Up @@ -2391,7 +2395,7 @@ def ws_with_running_task(ws, request):
ws.handle_stimulus(
SecedeEvent(key="x", compute_duration=1.0, stimulus_id="secede")
)
assert ws.tasks["x"].state == request.param
assert ws.tasks["x"].state == "executing"
yield ws


Expand Down
Loading