Skip to content

Commit

Permalink
More fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Jun 2, 2022
1 parent b404dd4 commit 853a59f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 107 deletions.
2 changes: 0 additions & 2 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3047,12 +3047,10 @@ async def test_task_flight_compute_oserror(c, s, a, b):
),
# inc is lost and needs to be recomputed. Therefore, sum is released
("free-keys", ("f1",)),
("f1", "purge-state"),
# The recommendations here are hard to predict. Whatever key is
# currently scheduled to be fetched, if any, will be recommended to be
# released.
("f1", "waiting", "released", "released", lambda msg: msg["f1"] == "forgotten"),
("f1", "purge-state"),
("f1", "released", "forgotten", "forgotten", {}),
# Now, we actually compute the task *once*. This must not cycle back
("f1", "compute-task", "released"),
Expand Down
20 changes: 20 additions & 0 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ async def test_missing_handle_compute_dependency(c, s, w1, w2, w3):

f1 = c.submit(inc, 1, key="f1", workers=[w1.address])
f2 = c.submit(inc, 2, key="f2", workers=[w1.address])
await wait_for_state(f1.key, "memory", w1)

w3.handle_acquire_replicas(
keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire"
Expand All @@ -515,3 +516,22 @@ async def test_missing_handle_compute_dependency(c, s, w1, w2, w3):
f3 = c.submit(sum, [f1, f2], key="f3", workers=[w3.address])

await f3


@gen_cluster(client=True, nthreads=[("", 1)] * 3)
async def test_missing_to_waiting(c, s, w1, w2, w3):
w3.periodic_callbacks["find-missing"].stop()

f1 = c.submit(inc, 1, key="f1", workers=[w1.address], allow_other_workers=True)
await wait_for_state(f1.key, "memory", w1)

w3.handle_acquire_replicas(
keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire"
)

await wait_for_state(f1.key, "missing", w3)

await w2.close()
await w1.close()

await f1
132 changes: 27 additions & 105 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2058,12 +2058,21 @@ def transition_generic_fetch(self, ts: TaskState, stimulus_id: str) -> RecsInstr
# _select_keys_for_gather().
return {}, [EnsureCommunicatingAfterTransitions(stimulus_id=stimulus_id)]

def transition_missing_waiting(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
self._missing_dep_flight.discard(ts)
self._purge_state(ts)
return self.transition_released_waiting(ts, stimulus_id=stimulus_id)

def transition_missing_fetch(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
if self.validate:
assert ts.state == "missing"
assert ts.who_has

if not ts.who_has:
return {}, []

self._missing_dep_flight.discard(ts)
return self.transition_generic_fetch(ts, stimulus_id=stimulus_id)
Expand Down Expand Up @@ -2105,7 +2114,7 @@ def transition_released_fetch(
def transition_generic_released(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
self._purge_state(ts.key, stimulus_id=stimulus_id)
self._purge_state(ts)
recs: Recs = {}
for dependency in ts.dependencies:
if (
Expand All @@ -2127,7 +2136,6 @@ def transition_released_waiting(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
if self.validate:
assert ts.state == "released"
assert all(d.key in self.tasks for d in ts.dependencies)

recommendations: Recs = {}
Expand All @@ -2136,10 +2144,7 @@ def transition_released_waiting(
if dep_ts.state != "memory":
ts.waiting_for_data.add(dep_ts)
dep_ts.waiters.add(ts)
# TODO: main branch claims this shouldn't be conditional since a
# recent change
if dep_ts.state not in {"fetch", "flight", "missing"}:
recommendations[dep_ts] = "fetch"
recommendations[dep_ts] = "fetch"

if ts.waiting_for_data:
self.waiting_for_data_count += 1
Expand Down Expand Up @@ -2447,7 +2452,6 @@ def transition_cancelled_released(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
if not ts.done:
ts._next = "released"
return {}, []
self._executing.discard(ts)
self._in_flight_tasks.discard(ts)
Expand Down Expand Up @@ -2656,7 +2660,7 @@ def transition_released_forgotten(
dep.dependents.discard(ts)
if dep.state == "released" and not dep.dependents:
recommendations[dep] = "forgotten"
self._purge_state(ts.key, stimulus_id=stimulus_id)
self._purge_state(ts)
# Mark state as forgotten in case it is still referenced
ts.state = "forgotten"
self.tasks.pop(ts.key, None)
Expand Down Expand Up @@ -2708,6 +2712,7 @@ def transition_released_forgotten(
("missing", "fetch"): transition_missing_fetch,
("missing", "released"): transition_missing_released,
("missing", "error"): transition_generic_error,
("missing", "waiting"): transition_missing_waiting,
("ready", "error"): transition_generic_error,
("ready", "executing"): transition_ready_executing,
("ready", "released"): transition_generic_released,
Expand Down Expand Up @@ -2793,7 +2798,9 @@ def _transition(
(recs, instructions),
self._transition(ts, finish, *args, stimulus_id=stimulus_id),
)
except InvalidTransition:
# ValueError may be raised by merge_recs_instructions
# TODO: should merge_recs raise InvalidTransition?
except (ValueError, InvalidTransition):
self.log_event(
"invalid-worker-transition",
{
Expand Down Expand Up @@ -3528,25 +3535,14 @@ def handle_worker_status_change(self, status: str, stimulus_id: str) -> None:
# Update status and send confirmation to the Scheduler (see status.setter)
self.status = new_status

def _purge_state(
self,
key: str,
*,
stimulus_id: str,
) -> None:
def _purge_state(self, ts: TaskState) -> None:
"""Ensure that TaskState attributes are reset to a neutral default and
Worker-level state associated to the provided key is cleared (e.g.
who_has)
This is idempotent
"""
ts = self.tasks[key]
logger.debug(
"Purge task key: %s state: %s; stimulus_id=%s",
ts.key,
ts.state,
stimulus_id,
)
self.log.append((key, "purge-state", stimulus_id, time()))
key = ts.key
logger.debug("Purge task key: %s state: %s; stimulus_id=%s", ts.key, ts.state)
self.data.pop(key, None)
self.actors.pop(key, None)

Expand Down Expand Up @@ -3976,7 +3972,8 @@ def _(self, ev: FindMissingEvent) -> RecsInstrs:
return {}, []

if self.validate:
assert not any(ts.who_has for ts in self._missing_dep_flight)
for ts in self._missing_dep_flight:
assert not ts.who_has, self.story(ts)

smsg = RequestRefreshWhoHasMsg(
keys=[ts.key for ts in self._missing_dep_flight],
Expand Down Expand Up @@ -4193,6 +4190,9 @@ def validate_task_executing(self, ts):
assert ts.run_spec is not None
assert ts.key not in self.data
assert not ts.waiting_for_data
for dep in ts.dependencies:
assert dep.state == "memory", self.story(dep)
assert dep.key in self.data or dep.key in self.actors

def validate_task_ready(self, ts):
assert ts.key in pluck(1, self.ready)
Expand Down Expand Up @@ -4240,7 +4240,8 @@ def validate_task_missing(self, ts):
def validate_task_cancelled(self, ts):
assert ts.key not in self.data
assert ts._previous in {"long-running", "executing", "flight"}
assert ts._next is None # We'll always transition to released after it is done
# We'll always transition to released after it is done
assert ts._next is None, (ts.key, ts._next, self.story(ts))

def validate_task_resumed(self, ts):
assert ts.key not in self.data
Expand Down Expand Up @@ -5173,82 +5174,3 @@ async def benchmark_network(

out[size_str] = total / (time() - start)
return out


[
(
"f1",
"ensure-task-exists",
"released",
"compute-task-1654160270.4501",
1654160270.450457,
),
(
"f1",
"released",
"fetch",
"fetch",
{},
"compute-task-1654160270.4501",
1654160270.450478,
),
(
"gather-dependencies",
"tcp://127.0.0.1:54108",
{"f1"},
"compute-task-1654160270.4501",
1654160270.45051,
),
(
"f1",
"fetch",
"flight",
"flight",
{},
"compute-task-1654160270.4501",
1654160270.4505181,
),
(
"request-dep",
"tcp://127.0.0.1:54108",
{"f1"},
"compute-task-1654160270.4501",
1654160270.450558,
),
(
"busy-gather",
"tcp://127.0.0.1:54108",
{"f1"},
"compute-task-1654160270.4501",
1654160270.4519591,
),
(
"f1",
"flight",
"fetch",
"fetch",
{},
"compute-task-1654160270.4501",
1654160270.4519749,
),
("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.4605172),
(
"f1",
"fetch",
"released",
"released",
{"f1": "forgotten"},
"worker-connect-1654160270.3552",
1654160270.460523,
),
("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.460525),
(
"f1",
"released",
"forgotten",
"forgotten",
{},
"worker-connect-1654160270.3552",
1654160270.4605281,
),
]

0 comments on commit 853a59f

Please sign in to comment.