diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c9ddc2ec67c..c26ef38bb8f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1062,7 +1062,9 @@ class TaskState: #: Cached hash of :attr:`~TaskState.client_key` _hash: int - __slots__ = tuple(__annotations__) # type: ignore + __slots__ = tuple(__annotations__) + ("__weakref__",) # type: ignore + + _instances: weakref.WeakSet[TaskState] = weakref.WeakSet() def __init__(self, key: str, run_spec: object): self.key = key @@ -1098,6 +1100,7 @@ def __init__(self, key: str, run_spec: object): self.metadata = {} self.annotations = {} self.erred_on = set() + TaskState._instances.add(self) def __hash__(self) -> int: return self._hash diff --git a/distributed/utils_test.py b/distributed/utils_test.py index fc75ea18177..a2756976cec 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -56,7 +56,9 @@ from distributed.nanny import Nanny from distributed.node import ServerNode from distributed.proctitle import enable_proctitle_on_children +from distributed.profile import wait_profiler from distributed.protocol import deserialize +from distributed.scheduler import TaskState from distributed.security import Security from distributed.utils import ( DequeHandler, @@ -1793,6 +1795,11 @@ def check_instances(): _global_clients.clear() + wait_profiler() + gc.collect() + + assert not TaskState._instances + for w in Worker._instances: with suppress(RuntimeError): # closed IOLoop w.loop.add_callback(w.close, report=False, executor_wait=False)