Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add missing type hints to synapse.logging.context #11556

Merged
merged 17 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11556.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add missing type hints to `synapse.logging.context`.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ disallow_untyped_defs = False
[mypy-synapse.handlers.*]
disallow_untyped_defs = True

[mypy-synapse.logging.context]
disallow_untyped_defs = True

[mypy-synapse.metrics.*]
disallow_untyped_defs = True

Expand Down
9 changes: 5 additions & 4 deletions stubs/txredisapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@
from typing import Any, List, Optional, Type, Union

from twisted.internet import protocol
from twisted.internet.defer import Deferred

class RedisProtocol(protocol.Protocol):
def publish(self, channel: str, message: bytes): ...
async def ping(self) -> None: ...
async def set(
def ping(self) -> "Deferred[None]": ...
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
def set(
self,
key: str,
value: Any,
expire: Optional[int] = None,
pexpire: Optional[int] = None,
only_if_not_exists: bool = False,
only_if_exists: bool = False,
) -> None: ...
async def get(self, key: str) -> Any: ...
) -> "Deferred[None]": ...
def get(self, key: str) -> "Deferred[Any]": ...

class SubscriberProtocol(RedisProtocol):
def __init__(self, *args, **kwargs): ...
Expand Down
9 changes: 4 additions & 5 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

from prometheus_client import Counter, Gauge, Histogram

from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure

Expand Down Expand Up @@ -67,7 +66,7 @@
from synapse.storage.databases.main.lock import Lock
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_server_name

Expand Down Expand Up @@ -360,13 +359,13 @@ async def _handle_incoming_transaction(
# want to block things like to device messages from reaching clients
# behind the potentially expensive handling of PDUs.
pdu_results, _ = await make_deferred_yieldable(
defer.gatherResults(
[
gather_results(
(
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
run_in_background(
self._handle_pdus_in_txn, origin, transaction, request_time
),
run_in_background(self._handle_edus_in_txn, origin, transaction),
],
),
consumeErrors=True,
).addErrback(unwrapFirstError)
)
Expand Down
19 changes: 11 additions & 8 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,31 +360,34 @@ async def try_backfill(domains: List[str]) -> bool:

logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
states = await make_deferred_yieldable(
states_list = await make_deferred_yieldable(
defer.gatherResults(
[resolve(room_id, [e]) for e in event_ids], consumeErrors=True
)
)

# dict[str, dict[tuple, str]], a map from event_id to state map of
# event_ids.
states = dict(zip(event_ids, [s.state for s in states]))
# A map from event_id to state map of event_ids.
state_ids: Dict[str, StateMap[str]] = dict(
zip(event_ids, [s.state for s in states_list])
)

state_map = await self.store.get_events(
[e_id for ids in states.values() for e_id in ids.values()],
[e_id for ids in state_ids.values() for e_id in ids.values()],
get_prev_content=False,
)
states = {

# A map from event_id to state map of events.
state_events: Dict[str, StateMap[EventBase]] = {
key: {
k: state_map[e_id]
for k, e_id in state_dict.items()
if e_id in state_map
}
for key, state_dict in states.items()
for key, state_dict in state_ids.items()
}

for e_id in event_ids:
likely_extremeties_domains = get_domains_from_state(states[e_id])
likely_extremeties_domains = get_domains_from_state(state_events[e_id])

success = await try_backfill(
[
Expand Down
33 changes: 19 additions & 14 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,27 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, List, Optional, Tuple

from twisted.internet import defer
from typing import TYPE_CHECKING, List, Optional, Tuple, cast

from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.receipts import ReceiptEventSource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
from synapse.types import (
JsonDict,
Requester,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
)
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.async_helpers import concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.visibility import filter_events_for_client

Expand Down Expand Up @@ -190,22 +196,21 @@ async def handle_room(event: RoomsForUser) -> None:
)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
).addCallback(
lambda states: cast(StateMap[EventBase], states[event.event_id])
)

(messages, token), current_state = await make_deferred_yieldable(
defer.gatherResults(
[
gather_results(
(
run_in_background(
self.store.get_recent_events_for_room,
event.room_id,
limit=limit,
end_token=room_end_token,
),
deferred_room_state,
]
)
)
).addErrback(unwrapFirstError)

Expand Down Expand Up @@ -454,8 +459,8 @@ async def get_receipts() -> List[JsonDict]:
return receipts

presence, receipts, (messages, token) = await make_deferred_yieldable(
defer.gatherResults(
[
gather_results(
(
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
Expand All @@ -464,7 +469,7 @@ async def get_receipts() -> List[JsonDict]:
limit=limit,
end_token=now_token.room_key,
),
],
),
consumeErrors=True,
).addErrback(unwrapFirstError)
)
Expand Down
13 changes: 6 additions & 7 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from canonicaljson import encode_canonical_json

from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall

from synapse import event_auth
Expand Down Expand Up @@ -57,7 +56,7 @@
from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder, json_encoder, log_failure
from synapse.util.async_helpers import Linearizer, unwrapFirstError
from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -1156,9 +1155,9 @@ async def handle_new_client_event(

# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
result = await make_deferred_yieldable(
defer.gatherResults(
[
result, _ = await make_deferred_yieldable(
gather_results(
(
run_in_background(
self._persist_event,
requester=requester,
Expand All @@ -1170,12 +1169,12 @@ async def handle_new_client_event(
run_in_background(
self.cache_joined_hosts_for_event, event, context
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
],
),
consumeErrors=True,
)
).addErrback(unwrapFirstError)

return result[0]
return result

async def _persist_event(
self,
Expand Down
7 changes: 5 additions & 2 deletions synapse/http/federation/matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.interfaces import (
IProtocol,
IProtocolFactory,
IReactorCore,
IStreamClientEndpoint,
Expand Down Expand Up @@ -309,12 +310,14 @@ def __init__(

self._srv_resolver = srv_resolver

def connect(self, protocol_factory: IProtocolFactory) -> defer.Deferred:
def connect(
self, protocol_factory: IProtocolFactory
) -> "defer.Deferred[IProtocol]":
"""Implements IStreamClientEndpoint interface"""

return run_in_background(self._do_connect, protocol_factory)

async def _do_connect(self, protocol_factory: IProtocolFactory) -> None:
async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
first_exception = None

server_list = await self._resolve_server()
Expand Down
Loading