This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add type hints to various handlers. #9223

Merged: 11 commits, Jan 26, 2021
1 change: 1 addition & 0 deletions mypy.ini
@@ -58,6 +58,7 @@ files =
synapse/handlers/set_password.py,
synapse/handlers/sso.py,
synapse/handlers/state_deltas.py,
synapse/handlers/stats.py,
synapse/handlers/sync.py,
synapse/handlers/typing.py,
synapse/handlers/user_directory.py,
45 changes: 29 additions & 16 deletions synapse/handlers/stats.py
@@ -12,13 +12,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from collections import Counter
from typing import (
TYPE_CHECKING,
Any,
Counter as CounterType,
Dict,
Iterable,
Optional,
Tuple,
)

from synapse.api.constants import EventTypes, Membership
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer

logger = logging.getLogger(__name__)

@@ -31,7 +43,7 @@ class StatsHandler:
Heavily derived from UserDirectoryHandler
"""

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@@ -44,7 +56,7 @@ def __init__(self, hs):
self.stats_enabled = hs.config.stats_enabled

# The current position in the current_state_delta stream
self.pos = None
self.pos = None # type: Optional[int]

# Guard to ensure we only process deltas one at a time
self._is_processing = False
@@ -56,7 +68,7 @@ def __init__(self, hs):
# we start populating stats
self.clock.call_later(0, self.notify_new_event)

def notify_new_event(self):
def notify_new_event(self) -> None:
"""Called when there may be more deltas to process
"""
if not self.stats_enabled or self._is_processing:
@@ -72,7 +84,7 @@ async def process():

run_as_background_process("stats.notify_new_event", process)

async def _unsafe_process(self):
async def _unsafe_process(self) -> None:
# If self.pos is None then it means we haven't fetched it from the DB
if self.pos is None:
self.pos = await self.store.get_stats_positions()
@@ -110,10 +122,10 @@ async def _unsafe_process(self):
)

for room_id, fields in room_count.items():
room_deltas.setdefault(room_id, {}).update(fields)
room_deltas.setdefault(room_id, Counter()).update(fields)

for user_id, fields in user_count.items():
user_deltas.setdefault(user_id, {}).update(fields)
user_deltas.setdefault(user_id, Counter()).update(fields)
Member: I think this is fine because fields is a Dict[str, int], which in this case is effectively the same as Counter[str]?

Member Author: Pretty much; the alternative would be to make user_deltas a Dict[str, Dict[str, int]] instead of a Dict[str, Counter[str]], I think? Would that be better?

Member: Nah, it's fine, just wanted to check my understanding of the change was correct :)
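
For illustration, a minimal sketch of the behaviour being discussed (the room ID and field names are made up): Counter.update() accepts any str -> int mapping and adds the values, so the plain Dict[str, int] coming back from the store folds cleanly into a Counter[str].

```python
from collections import Counter
from typing import Dict

# Hypothetical per-room counts as the store might return them (Dict[str, int]).
room_count: Dict[str, Dict[str, int]] = {
    "!room:example.org": {"total_events": 3, "total_event_bytes": 120},
}

room_deltas: Dict[str, Counter] = {}

for room_id, fields in room_count.items():
    # Counter.update() with a mapping *adds* the values rather than replacing
    # them, which is why repeated batches accumulate correctly.
    room_deltas.setdefault(room_id, Counter()).update(fields)

# A later batch for the same room accumulates on top of the earlier one.
room_deltas.setdefault("!room:example.org", Counter()).update({"total_events": 2})
print(room_deltas["!room:example.org"]["total_events"])  # 5
```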


logger.debug("room_deltas: %s", room_deltas)
logger.debug("user_deltas: %s", user_deltas)
@@ -131,19 +143,20 @@ async def _unsafe_process(self):

self.pos = max_pos

async def _handle_deltas(self, deltas):
async def _handle_deltas(
self, deltas: Iterable[JsonDict]
) -> Tuple[Dict[str, CounterType[str]], Dict[str, CounterType[str]]]:
"""Called with the state deltas to process

Returns:
tuple[dict[str, Counter], dict[str, counter]]
Two dicts: the room deltas and the user deltas,
mapping from room/user ID to changes in the various fields.
"""

room_to_stats_deltas = {}
user_to_stats_deltas = {}
room_to_stats_deltas = {} # type: Dict[str, CounterType[str]]
user_to_stats_deltas = {} # type: Dict[str, CounterType[str]]

room_to_state_updates = {}
room_to_state_updates = {} # type: Dict[str, Dict[str, Any]]

for delta in deltas:
typ = delta["type"]
@@ -173,7 +186,7 @@ async def _handle_deltas(self, deltas):
)
continue

event_content = {}
event_content = {} # type: JsonDict

sender = None
if event_id is not None:
@@ -257,13 +270,13 @@ async def _handle_deltas(
)

if has_changed_joinedness:
delta = +1 if membership == Membership.JOIN else -1
membership_delta = +1 if membership == Membership.JOIN else -1

user_to_stats_deltas.setdefault(user_id, Counter())[
"joined_rooms"
] += delta
] += membership_delta

room_stats_delta["local_users_in_room"] += delta
room_stats_delta["local_users_in_room"] += membership_delta

elif typ == EventTypes.Create:
room_state["is_federatable"] = (
22 changes: 12 additions & 10 deletions synapse/storage/databases/main/stats.py
@@ -15,10 +15,9 @@
# limitations under the License.

import logging
from collections import Counter
from enum import Enum
from itertools import chain
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Counter, Dict, List, Optional, Tuple

from twisted.internet.defer import DeferredLock

@@ -319,7 +318,9 @@ def _get_statistics_for_subject_txn(
return slice_list

@cached()
async def get_earliest_token_for_stats(self, stats_type: str, id: str) -> int:
async def get_earliest_token_for_stats(
self, stats_type: str, id: str
) -> Optional[int]:
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
Expand All @@ -339,7 +340,7 @@ async def get_earliest_token_for_stats(self, stats_type: str, id: str) -> int:
)

async def bulk_update_stats_delta(
self, ts: int, updates: Dict[str, Dict[str, Dict[str, Counter]]], stream_id: int
self, ts: int, updates: Dict[str, Dict[str, Counter[str]]], stream_id: int
Member Author: This was just wrong, Counter is already a (special) mapping.
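
A small illustration of that point, with made-up stats IDs: collections.Counter is a dict subclass, so Counter[str] already plays the role of the innermost Dict[str, int] and needs no extra Dict wrapper around it.

```python
from collections import Counter
from typing import Dict

fields: Counter = Counter({"total_events": 2, "joined_members": 1})

# Counter subclasses dict, so it is already a str -> int mapping here.
assert isinstance(fields, dict)
assert fields["total_events"] == 2

# Hypothetical shape of `updates` with the corrected annotation:
# stats type -> stats id -> per-field counts.
updates: Dict[str, Dict[str, Counter]] = {
    "room": {"!room:example.org": fields},
}
```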

) -> None:
"""Bulk update stats tables for a given stream_id and updates the stats
incremental position.
@@ -665,7 +666,7 @@ def _upsert_copy_from_table_with_additive_relatives_txn(

async def get_changes_room_total_events_and_bytes(
self, min_pos: int, max_pos: int
) -> Dict[str, Dict[str, int]]:
) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
"""Fetches the counts of events in the given range of stream IDs.

Args:
Expand All @@ -683,18 +684,19 @@ async def get_changes_room_total_events_and_bytes(
max_pos,
)

def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos):
def get_changes_room_total_events_and_bytes_txn(
self, txn, low_pos: int, high_pos: int
) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
"""Gets the total_events and total_event_bytes counts for rooms and
senders, in a range of stream_orderings (including backfilled events).

Args:
txn
low_pos (int): Low stream ordering
high_pos (int): High stream ordering
low_pos: Low stream ordering
high_pos: High stream ordering

Returns:
tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The
room and user deltas for total_events/total_event_bytes in the
The room and user deltas for total_events/total_event_bytes in the
format of `stats_id` -> fields
"""

Expand Down