Skip to content

Commit

Permalink
Merge branch 'main' into log-length
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Sep 22, 2023
2 parents 83e077b + b6333df commit 7e780c7
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
4 changes: 2 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8446,8 +8446,8 @@ def allowed_failures(self) -> int:

def __str__(self) -> str:
return (
f"Attempted to run task {self.task} on {self.allowed_failures} different "
"workers, but all those workers died while running it. "
f"Attempted to run task {self.task} on {self.allowed_failures + 1} "
"different workers, but all those workers died while running it. "
f"The last worker that attempt to run the task was {self.last_worker.address}. "
"Inspecting worker logs is often a good next step to diagnose what went wrong. "
"For more information see https://distributed.dask.org/en/stable/killed.html."
Expand Down
14 changes: 6 additions & 8 deletions distributed/shuffle/_merge.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# mypy: ignore-errors
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -243,15 +242,14 @@ def _cull_dependencies(
all input partitions. This method does not require graph
materialization.
"""
deps = defaultdict(set)
deps = {}
parts_out = parts_out or self._keys_to_parts(keys)
keys = {(self.name_input_left, i) for i in range(self.npartitions)}
keys |= {(self.name_input_right, i) for i in range(self.npartitions)}
# Protect against mutations later on with frozenset
keys = frozenset(keys)
for part in parts_out:
deps[(self.name, part)] |= {
(self.name_input_left, i) for i in range(self.npartitions)
}
deps[(self.name, part)] |= {
(self.name_input_right, i) for i in range(self.npartitions)
}
deps[(self.name, part)] = keys
return deps

def _keys_to_parts(self, keys: Iterable[str]) -> set[str]:
Expand Down
7 changes: 5 additions & 2 deletions distributed/shuffle/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,11 @@ def cull(
parameter.
"""
parts_out = self._keys_to_parts(keys)
input_parts = {(self.name_input, i) for i in range(self.npartitions_input)}
culled_deps = {(self.name, part): input_parts.copy() for part in parts_out}
# Protect against mutations later on with frozenset
input_parts = frozenset(
{(self.name_input, i) for i in range(self.npartitions_input)}
)
culled_deps = {(self.name, part): input_parts for part in parts_out}

if parts_out != set(self.parts_out):
culled_layer = self._cull(parts_out)
Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4244,8 +4244,8 @@ async def test_KilledWorker_informative_message(s, a, b):
with pytest.raises(KilledWorker) as excinfo:
raise ex
msg = str(excinfo.value)
assert "Attempted to run task foo-bar" in msg
assert str(s.allowed_failures) in msg
assert "Attempted to run task foo-bar on 667 different workers" in msg
assert a.address in msg
assert "worker logs" in msg
assert "https://distributed.dask.org/en/stable/killed.html" in msg

Expand Down

0 comments on commit 7e780c7

Please sign in to comment.