Skip to content

Commit

Permalink
Track scheduler instances in
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed May 18, 2022
1 parent 32313c0 commit 97c862d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
5 changes: 4 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,9 @@ class TaskState:
#: Cached hash of :attr:`~TaskState.client_key`
_hash: int

__slots__ = tuple(__annotations__) # type: ignore
__slots__ = tuple(__annotations__) + ("__weakref__",) # type: ignore

_instances: weakref.WeakSet[TaskState] = weakref.WeakSet()

def __init__(self, key: str, run_spec: object):
self.key = key
Expand Down Expand Up @@ -1098,6 +1100,7 @@ def __init__(self, key: str, run_spec: object):
self.metadata = {}
self.annotations = {}
self.erred_on = set()
TaskState._instances.add(self)

def __hash__(self) -> int:
return self._hash
Expand Down
7 changes: 7 additions & 0 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@
from distributed.nanny import Nanny
from distributed.node import ServerNode
from distributed.proctitle import enable_proctitle_on_children
from distributed.profile import wait_profiler
from distributed.protocol import deserialize
from distributed.scheduler import TaskState
from distributed.security import Security
from distributed.utils import (
DequeHandler,
Expand Down Expand Up @@ -1793,6 +1795,11 @@ def check_instances():

_global_clients.clear()

wait_profiler()
gc.collect()

assert not TaskState._instances

for w in Worker._instances:
with suppress(RuntimeError): # closed IOLoop
w.loop.add_callback(w.close, report=False, executor_wait=False)
Expand Down

0 comments on commit 97c862d

Please sign in to comment.