Skip to content

Commit

Permalink
Send worker validation errors to scheduler and err on test completion
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin authored Apr 25, 2022
1 parent 2ef5cf3 commit 198522b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
21 changes: 21 additions & 0 deletions distributed/tests/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,3 +640,24 @@ async def test_log_invalid_transitions(c, s, a):

assert "foo" in out + err
assert "task-name" in out + err


def test_invalid_worker_states(capsys):
    """Check that an invalid worker task state is reported and fails the run.

    An inner ``gen_cluster`` test corrupts the state of a finished task on the
    worker, triggers ``validate_task`` and waits until the scheduler has
    recorded an ``"invalid-worker-task-states"`` event. Running that inner
    test is expected to raise, and the captured output must mention both the
    bogus state and the task key.
    """

    @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
    async def test_log_invalid_worker_task_states(c, s, a):
        future = c.submit(inc, 1, key="task-name")
        await future

        # Force the finished task into a state the worker should reject.
        ts = a.tasks[future.key]
        ts.state = "released"
        with pytest.raises(Exception):
            a.validate_task(ts)

        # The event is sent asynchronously; poll until the scheduler has it.
        while not s.events["invalid-worker-task-states"]:
            await asyncio.sleep(0.01)

    with pytest.raises(Exception):
        test_log_invalid_worker_task_states()

    captured = capsys.readouterr()
    output = captured.out + captured.err

    assert "released" in output
    assert "task-name" in output
17 changes: 16 additions & 1 deletion distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,11 +866,12 @@ async def start_cluster(
await asyncio.gather(*(w.close(timeout=1) for w in workers))
await s.close(fast=True)
check_invalid_worker_transitions(s)
check_invalid_task_states(s)
raise TimeoutError("Cluster creation timeout")
return s, workers


def check_invalid_worker_transitions(s):
def check_invalid_worker_transitions(s: Scheduler) -> None:
if not s.events.get("invalid-worker-transition"):
return

Expand All @@ -884,6 +885,19 @@ def check_invalid_worker_transitions(s):
)


def check_invalid_task_states(s: Scheduler) -> None:
    """Print every invalid worker task state logged on the scheduler, then fail.

    Reads the ``"invalid-worker-task-states"`` topic from ``s.events``. When
    the topic is absent or empty the function returns without output.
    Otherwise each recorded message is printed (worker address, task key when
    present, task state, and every line of the story) so the failure can be
    diagnosed from the captured test output.

    Parameters
    ----------
    s
        Object exposing an ``events`` mapping from topic name to an iterable
        of ``(timestamp, msg)`` pairs, where ``msg`` is a mapping with at
        least ``"worker"``, ``"state"`` and ``"story"`` entries.

    Raises
    ------
    ValueError
        If at least one invalid worker task state event was recorded.
    """
    events = s.events.get("invalid-worker-task-states")
    if not events:
        return

    for _timestamp, msg in events:
        print("Worker:", msg["worker"])
        # The key is printed explicitly so the task stays identifiable even
        # when the story is empty; older messages without it are tolerated.
        if "key" in msg:
            print("Key:", msg["key"])
        print("State:", msg["state"])
        for line in msg["story"]:
            print(line)

    raise ValueError("Invalid worker task state")


async def end_cluster(s, workers):
logger.debug("Closing out test cluster")

Expand All @@ -895,6 +909,7 @@ async def end_worker(w):
await s.close() # wait until scheduler stops completely
s.stop()
check_invalid_worker_transitions(s)
check_invalid_task_states(s)


def gen_cluster(
Expand Down
10 changes: 10 additions & 0 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4009,6 +4009,16 @@ def validate_task(self, ts):

pdb.set_trace()

self.log_event(
"invalid-worker-task-states",
{
"key": ts.key,
"state": ts.state,
"story": self.story(ts),
"worker": self.address,
},
)

raise AssertionError(
f"Invalid TaskState encountered for {ts!r}.\nStory:\n{self.story(ts)}\n"
) from e
Expand Down

0 comments on commit 198522b

Please sign in to comment.