-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve type annotations for distributed.core.Server
#8239
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -43,6 +43,7 @@ | |||||
normalize_address, | ||||||
unparse_host_port, | ||||||
) | ||||||
from distributed.comm.core import Listener | ||||||
from distributed.compatibility import PeriodicCallback | ||||||
from distributed.counter import Counter | ||||||
from distributed.diskutils import WorkDir, WorkSpace | ||||||
|
@@ -64,6 +65,8 @@ | |||||
if TYPE_CHECKING: | ||||||
from typing_extensions import ParamSpec, Self | ||||||
|
||||||
from distributed.counter import Digest | ||||||
|
||||||
P = ParamSpec("P") | ||||||
R = TypeVar("R") | ||||||
T = TypeVar("T") | ||||||
|
@@ -315,12 +318,55 @@ class Server: | |||||
|
||||||
""" | ||||||
|
||||||
default_ip = "" | ||||||
default_port = 0 | ||||||
default_ip: ClassVar[str] = "" | ||||||
default_port: ClassVar[int] = 0 | ||||||
|
||||||
id: str | ||||||
blocked_handlers: list[str] | ||||||
handlers: dict[str, Callable] | ||||||
stream_handlers: dict[str, Callable] | ||||||
listeners: list[Listener] | ||||||
counters: defaultdict[str, Counter] | ||||||
deserialize: bool | ||||||
|
||||||
local_directory: str | ||||||
|
||||||
monitor: SystemMonitor | ||||||
io_loop: IOLoop | ||||||
thread_id: int | ||||||
|
||||||
periodic_callbacks: dict[str, PeriodicCallback] | ||||||
digests: defaultdict[Hashable, Digest] | None | ||||||
digests_total: defaultdict[Hashable, float] | ||||||
digests_total_since_heartbeat: defaultdict[Hashable, float] | ||||||
digests_max: defaultdict[Hashable, float] | ||||||
|
||||||
_last_tick: float | ||||||
_tick_counter: int | ||||||
_last_tick_counter: int | ||||||
_tick_interval: float | ||||||
_tick_interval_observed: float | ||||||
|
||||||
_status: Status | ||||||
|
||||||
_address: str | None | ||||||
_listen_address: str | None | ||||||
_host: str | None | ||||||
_port: int | None | ||||||
|
||||||
_comms: dict[Comm, str | None] | ||||||
|
||||||
_ongoing_background_tasks: AsyncTaskGroup | ||||||
_event_finished: asyncio.Event | ||||||
|
||||||
_original_local_dir: str | ||||||
_updated_sys_path: bool | ||||||
_workspace: WorkSpace | ||||||
_workdir: None | WorkDir | ||||||
|
||||||
_startup_lock: asyncio.Lock | ||||||
__startup_exc: Exception | None | ||||||
|
||||||
def __init__( | ||||||
self, | ||||||
handlers, | ||||||
|
@@ -673,13 +719,13 @@ def stop(self) -> None: | |||||
self.monitor.close() | ||||||
if not (stop_listeners := self._stop_listeners()).done(): | ||||||
self._ongoing_background_tasks.call_soon( | ||||||
asyncio.wait_for(stop_listeners, timeout=None) | ||||||
asyncio.wait_for(stop_listeners, timeout=None) # type: ignore[arg-type] | ||||||
) | ||||||
if self._workdir is not None: | ||||||
self._workdir.release() | ||||||
|
||||||
@property | ||||||
def listener(self): | ||||||
def listener(self) -> Listener | None: | ||||||
if self.listeners: | ||||||
return self.listeners[0] | ||||||
else: | ||||||
|
@@ -722,6 +768,7 @@ def address(self) -> str: | |||||
if self.listener is None: | ||||||
raise ValueError("cannot get address of non-running Server") | ||||||
self._address = self.listener.contact_address | ||||||
assert self._address | ||||||
return self._address | ||||||
|
||||||
@property | ||||||
|
@@ -784,7 +831,8 @@ def _to_dict(self, *, exclude: Container[str] = ()) -> dict: | |||||
Client.dump_cluster_state | ||||||
distributed.utils.recursive_to_dict | ||||||
""" | ||||||
info = self.identity() | ||||||
info: dict = {} | ||||||
info.update(self.identity()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's more correct to change the return type to |
||||||
extra = { | ||||||
"address": self.address, | ||||||
"status": self.status.name, | ||||||
|
@@ -816,15 +864,15 @@ async def listen(self, port_or_addr=None, allow_offload=True, **kwargs): | |||||
) | ||||||
self.listeners.append(listener) | ||||||
|
||||||
def handle_comm(self, comm): | ||||||
def handle_comm(self, comm: Comm) -> NoOpAwaitable: | ||||||
"""Start a background task that dispatches new communications to coroutine-handlers""" | ||||||
try: | ||||||
self._ongoing_background_tasks.call_soon(self._handle_comm, comm) | ||||||
except AsyncTaskGroupClosedError: | ||||||
comm.abort() | ||||||
return NoOpAwaitable() | ||||||
|
||||||
async def _handle_comm(self, comm): | ||||||
async def _handle_comm(self, comm: Comm) -> None: | ||||||
"""Dispatch new communications to coroutine-handlers | ||||||
|
||||||
Handlers is a dictionary mapping operation names to functions or | ||||||
|
@@ -963,7 +1011,9 @@ async def _handle_comm(self, comm): | |||||
"Failed while closing connection to %r: %s", address, e | ||||||
) | ||||||
|
||||||
async def handle_stream(self, comm, extra=None): | ||||||
async def handle_stream( | ||||||
self, comm: Comm, extra: dict[str, str] | None = None | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
) -> None: | ||||||
extra = extra or {} | ||||||
logger.info("Starting established connection to %s", comm.peer_address) | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5863,7 +5863,7 @@ def worker_send(self, worker: str, msg: dict[str, Any]) -> None: | |
stream_comms[worker].send(msg) | ||
except (CommClosedError, AttributeError): | ||
self._ongoing_background_tasks.call_soon( | ||
self.remove_worker, | ||
self.remove_worker, # type: ignore[arg-type] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Type annotations on the |
||
address=worker, | ||
stimulus_id=f"worker-send-comm-fail-{time()}", | ||
) | ||
|
@@ -5909,7 +5909,7 @@ def send_all(self, client_msgs: Msgs, worker_msgs: Msgs) -> None: | |
pass | ||
except (CommClosedError, AttributeError): | ||
self._ongoing_background_tasks.call_soon( | ||
self.remove_worker, | ||
self.remove_worker, # type: ignore[arg-type] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Type annotations on the |
||
address=worker, | ||
stimulus_id=f"send-all-comm-fail-{time()}", | ||
) | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -1050,7 +1050,7 @@ async def get_metrics(self) -> dict: | |||
|
||||
self.digests_total_since_heartbeat.clear() | ||||
|
||||
out = dict( | ||||
out: dict = dict( | ||||
task_counts=self.state.task_counter.current_count(by_prefix=False), | ||||
bandwidth={ | ||||
"total": self.bandwidth, | ||||
|
@@ -1348,9 +1348,8 @@ async def gather(self, who_has: dict[Key, list[str]]) -> dict[Key, object]: | |||
else: | ||||
return {"status": "OK"} | ||||
|
||||
def get_monitor_info( | ||||
self, recent: bool = False, start: int = 0 | ||||
) -> dict[str, float]: | ||||
# FIXME: Improve typing | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I think that precisely specifying the possible types of the return dict values would actually do harm. |
||||
def get_monitor_info(self, recent: bool = False, start: int = 0) -> dict[str, Any]: | ||||
result = dict( | ||||
range_query=( | ||||
self.monitor.recent() | ||||
|
@@ -2457,7 +2456,7 @@ async def get_profile( | |||
): | ||||
now = time() + self.scheduler_delay | ||||
if server: | ||||
history = self.io_loop.profile | ||||
history = self.io_loop.profile # type: ignore[attr-defined] | ||||
elif key is None: | ||||
history = self.profile_history | ||||
else: | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type annotations on the
AsyncTaskGroup
are too narrow. This should be addressed in a follow-up PR.