Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert stats and related calls to async/await (#8192)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Aug 27, 2020
1 parent b71d4a0 commit b49a5b9
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 77 deletions.
1 change: 1 addition & 0 deletions changelog.d/8192.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
15 changes: 7 additions & 8 deletions synapse/storage/databases/main/monthly_active_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List
from typing import Dict, List

from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
Expand All @@ -33,11 +33,11 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self.hs = hs

@cached(num_args=0)
def get_monthly_active_count(self):
async def get_monthly_active_count(self) -> int:
"""Generates current count of monthly active users
Returns:
Defered[int]: Number of current monthly active users
Number of current monthly active users
"""

def _count_users(txn):
Expand All @@ -46,19 +46,18 @@ def _count_users(txn):
(count,) = txn.fetchone()
return count

return self.db_pool.runInteraction("count_users", _count_users)
return await self.db_pool.runInteraction("count_users", _count_users)

@cached(num_args=0)
def get_monthly_active_count_by_service(self):
async def get_monthly_active_count_by_service(self) -> Dict[str, int]:
"""Generates current count of monthly active users broken down by service.
A service is typically an appservice but also includes native matrix users.
Since the `monthly_active_users` table is populated from the `user_ips` table
`config.track_appservice_user_ips` must be set to `true` for this
method to return anything other than native matrix users.
Returns:
Deferred[dict]: dict that includes a mapping between app_service_id
and the number of occurrences.
A mapping between app_service_id and the number of occurrences.
"""

Expand All @@ -74,7 +73,7 @@ def _count_users_by_service(txn):
result = txn.fetchall()
return dict(result)

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"count_users_by_service", _count_users_by_service
)

Expand Down
82 changes: 42 additions & 40 deletions synapse/storage/databases/main/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
# limitations under the License.

import logging
from collections import Counter
from itertools import chain
from typing import Any, Dict, Tuple
from typing import Any, Dict, List, Optional, Tuple

from twisted.internet.defer import DeferredLock

Expand Down Expand Up @@ -251,21 +252,23 @@ async def update_room_state(self, room_id: str, fields: Dict[str, Any]) -> None:
desc="update_room_state",
)

def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
async def get_statistics_for_subject(
self, stats_type: str, stats_id: str, start: str, size: int = 100
) -> List[dict]:
"""
Get statistics for a given subject.
Args:
stats_type (str): The type of subject
stats_id (str): The ID of the subject (e.g. room_id or user_id)
start (int): Pagination start. Number of entries, not timestamp.
size (int): How many entries to return.
stats_type: The type of subject
stats_id: The ID of the subject (e.g. room_id or user_id)
start: Pagination start. Number of entries, not timestamp.
size: How many entries to return.
Returns:
Deferred[list[dict]], where the dict has the keys of
A list of dicts, where the dict has the keys of
ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
"""
return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_statistics_for_subject",
self._get_statistics_for_subject_txn,
stats_type,
Expand Down Expand Up @@ -319,18 +322,17 @@ async def get_earliest_token_for_stats(self, stats_type: str, id: str) -> int:
allow_none=True,
)

def bulk_update_stats_delta(self, ts, updates, stream_id):
async def bulk_update_stats_delta(
self, ts: int, updates: Dict[str, Dict[str, Dict[str, Counter]]], stream_id: int
) -> None:
"""Bulk update stats tables for a given stream_id and updates the stats
incremental position.
Args:
ts (int): Current timestamp in ms
updates(dict[str, dict[str, dict[str, Counter]]]): The updates to
commit as a mapping stats_type -> stats_id -> field -> delta.
stream_id (int): Current position.
Returns:
Deferred
ts: Current timestamp in ms
updates: The updates to commit as a mapping of
stats_type -> stats_id -> field -> delta.
stream_id: Current position.
"""

def _bulk_update_stats_delta_txn(txn):
Expand All @@ -355,38 +357,37 @@ def _bulk_update_stats_delta_txn(txn):
updatevalues={"stream_id": stream_id},
)

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"bulk_update_stats_delta", _bulk_update_stats_delta_txn
)

def update_stats_delta(
async def update_stats_delta(
self,
ts,
stats_type,
stats_id,
fields,
complete_with_stream_id,
absolute_field_overrides=None,
):
ts: int,
stats_type: str,
stats_id: str,
fields: Dict[str, int],
complete_with_stream_id: Optional[int],
absolute_field_overrides: Optional[Dict[str, int]] = None,
) -> None:
"""
Updates the statistics for a subject, with a delta (difference/relative
change).
Args:
ts (int): timestamp of the change
stats_type (str): "room" or "user" – the kind of subject
stats_id (str): the subject's ID (room ID or user ID)
fields (dict[str, int]): Deltas of stats values.
complete_with_stream_id (int, optional):
ts: timestamp of the change
stats_type: "room" or "user" – the kind of subject
stats_id: the subject's ID (room ID or user ID)
fields: Deltas of stats values.
complete_with_stream_id:
If supplied, converts an incomplete row into a complete row,
with the supplied stream_id marked as the stream_id where the
row was completed.
absolute_field_overrides (dict[str, int]): Current stats values
(i.e. not deltas) of absolute fields.
Does not work with per-slice fields.
absolute_field_overrides: Current stats values (i.e. not deltas) of
absolute fields. Does not work with per-slice fields.
"""

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"update_stats_delta",
self._update_stats_delta_txn,
ts,
Expand Down Expand Up @@ -646,19 +647,20 @@ def _upsert_copy_from_table_with_additive_relatives_txn(
txn, into_table, all_dest_keyvalues, src_row
)

def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
async def get_changes_room_total_events_and_bytes(
self, min_pos: int, max_pos: int
) -> Dict[str, Dict[str, int]]:
"""Fetches the counts of events in the given range of stream IDs.
Args:
min_pos (int)
max_pos (int)
min_pos
max_pos
Returns:
Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field
changes.
Mapping of room ID to field changes.
"""

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"stats_incremental_total_events_and_bytes",
self.get_changes_room_total_events_and_bytes_txn,
min_pos,
Expand Down
21 changes: 11 additions & 10 deletions tests/handlers/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from synapse.handlers.auth import AuthHandler

from tests import unittest
from tests.test_utils import make_awaitable
from tests.utils import setup_test_homeserver


Expand Down Expand Up @@ -142,7 +143,7 @@ def test_mau_limits_disabled(self):
def test_mau_limits_exceeded_large(self):
self.auth_blocking._limit_usage_by_mau = True
self.hs.get_datastore().get_monthly_active_count = Mock(
return_value=defer.succeed(self.large_number_of_users)
side_effect=lambda: make_awaitable(self.large_number_of_users)
)

with self.assertRaises(ResourceLimitError):
Expand All @@ -153,7 +154,7 @@ def test_mau_limits_exceeded_large(self):
)

self.hs.get_datastore().get_monthly_active_count = Mock(
return_value=defer.succeed(self.large_number_of_users)
side_effect=lambda: make_awaitable(self.large_number_of_users)
)
with self.assertRaises(ResourceLimitError):
yield defer.ensureDeferred(
Expand All @@ -168,7 +169,7 @@ def test_mau_limits_parity(self):

# If not in monthly active cohort
self.hs.get_datastore().get_monthly_active_count = Mock(
return_value=defer.succeed(self.auth_blocking._max_mau_value)
side_effect=lambda: make_awaitable(self.auth_blocking._max_mau_value)
)
with self.assertRaises(ResourceLimitError):
yield defer.ensureDeferred(
Expand All @@ -178,7 +179,7 @@ def test_mau_limits_parity(self):
)

self.hs.get_datastore().get_monthly_active_count = Mock(
return_value=defer.succeed(self.auth_blocking._max_mau_value)
side_effect=lambda: make_awaitable(self.auth_blocking._max_mau_value)
)
with self.assertRaises(ResourceLimitError):
yield defer.ensureDeferred(
Expand All @@ -188,21 +189,21 @@ def test_mau_limits_parity(self):
)
# If in monthly active cohort
self.hs.get_datastore().user_last_seen_monthly_active = Mock(
return_value=defer.succeed(self.hs.get_clock().time_msec())
side_effect=lambda user_id: make_awaitable(self.hs.get_clock().time_msec())
)
self.hs.get_datastore().get_monthly_active_count = Mock(
return_value=defer.succeed(self.auth_blocking._max_mau_value)
side_effect=lambda: make_awaitable(self.auth_blocking._max_mau_value)
)
yield defer.ensureDeferred(
self.auth_handler.get_access_token_for_user_id(
"user_a", device_id=None, valid_until_ms=None
)
)
self.hs.get_datastore().user_last_seen_monthly_active = Mock(
return_value=defer.succeed(self.hs.get_clock().time_msec())
side_effect=lambda user_id: make_awaitable(self.hs.get_clock().time_msec())
)
self.hs.get_datastore().get_monthly_active_count = Mock(
return_value=defer.succeed(self.auth_blocking._max_mau_value)
side_effect=lambda: make_awaitable(self.auth_blocking._max_mau_value)
)
yield defer.ensureDeferred(
self.auth_handler.validate_short_term_login_token_and_get_user_id(
Expand All @@ -215,7 +216,7 @@ def test_mau_limits_not_exceeded(self):
self.auth_blocking._limit_usage_by_mau = True

self.hs.get_datastore().get_monthly_active_count = Mock(
return_value=defer.succeed(self.small_number_of_users)
side_effect=lambda: make_awaitable(self.small_number_of_users)
)
# Ensure does not raise exception
yield defer.ensureDeferred(
Expand All @@ -225,7 +226,7 @@ def test_mau_limits_not_exceeded(self):
)

self.hs.get_datastore().get_monthly_active_count = Mock(
return_value=defer.succeed(self.small_number_of_users)
side_effect=lambda: make_awaitable(self.small_number_of_users)
)
yield defer.ensureDeferred(
self.auth_handler.validate_short_term_login_token_and_get_user_id(
Expand Down
12 changes: 5 additions & 7 deletions tests/handlers/test_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

from mock import Mock

from twisted.internet import defer

from synapse.api.auth import Auth
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, ResourceLimitError, SynapseError
Expand Down Expand Up @@ -102,23 +100,23 @@ def test_mau_limits_when_disabled(self):
def test_get_or_create_user_mau_not_blocked(self):
self.hs.config.limit_usage_by_mau = True
self.store.count_monthly_users = Mock(
return_value=defer.succeed(self.hs.config.max_mau_value - 1)
side_effect=lambda: make_awaitable(self.hs.config.max_mau_value - 1)
)
# Ensure does not throw exception
self.get_success(self.get_or_create_user(self.requester, "c", "User"))

def test_get_or_create_user_mau_blocked(self):
self.hs.config.limit_usage_by_mau = True
self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.lots_of_users)
side_effect=lambda: make_awaitable(self.lots_of_users)
)
self.get_failure(
self.get_or_create_user(self.requester, "b", "display_name"),
ResourceLimitError,
)

self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.hs.config.max_mau_value)
side_effect=lambda: make_awaitable(self.hs.config.max_mau_value)
)
self.get_failure(
self.get_or_create_user(self.requester, "b", "display_name"),
Expand All @@ -128,14 +126,14 @@ def test_get_or_create_user_mau_blocked(self):
def test_register_mau_blocked(self):
self.hs.config.limit_usage_by_mau = True
self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.lots_of_users)
side_effect=lambda: make_awaitable(self.lots_of_users)
)
self.get_failure(
self.handler.register_user(localpart="local_part"), ResourceLimitError
)

self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.hs.config.max_mau_value)
side_effect=lambda: make_awaitable(self.hs.config.max_mau_value)
)
self.get_failure(
self.handler.register_user(localpart="local_part"), ResourceLimitError
Expand Down
9 changes: 4 additions & 5 deletions tests/rest/admin/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@

from mock import Mock

from twisted.internet import defer

import synapse.rest.admin
from synapse.api.constants import UserTypes
from synapse.api.errors import HttpResponseException, ResourceLimitError
from synapse.rest.client.v1 import login
from synapse.rest.client.v2_alpha import sync

from tests import unittest
from tests.test_utils import make_awaitable
from tests.unittest import override_config


Expand Down Expand Up @@ -338,7 +337,7 @@ def test_register_mau_limit_reached(self):

# Set monthly active users to the limit
store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.hs.config.max_mau_value)
side_effect=lambda: make_awaitable(self.hs.config.max_mau_value)
)
# Check that the blocking of monthly active users is working as expected
# The registration of a new user fails due to the limit
Expand Down Expand Up @@ -592,7 +591,7 @@ def test_create_user_mau_limit_reached_active_admin(self):

# Set monthly active users to the limit
self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.hs.config.max_mau_value)
side_effect=lambda: make_awaitable(self.hs.config.max_mau_value)
)
# Check that the blocking of monthly active users is working as expected
# The registration of a new user fails due to the limit
Expand Down Expand Up @@ -632,7 +631,7 @@ def test_create_user_mau_limit_reached_passive_admin(self):

# Set monthly active users to the limit
self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.hs.config.max_mau_value)
side_effect=lambda: make_awaitable(self.hs.config.max_mau_value)
)
# Check that the blocking of monthly active users is working as expected
# The registration of a new user fails due to the limit
Expand Down
Loading

0 comments on commit b49a5b9

Please sign in to comment.