Add fail_hard decorator for worker methods #6210

Merged, Apr 29, 2022 (18 commits; the diff below shows changes from 6 commits).
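For context, a minimal standalone sketch of what such a decorator does (hypothetical code, not the PR's; the real implementation appears in the worker.py diff below): any unhandled exception in a decorated coroutine method is logged and the owning object is shut down before the error is re-raised.

import functools
import logging
from inspect import iscoroutinefunction

logger = logging.getLogger(__name__)


def fail_hard_sketch(method):
    """Close ``self`` and re-raise if ``method`` fails (hypothetical sketch)."""
    assert iscoroutinefunction(method)

    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        try:
            return await method(self, *args, **kwargs)
        except Exception:
            logger.exception("%s failed; shutting down", method.__name__)
            await self.close()  # assumes the object exposes an async close()
            raise  # still surface the original error to the caller

    return wrapper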
14 changes: 14 additions & 0 deletions distributed/tests/test_utils_test.py
@@ -679,3 +679,17 @@ async def test_log_invalid_worker_task_states(c, s, a):
 
     assert "released" in out + err
     assert "task-name" in out + err
+
+
+def test_worker_fail_hard(capsys):
+    @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
+    async def test_fail_hard(c, s, a):
+        with pytest.raises(Exception):
+            await a.gather_dep(
+                worker="abcd", to_gather=["x"], total_nbytes=0, stimulus_id="foo"
+            )
+
+    with pytest.raises(Exception) as info:
+        test_fail_hard()
+
+    assert "abcd" in str(info.value)
26 changes: 25 additions & 1 deletion distributed/utils_test.py
@@ -50,13 +50,21 @@
 from distributed.comm.tcp import TCP
 from distributed.compatibility import WINDOWS
 from distributed.config import initialize_logging
-from distributed.core import CommClosedError, ConnectionPool, Status, connect, rpc
+from distributed.core import (
+    CommClosedError,
+    ConnectionPool,
+    Status,
+    clean_exception,
+    connect,
+    rpc,
+)
 from distributed.deploy import SpecCluster
 from distributed.diagnostics.plugin import WorkerPlugin
 from distributed.metrics import time
 from distributed.nanny import Nanny
 from distributed.node import ServerNode
 from distributed.proctitle import enable_proctitle_on_children
+from distributed.protocol import deserialize
 from distributed.security import Security
 from distributed.utils import (
     DequeHandler,
@@ -867,6 +875,7 @@ async def start_cluster(
             await s.close(fast=True)
             check_invalid_worker_transitions(s)
             check_invalid_task_states(s)
+            check_worker_fail_hard(s)
             raise TimeoutError("Cluster creation timeout")
     return s, workers
 
@@ -898,6 +907,20 @@ def check_invalid_task_states(s: Scheduler) -> None:
     raise ValueError("Invalid worker task state")
 
 
+def check_worker_fail_hard(s: Scheduler) -> None:
+    if not s.events.get("worker-fail-hard"):
+        return
+
+    for timestamp, msg in s.events["worker-fail-hard"]:
+        msg = msg.copy()
+        worker = msg.pop("worker")
+        msg["exception"] = deserialize(msg["exception"].header, msg["exception"].frames)
+        msg["traceback"] = deserialize(msg["traceback"].header, msg["traceback"].frames)
+        print("Failed worker", worker)
+        typ, exc, tb = clean_exception(**msg)
+        raise exc.with_traceback(tb)
+
+
 async def end_cluster(s, workers):
     logger.debug("Closing out test cluster")
 
@@ -910,6 +933,7 @@ async def end_worker(w):
     s.stop()
     check_invalid_worker_transitions(s)
     check_invalid_task_states(s)
+    check_worker_fail_hard(s)
 
 
 def gen_cluster(
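check_worker_fail_hard rebuilds the worker's exception from its serialized header/frames form and re-raises it in the test process, which is how the "abcd" test above sees the original error. A rough sketch of that round-trip using the same distributed.protocol APIs (the real payload is built by error_message on the worker; here a plain serialize() call stands in for the wire transfer):

from distributed.core import clean_exception
from distributed.protocol import deserialize, serialize

# Simulate a worker-side failure being shipped to the scheduler.
try:
    raise ValueError("boom")
except ValueError as err:
    header, frames = serialize(err)  # what a Serialized payload carries

# Scheduler/test side: rebuild the exception object...
exc = deserialize(header, frames)

# ...and normalize it into a re-raisable (type, exception, traceback)
# triple; extra fields such as exception_text are absorbed by **kwargs.
typ, exc, tb = clean_exception(exception=exc, traceback=None)
assert typ is ValueError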
46 changes: 37 additions & 9 deletions distributed/worker.py
@@ -157,6 +157,32 @@
 DEFAULT_STARTUP_INFORMATION: dict[str, Callable[[Worker], Any]] = {}
 
 
+def fail_hard(method):
+    """
+    Decorator to close the worker if this method encounters an exception
+    """
+    assert iscoroutinefunction(method)
+
+    @functools.wraps(method)
+    async def wrapper(self, *args, **kwargs):
+        try:
+            return await method(self, *args, **kwargs)
+        except Exception as e:
+            logger.exception(e)
+            self.log_event(
+                "worker-fail-hard",
+                {
+                    **error_message(e),
+                    "worker": self.address,
+                },
+            )
+            # TODO: send event to scheduler
+            await self.close(nanny=False, executor_wait=False)
+            raise
+
+    return wrapper
+
+
 class Worker(ServerNode):
     """Worker node in a Dask distributed cluster
 
@@ -900,14 +926,15 @@ def logs(self):
         return self._deque_handler.deque
 
     def log_event(self, topic, msg):
-        self.loop.add_callback(
-            self.batched_stream.send,
-            {
-                "op": "log-event",
-                "topic": topic,
-                "msg": msg,
-            },
-        )
+        full_msg = {
+            "op": "log-event",
+            "topic": topic,
+            "msg": msg,
+        }
+        if self.thread_id == threading.get_ident():
+            self.batched_stream.send(full_msg)
+        else:
+            self.loop.add_callback(self.batched_stream.send, full_msg)
 
     @property
     def executing_count(self) -> int:
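Previously log_event always bounced the message through loop.add_callback; the new version sends immediately when already on the event-loop thread and only falls back to add_callback (Tornado's thread-safe scheduling hook) from other threads, so an event logged on the loop is buffered right away rather than deferred to a later callback. A generic sketch of the pattern, with hypothetical names:

import threading
from tornado.ioloop import IOLoop


class LoopBoundSender:
    """Hypothetical sketch: send directly on the loop thread, hop otherwise."""

    def __init__(self, loop: IOLoop, send) -> None:
        self.loop = loop
        self.send = send
        # Recorded on the event-loop thread, mirroring Worker.thread_id.
        self.thread_id = threading.get_ident()

    def log_event(self, topic, msg):
        full_msg = {"op": "log-event", "topic": topic, "msg": msg}
        if self.thread_id == threading.get_ident():
            # Already on the loop thread: enqueue synchronously, so the
            # message is buffered even if a close follows immediately.
            self.send(full_msg)
        else:
            # Other threads must schedule onto the loop; add_callback is
            # safe to call from any thread.
            self.loop.add_callback(self.send, full_msg)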
@@ -1199,6 +1226,7 @@ async def heartbeat(self):
         finally:
             self.heartbeat_active = False
 
+    @fail_hard
Review comment from a collaborator, on the @fail_hard line above:
Once the decorator is here, what's the point of having this block anymore?

        finally:
            if self.reconnect and self.status in Status.ANY_RUNNING:
                logger.info("Connection to scheduler broken. Reconnecting...")
                self.loop.add_callback(self.heartbeat)
            else:
                logger.info(
                    "Connection to scheduler broken. Closing without reporting. Status: %s",
                    self.status,
                )
                await self.close(report=False)

The decorator is going to close the worker as soon as the finally block completes. So what's the point of trying to reconnect if we're going to close either way? These seem like opposing behaviors.

I think we want to either remove the try/except entirely from handle_scheduler (because all we want to do on error is close the worker, and @fail_hard will do that for us anyway), or not use @fail_hard here, if we do in fact still want to reconnect in the face of errors.

     async def handle_scheduler(self, comm):
         try:
             await self.handle_stream(comm, every_cycle=[self.ensure_communicating])
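A sketch of the reviewer's first option: with @fail_hard responsible for closing the worker, handle_scheduler could drop its try/finally reconnect logic entirely (hypothetical, not what this diff does):

    # Inside Worker, the reviewer's first option would reduce the method to:
    @fail_hard
    async def handle_scheduler(self, comm):
        # Any exception propagates to fail_hard, which logs a
        # "worker-fail-hard" event, closes the worker, and re-raises;
        # there is deliberately no reconnect path left.
        await self.handle_stream(comm, every_cycle=[self.ensure_communicating])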
@@ -3020,6 +3048,7 @@ def _update_metrics_received_data(
         self.counters["transfer-count"].add(len(data))
         self.incoming_count += 1
 
+    @fail_hard
     @log_errors
     async def gather_dep(
         self,
@@ -4104,7 +4133,6 @@ def validate_state(self):
 
         except Exception as e:
             logger.error("Validate state failed. Closing.", exc_info=e)
-            self.loop.add_callback(self.close)
             logger.exception(e)
             if LOG_PDB:
                 import pdb