From 1afcfd7a35c148e262d737085e6e6684346a0d88 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 03:14:26 +0000 Subject: [PATCH 01/16] Add `defer.gatherResults` wrapper for heterogenous lists of `Deferred`s --- synapse/util/async_helpers.py | 55 +++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 20ce294209ad..a98540bc7d1f 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -30,9 +30,11 @@ Iterator, Optional, Set, + Tuple, TypeVar, Union, cast, + overload, ) import attr @@ -234,6 +236,59 @@ def yieldable_gather_results( ).addErrback(unwrapFirstError) +T1 = TypeVar("T1") +T2 = TypeVar("T2") +T3 = TypeVar("T3") + + +@overload +def gather_results( + deferredList: Tuple[()], consumeErrors: bool = ... +) -> "defer.Deferred[Tuple[()]]": + ... + + +@overload +def gather_results( + deferredList: Tuple["defer.Deferred[T1]"], + consumeErrors: bool = ..., +) -> "defer.Deferred[Tuple[T1]]": + ... + + +@overload +def gather_results( + deferredList: Tuple["defer.Deferred[T1]", "defer.Deferred[T2]"], + consumeErrors: bool = ..., +) -> "defer.Deferred[Tuple[T1, T2]]": + ... + + +@overload +def gather_results( + deferredList: Tuple[ + "defer.Deferred[T1]", "defer.Deferred[T2]", "defer.Deferred[T3]" + ], + consumeErrors: bool = ..., +) -> "defer.Deferred[Tuple[T1, T2, T3]]": + ... + + +def gather_results( # type: ignore[misc] + deferredList: Tuple["defer.Deferred[T1]", ...], + consumeErrors: bool = False, +) -> "defer.Deferred[Tuple[T1, ...]]": + """Combines a tuple of `Deferred`s into a single `Deferred`. + + Wraps `defer.gatherResults` to provide type annotations that support heterogenous + lists of `Deferred`s. + """ + # The `type: ignore[misc]` above suppresses + # "Overloaded function implementation cannot produce return type of signature 1/2/3" + deferred = defer.gatherResults(deferredList, consumeErrors=consumeErrors) + return deferred.addCallback(tuple) + + @attr.s(slots=True) class _LinearizerEntry: # The number of things executing. From d6dadd9791c288df83c3f65939aa07b648cb8ace Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 02:19:37 +0000 Subject: [PATCH 02/16] Use `gather_results` where appropriate --- synapse/federation/federation_server.py | 9 ++++----- synapse/handlers/initial_sync.py | 16 +++++++--------- synapse/handlers/message.py | 13 ++++++------- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8e37e76206ac..cf067b56c6b4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -30,7 +30,6 @@ from prometheus_client import Counter, Gauge, Histogram -from twisted.internet import defer from twisted.internet.abstract import isIPAddress from twisted.python import failure @@ -67,7 +66,7 @@ from synapse.storage.databases.main.lock import Lock from synapse.types import JsonDict, get_domain_from_id from synapse.util import glob_to_regex, json_decoder, unwrapFirstError -from synapse.util.async_helpers import Linearizer, concurrently_execute +from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_server_name @@ -360,13 +359,13 @@ async def _handle_incoming_transaction( # want to block things like to device messages from reaching clients # behind the potentially expensive handling of PDUs. pdu_results, _ = await make_deferred_yieldable( - defer.gatherResults( - [ + gather_results( + ( run_in_background( self._handle_pdus_in_txn, origin, transaction, request_time ), run_in_background(self._handle_edus_in_txn, origin, transaction), - ], + ), consumeErrors=True, ).addErrback(unwrapFirstError) ) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 9cd21e7f2b3c..c5405e335867 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -15,8 +15,6 @@ import logging from typing import TYPE_CHECKING, List, Optional, Tuple -from twisted.internet import defer - from synapse.api.constants import EduTypes, EventTypes, Membership from synapse.api.errors import SynapseError from synapse.events.validator import EventValidator @@ -27,7 +25,7 @@ from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID from synapse.util import unwrapFirstError -from synapse.util.async_helpers import concurrently_execute +from synapse.util.async_helpers import concurrently_execute, gather_results from synapse.util.caches.response_cache import ResponseCache from synapse.visibility import filter_events_for_client @@ -196,8 +194,8 @@ async def handle_room(event: RoomsForUser) -> None: ) (messages, token), current_state = await make_deferred_yieldable( - defer.gatherResults( - [ + gather_results( + ( run_in_background( self.store.get_recent_events_for_room, event.room_id, @@ -205,7 +203,7 @@ async def handle_room(event: RoomsForUser) -> None: end_token=room_end_token, ), deferred_room_state, - ] + ) ) ).addErrback(unwrapFirstError) @@ -454,8 +452,8 @@ async def get_receipts() -> List[JsonDict]: return receipts presence, receipts, (messages, token) = await make_deferred_yieldable( - defer.gatherResults( - [ + gather_results( + ( run_in_background(get_presence), run_in_background(get_receipts), run_in_background( @@ -464,7 +462,7 @@ async def get_receipts() -> List[JsonDict]: limit=limit, end_token=now_token.room_key, ), - ], + ), consumeErrors=True, ).addErrback(unwrapFirstError) ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 87f671708c4e..05998e30ad8d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -21,7 +21,6 @@ from canonicaljson import encode_canonical_json -from twisted.internet import defer from twisted.internet.interfaces import IDelayedCall from synapse import event_auth @@ -57,7 +56,7 @@ from synapse.storage.state import StateFilter from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester from synapse.util import json_decoder, json_encoder, log_failure -from synapse.util.async_helpers import Linearizer, unwrapFirstError +from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -1156,9 +1155,9 @@ async def handle_new_client_event( # We now persist the event (and update the cache in parallel, since we # don't want to block on it). - result = await make_deferred_yieldable( - defer.gatherResults( - [ + result, _ = await make_deferred_yieldable( + gather_results( + ( run_in_background( self._persist_event, requester=requester, @@ -1170,12 +1169,12 @@ async def handle_new_client_event( run_in_background( self.cache_joined_hosts_for_event, event, context ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), - ], + ), consumeErrors=True, ) ).addErrback(unwrapFirstError) - return result[0] + return result async def _persist_event( self, From 2f19d0cdb11e217ebaa1f562d99f6ec97e5205a4 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 03:17:52 +0000 Subject: [PATCH 03/16] Add type hints for `make_deferred_yieldable` --- synapse/logging/context.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index d8ae3188b7da..8c52cc84abd1 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -778,7 +778,10 @@ def run_in_background(f, *args, **kwargs) -> defer.Deferred: return res -def make_deferred_yieldable(deferred): +T = TypeVar("T") + + +def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]": """Given a deferred (or coroutine), make it follow the Synapse logcontext rules: From 262e91d65ad4b62cb5036f2037c55030f7a21092 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 02:55:52 +0000 Subject: [PATCH 04/16] Fix fallout from `make_deferred_yieldable` type hints --- synapse/handlers/federation.py | 4 ++-- synapse/http/federation/matrix_federation_agent.py | 7 +++++-- synapse/util/async_helpers.py | 2 +- synapse/util/file_consumer.py | 1 + 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1ea837d08211..ef2992d3fdfd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -360,7 +360,7 @@ async def try_backfill(domains: List[str]) -> bool: logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events) - states = await make_deferred_yieldable( + states_list = await make_deferred_yieldable( defer.gatherResults( [resolve(room_id, [e]) for e in event_ids], consumeErrors=True ) @@ -368,7 +368,7 @@ async def try_backfill(domains: List[str]) -> bool: # dict[str, dict[tuple, str]], a map from event_id to state map of # event_ids. - states = dict(zip(event_ids, [s.state for s in states])) + states = dict(zip(event_ids, [s.state for s in states_list])) state_map = await self.store.get_events( [e_id for ids in states.values() for e_id in ids.values()], diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 1238bfd28726..a8a520f80944 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -25,6 +25,7 @@ from twisted.internet import defer from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.internet.interfaces import ( + IProtocol, IProtocolFactory, IReactorCore, IStreamClientEndpoint, @@ -309,12 +310,14 @@ def __init__( self._srv_resolver = srv_resolver - def connect(self, protocol_factory: IProtocolFactory) -> defer.Deferred: + def connect( + self, protocol_factory: IProtocolFactory + ) -> "defer.Deferred[IProtocol]": """Implements IStreamClientEndpoint interface""" return run_in_background(self._do_connect, protocol_factory) - async def _do_connect(self, protocol_factory: IProtocolFactory) -> None: + async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol: first_exception = None server_list = await self._resolve_server() diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index a98540bc7d1f..bde99ea8787b 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -407,7 +407,7 @@ def _await_lock(self, key: Hashable) -> defer.Deferred: logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key) - new_defer = make_deferred_yieldable(defer.Deferred()) + new_defer: "defer.Deferred[None]" = make_deferred_yieldable(defer.Deferred()) entry.deferreds[new_defer] = 1 def cb(_r: None) -> "defer.Deferred[None]": diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index de2adacd70dc..46771a401b50 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -142,6 +142,7 @@ def _writer(self) -> None: def wait(self) -> "Deferred[None]": """Returns a deferred that resolves when finished writing to file""" + assert self._finished_deferred is not None return make_deferred_yieldable(self._finished_deferred) def _resume_paused_producer(self) -> None: From 40a48a08a8c91dbb6d60728a62cc28c982984b63 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 03:05:32 +0000 Subject: [PATCH 05/16] Correct txredisapi stubs txredisapi is based on Twisted and uses `Deferred`s instead of async functions. --- stubs/txredisapi.pyi | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index 4ff3c6de5feb..429234d7ae7f 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -17,11 +17,12 @@ from typing import Any, List, Optional, Type, Union from twisted.internet import protocol +from twisted.internet.defer import Deferred class RedisProtocol(protocol.Protocol): def publish(self, channel: str, message: bytes): ... - async def ping(self) -> None: ... - async def set( + def ping(self) -> "Deferred[None]": ... + def set( self, key: str, value: Any, @@ -29,8 +30,8 @@ class RedisProtocol(protocol.Protocol): pexpire: Optional[int] = None, only_if_not_exists: bool = False, only_if_exists: bool = False, - ) -> None: ... - async def get(self, key: str) -> Any: ... + ) -> "Deferred[None]": ... + def get(self, key: str) -> "Deferred[Any]": ... class SubscriberProtocol(RedisProtocol): def __init__(self, *args, **kwargs): ... From 498bcd77fdefb28c71b332546c4ee2662164498f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 03:30:14 +0000 Subject: [PATCH 06/16] Add type hints for `run_in_background` --- synapse/logging/context.py | 43 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 8c52cc84abd1..e2881c3e28d3 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -27,7 +27,17 @@ import threading import typing import warnings -from typing import TYPE_CHECKING, Optional, Tuple, TypeVar, Union +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Optional, + Tuple, + TypeVar, + Union, + overload, +) import attr from typing_extensions import Literal @@ -711,6 +721,9 @@ def nested_logging_context(suffix: str) -> LoggingContext: ) +R = TypeVar("R") + + def preserve_fn(f): """Function decorator which wraps the function with run_in_background""" @@ -720,7 +733,30 @@ def g(*args, **kwargs): return g -def run_in_background(f, *args, **kwargs) -> defer.Deferred: +@overload +def run_in_background( # type: ignore[misc] + f: Callable[..., Awaitable[R]], *args: Any, **kwargs: Any +) -> "defer.Deferred[R]": + # The `type: ignore[misc]` above suppresses + # "Overloaded function signatures 1 and 2 overlap with incompatible return types" + ... + + +@overload +def run_in_background( + f: Callable[..., R], *args: Any, **kwargs: Any +) -> "defer.Deferred[R]": + ... + + +def run_in_background( + f: Union[ + Callable[..., R], + Callable[..., Awaitable[R]], + ], + *args: Any, + **kwargs: Any, +) -> "defer.Deferred[R]": """Calls a function, ensuring that the current context is restored after return from the function, and that the sentinel context is set once the deferred returned by the function completes. @@ -751,6 +787,9 @@ def run_in_background(f, *args, **kwargs) -> defer.Deferred: # At this point we should have a Deferred, if not then f was a synchronous # function, wrap it in a Deferred for consistency. if not isinstance(res, defer.Deferred): + # All `Awaitable`s in Synapse are either coroutines or `Deferred`s + assert not isinstance(res, Awaitable) + return defer.succeed(res) if res.called and not res.paused: From 6d28491594940ede3f1f431b2791096a65690343 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 03:32:18 +0000 Subject: [PATCH 07/16] Fix fallout from `run_in_background` type hints --- synapse/handlers/initial_sync.py | 17 ++++++++++++----- synapse/util/caches/cached_call.py | 1 + 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c5405e335867..9ab723ff975e 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -13,17 +13,25 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple, cast from synapse.api.constants import EduTypes, EventTypes, Membership from synapse.api.errors import SynapseError +from synapse.events import EventBase from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state from synapse.handlers.receipts import ReceiptEventSource from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.storage.roommember import RoomsForUser from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID +from synapse.types import ( + JsonDict, + Requester, + RoomStreamToken, + StateMap, + StreamToken, + UserID, +) from synapse.util import unwrapFirstError from synapse.util.async_helpers import concurrently_execute, gather_results from synapse.util.caches.response_cache import ResponseCache @@ -188,9 +196,8 @@ async def handle_room(event: RoomsForUser) -> None: ) deferred_room_state = run_in_background( self.state_store.get_state_for_events, [event.event_id] - ) - deferred_room_state.addCallback( - lambda states: states[event.event_id] + ).addCallback( + lambda states: cast(StateMap[EventBase], states[event.event_id]) ) (messages, token), current_state = await make_deferred_yieldable( diff --git a/synapse/util/caches/cached_call.py b/synapse/util/caches/cached_call.py index 470f4f91a59b..e325f44da328 100644 --- a/synapse/util/caches/cached_call.py +++ b/synapse/util/caches/cached_call.py @@ -76,6 +76,7 @@ async def get(self) -> TV: # Fire off the callable now if this is our first time if not self._deferred: + assert self._callable is not None self._deferred = run_in_background(self._callable) # we will never need the callable again, so make sure it can be GCed From a4eaf2893da2c8342413cc5bf1e894b2e5c0f17f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 03:39:53 +0000 Subject: [PATCH 08/16] Add type hints for `defer_to_thread` and `defer_to_threadpool` --- synapse/logging/context.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index e2881c3e28d3..1a0d57fa9a9b 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -44,6 +44,8 @@ from twisted.internet import defer, threads +from synapse.types import ISynapseReactor + if TYPE_CHECKING: from synapse.logging.scopecontextmanager import _LogContextScope @@ -865,7 +867,9 @@ def _set_context_cb(result: ResultT, context: LoggingContext) -> ResultT: return result -def defer_to_thread(reactor, f, *args, **kwargs): +def defer_to_thread( + reactor: ISynapseReactor, f: Callable[..., R], *args: Any, **kwargs: Any +) -> "defer.Deferred[R]": """ Calls the function `f` using a thread from the reactor's default threadpool and returns the result as a Deferred. @@ -897,7 +901,9 @@ def defer_to_thread(reactor, f, *args, **kwargs): return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs) -def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): +def defer_to_threadpool( + reactor: ISynapseReactor, threadpool, f: Callable[..., R], *args: Any, **kwargs: Any +) -> "defer.Deferred[R]": """ A wrapper for twisted.internet.threads.deferToThreadpool, which handles logcontexts correctly. @@ -939,7 +945,7 @@ def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): assert isinstance(curr_context, LoggingContext) parent_context = curr_context - def g(): + def g() -> R: with LoggingContext(str(curr_context), parent_context=parent_context): return f(*args, **kwargs) From 3c8e9d5967e195d9ee3adbcf2506c0c244c44e3d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 03:55:31 +0000 Subject: [PATCH 09/16] Add type hints for `preserve_fn` --- synapse/logging/context.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 1a0d57fa9a9b..6485dc20fd5d 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -726,10 +726,29 @@ def nested_logging_context(suffix: str) -> LoggingContext: R = TypeVar("R") -def preserve_fn(f): +@overload +def preserve_fn( # type: ignore[misc] + f: Callable[..., Awaitable[R]], +) -> Callable[..., "defer.Deferred[R]"]: + # The `type: ignore[misc]` above suppresses + # "Overloaded function signatures 1 and 2 overlap with incompatible return types" + ... + + +@overload +def preserve_fn(f: Callable[..., R]) -> Callable[..., "defer.Deferred[R]"]: + ... + + +def preserve_fn( + f: Union[ + Callable[..., R], + Callable[..., Awaitable[R]], + ] +) -> Callable[..., "defer.Deferred[R]"]: """Function decorator which wraps the function with run_in_background""" - def g(*args, **kwargs): + def g(*args: Any, **kwargs: Any) -> "defer.Deferred[R]": return run_in_background(f, *args, **kwargs) return g From c3052271cec401a427e3b1841da441c9323520d5 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 03:56:41 +0000 Subject: [PATCH 10/16] Fix fallout from `preserve_fn` type hints --- synapse/handlers/federation.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ef2992d3fdfd..26b8e3f43c40 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -366,25 +366,28 @@ async def try_backfill(domains: List[str]) -> bool: ) ) - # dict[str, dict[tuple, str]], a map from event_id to state map of - # event_ids. - states = dict(zip(event_ids, [s.state for s in states_list])) + # A map from event_id to state map of event_ids. + state_ids: Dict[str, StateMap[str]] = dict( + zip(event_ids, [s.state for s in states_list]) + ) state_map = await self.store.get_events( - [e_id for ids in states.values() for e_id in ids.values()], + [e_id for ids in state_ids.values() for e_id in ids.values()], get_prev_content=False, ) - states = { + + # A map from event_id to state map of events. + state_events: Dict[str, StateMap[EventBase]] = { key: { k: state_map[e_id] for k, e_id in state_dict.items() if e_id in state_map } - for key, state_dict in states.items() + for key, state_dict in state_ids.items() } for e_id in event_ids: - likely_extremeties_domains = get_domains_from_state(states[e_id]) + likely_extremeties_domains = get_domains_from_state(state_events[e_id]) success = await try_backfill( [ From d32d27bc51edabc1609bd48413d21a85d1da80a8 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 10:54:51 +0000 Subject: [PATCH 11/16] Add newsfile --- changelog.d/11556.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11556.misc diff --git a/changelog.d/11556.misc b/changelog.d/11556.misc new file mode 100644 index 000000000000..53b26aa676b4 --- /dev/null +++ b/changelog.d/11556.misc @@ -0,0 +1 @@ +Add missing type hints to `synapse.logging.context`. From 38c73904c8f94de3a7f9d726f95ed27f91e0efbe Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 11:05:02 +0000 Subject: [PATCH 12/16] Fix cyclic import --- synapse/logging/context.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 6485dc20fd5d..bfd63130bab4 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -44,10 +44,9 @@ from twisted.internet import defer, threads -from synapse.types import ISynapseReactor - if TYPE_CHECKING: from synapse.logging.scopecontextmanager import _LogContextScope + from synapse.types import ISynapseReactor logger = logging.getLogger(__name__) @@ -887,7 +886,7 @@ def _set_context_cb(result: ResultT, context: LoggingContext) -> ResultT: def defer_to_thread( - reactor: ISynapseReactor, f: Callable[..., R], *args: Any, **kwargs: Any + reactor: "ISynapseReactor", f: Callable[..., R], *args: Any, **kwargs: Any ) -> "defer.Deferred[R]": """ Calls the function `f` using a thread from the reactor's default threadpool and @@ -921,7 +920,11 @@ def defer_to_thread( def defer_to_threadpool( - reactor: ISynapseReactor, threadpool, f: Callable[..., R], *args: Any, **kwargs: Any + reactor: "ISynapseReactor", + threadpool, + f: Callable[..., R], + *args: Any, + **kwargs: Any, ) -> "defer.Deferred[R]": """ A wrapper for twisted.internet.threads.deferToThreadpool, which handles From 4768efc199b5f32ca5559bc4cc824402b5ceaf31 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 17:01:08 +0000 Subject: [PATCH 13/16] Add missing type hints --- mypy.ini | 3 +++ synapse/logging/context.py | 45 +++++++++++++++++++------------------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/mypy.ini b/mypy.ini index 1caf807e8505..e924b69f768c 100644 --- a/mypy.ini +++ b/mypy.ini @@ -163,6 +163,9 @@ disallow_untyped_defs = False [mypy-synapse.handlers.*] disallow_untyped_defs = True +[mypy-synapse.logging.context] +disallow_untyped_defs = True + [mypy-synapse.metrics.*] disallow_untyped_defs = True diff --git a/synapse/logging/context.py b/synapse/logging/context.py index bfd63130bab4..f71811b1aa87 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -27,6 +27,7 @@ import threading import typing import warnings +from types import TracebackType from typing import ( TYPE_CHECKING, Any, @@ -34,6 +35,7 @@ Callable, Optional, Tuple, + Type, TypeVar, Union, overload, @@ -43,6 +45,7 @@ from typing_extensions import Literal from twisted.internet import defer, threads +from twisted.python.threadpool import ThreadPool if TYPE_CHECKING: from synapse.logging.scopecontextmanager import _LogContextScope @@ -77,7 +80,7 @@ def get_thread_resource_usage() -> "Optional[resource.struct_rusage]": # a hook which can be set during testing to assert that we aren't abusing logcontexts. -def logcontext_error(msg: str): +def logcontext_error(msg: str) -> None: logger.warning(msg) @@ -234,22 +237,19 @@ def __init__(self) -> None: def __str__(self) -> str: return "sentinel" - def copy_to(self, record): - pass - - def start(self, rusage: "Optional[resource.struct_rusage]"): + def start(self, rusage: "Optional[resource.struct_rusage]") -> None: pass - def stop(self, rusage: "Optional[resource.struct_rusage]"): + def stop(self, rusage: "Optional[resource.struct_rusage]") -> None: pass - def add_database_transaction(self, duration_sec): + def add_database_transaction(self, duration_sec: float) -> None: pass - def add_database_scheduled(self, sched_sec): + def add_database_scheduled(self, sched_sec: float) -> None: pass - def record_event_fetch(self, event_count): + def record_event_fetch(self, event_count: int) -> None: pass def __bool__(self) -> Literal[False]: @@ -390,7 +390,12 @@ def __enter__(self) -> "LoggingContext": ) return self - def __exit__(self, type, value, traceback) -> None: + def __exit__( + self, + type: Optional[Type[BaseException]], + value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: """Restore the logging context in thread local storage to the state it was before this context was entered. Returns: @@ -410,17 +415,6 @@ def __exit__(self, type, value, traceback) -> None: # recorded against the correct metrics. self.finished = True - def copy_to(self, record) -> None: - """Copy logging fields from this context to a log record or - another LoggingContext - """ - - # we track the current request - record.request = self.request - - # we also track the current scope: - record.scope = self.scope - def start(self, rusage: "Optional[resource.struct_rusage]") -> None: """ Record that this logcontext is currently running. @@ -637,7 +631,12 @@ def __init__( def __enter__(self) -> None: self._old_context = set_current_context(self._new_context) - def __exit__(self, type, value, traceback) -> None: + def __exit__( + self, + type: Optional[Type[BaseException]], + value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: context = set_current_context(self._old_context) if context != self._new_context: @@ -921,7 +920,7 @@ def defer_to_thread( def defer_to_threadpool( reactor: "ISynapseReactor", - threadpool, + threadpool: ThreadPool, f: Callable[..., R], *args: Any, **kwargs: Any, From 5cbf3c32571c41ad3dfbaddbfbfcc62a326f0174 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 18:16:52 +0000 Subject: [PATCH 14/16] Reword comment on assert --- synapse/logging/context.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index f71811b1aa87..7991e03c9dad 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -806,7 +806,8 @@ def run_in_background( # At this point we should have a Deferred, if not then f was a synchronous # function, wrap it in a Deferred for consistency. if not isinstance(res, defer.Deferred): - # All `Awaitable`s in Synapse are either coroutines or `Deferred`s + # `res` is not a `Deferred` and not a `Coroutine`. + # There are no other types of `Awaitable`s we expect to encounter in Synapse. assert not isinstance(res, Awaitable) return defer.succeed(res) From 18867504205c7d0d882b963045b4808af56c787a Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 18:17:12 +0000 Subject: [PATCH 15/16] Simplify `make_deferred_yieldable` to only support `Deferred`s --- synapse/logging/context.py | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 7991e03c9dad..25e78cc82fcd 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -22,7 +22,6 @@ See doc/log_contexts.rst for details on how this works. """ -import inspect import logging import threading import typing @@ -841,12 +840,10 @@ def run_in_background( def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]": - """Given a deferred (or coroutine), make it follow the Synapse logcontext - rules: + """Given a deferred, make it follow the Synapse logcontext rules: - If the deferred has completed (or is not actually a Deferred), essentially - does nothing (just returns another completed deferred with the - result/failure). + If the deferred has completed, essentially does nothing (just returns another + completed deferred with the result/failure). If the deferred has not yet completed, resets the logcontext before returning a deferred. Then, when the deferred completes, restores the @@ -854,16 +851,6 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T] (This is more-or-less the opposite operation to run_in_background.) """ - if inspect.isawaitable(deferred): - # If we're given a coroutine we convert it to a deferred so that we - # run it and find out if it immediately finishes, it it does then we - # don't need to fiddle with log contexts at all and can return - # immediately. - deferred = defer.ensureDeferred(deferred) - - if not isinstance(deferred, defer.Deferred): - return deferred - if deferred.called and not deferred.paused: # it looks like this deferred is ready to run any callbacks we give it # immediately. We may as well optimise out the logcontext faffery. From 3f9a04b97446edd10a6896ac1af884edbe6410b6 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 10 Dec 2021 18:35:17 +0000 Subject: [PATCH 16/16] Remove tests for now unsupported `make_deferred_yieldable` features --- tests/util/test_logcontext.py | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 5d9c4665aa58..621b0f9fcdf0 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -152,46 +152,11 @@ def test_make_deferred_yieldable_with_chained_deferreds(self): # now it should be restored self._check_test_key("one") - @defer.inlineCallbacks - def test_make_deferred_yieldable_on_non_deferred(self): - """Check that make_deferred_yieldable does the right thing when its - argument isn't actually a deferred""" - - with LoggingContext("one"): - d1 = make_deferred_yieldable("bum") - self._check_test_key("one") - - r = yield d1 - self.assertEqual(r, "bum") - self._check_test_key("one") - def test_nested_logging_context(self): with LoggingContext("foo"): nested_context = nested_logging_context(suffix="bar") self.assertEqual(nested_context.name, "foo-bar") - @defer.inlineCallbacks - def test_make_deferred_yieldable_with_await(self): - # an async function which returns an incomplete coroutine, but doesn't - # follow the synapse rules. - - async def blocking_function(): - d = defer.Deferred() - reactor.callLater(0, d.callback, None) - await d - - sentinel_context = current_context() - - with LoggingContext("one"): - d1 = make_deferred_yieldable(blocking_function()) - # make sure that the context was reset by make_deferred_yieldable - self.assertIs(current_context(), sentinel_context) - - yield d1 - - # now it should be restored - self._check_test_key("one") - # a function which returns a deferred which has been "called", but # which had a function which returned another incomplete deferred on