From 2edea7a098b4a14d3d220c1439d0c4b6263d61e5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 26 May 2023 13:36:39 +0100 Subject: [PATCH 1/5] Cache requests for user's devices from federation This should mitigate the issue where lots of different servers requests the same user's devices all at once. --- .../storage/databases/main/end_to_end_keys.py | 53 ++++++++++++++++++- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 4bc391f21316..33b8d9437718 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -16,6 +16,7 @@ import abc from typing import ( TYPE_CHECKING, + Any, Collection, Dict, Iterable, @@ -39,6 +40,7 @@ TransactionUnusedFallbackKeys, ) from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.replication.tcp.streams._base import DeviceListsStream from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( DatabasePool, @@ -104,6 +106,23 @@ def __init__( self.hs.config.federation.allow_device_name_lookup_over_federation ) + def process_replication_rows( + self, + stream_name: str, + instance_name: str, + token: int, + rows: Iterable[Any], + ) -> None: + if stream_name == DeviceListsStream.NAME: + for row in rows: + assert isinstance(row, DeviceListsStream.DeviceListsStreamRow) + if row.entity.startswith("@"): + self._get_e2e_device_keys_for_federation_query_inner.invalidate( + (row.entity,) + ) + + super().process_replication_rows(stream_name, instance_name, token, rows) + async def get_e2e_device_keys_for_federation_query( self, user_id: str ) -> Tuple[int, List[JsonDict]]: @@ -114,6 +133,36 @@ async def get_e2e_device_keys_for_federation_query( """ now_stream_id = self.get_device_stream_token() + results = await self._get_e2e_device_keys_for_federation_query_inner(user_id) + + # Check that there have been no new devices added by another worker + # after the cache. + sql = """ + SELECT user_id FROM device_lists_stream + WHERE stream_id <= ? AND user_id = ? + """ + rows = await self.db_pool.execute( + "get_e2e_device_keys_for_federation_query_check", + None, + sql, + now_stream_id, + user_id, + ) + if rows: + # There has, so let's invalidate the cache and run again. + self._get_e2e_device_keys_for_federation_query_inner.invalidate((user_id,)) + results = await self._get_e2e_device_keys_for_federation_query_inner( + user_id + ) + + return now_stream_id, results + + @cached(iterable=True) + async def _get_e2e_device_keys_for_federation_query_inner( + self, user_id: str + ) -> List[JsonDict]: + """Get all devices (with any device keys) for a user""" + devices = await self.get_e2e_device_keys_and_signatures([(user_id, None)]) if devices: @@ -134,9 +183,9 @@ async def get_e2e_device_keys_for_federation_query( results.append(result) - return now_stream_id, results + return results - return now_stream_id, [] + return [] @trace @cancellable From 8737dcc85db17f8759d6b072f92faf557741848a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 26 May 2023 13:38:09 +0100 Subject: [PATCH 2/5] Newsfile --- changelog.d/15675.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15675.misc diff --git a/changelog.d/15675.misc b/changelog.d/15675.misc new file mode 100644 index 000000000000..05538fdbeff9 --- /dev/null +++ b/changelog.d/15675.misc @@ -0,0 +1 @@ +Cache requests for user's devices over federation. From 89cdaa1830e39390317c04061cf861db99423b56 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 26 May 2023 14:00:20 +0100 Subject: [PATCH 3/5] Rework --- .../storage/databases/main/end_to_end_keys.py | 48 +++++++++++-------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 33b8d9437718..cef9000250f6 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -133,27 +133,37 @@ async def get_e2e_device_keys_for_federation_query( """ now_stream_id = self.get_device_stream_token() - results = await self._get_e2e_device_keys_for_federation_query_inner(user_id) - - # Check that there have been no new devices added by another worker - # after the cache. - sql = """ - SELECT user_id FROM device_lists_stream - WHERE stream_id <= ? AND user_id = ? - """ - rows = await self.db_pool.execute( - "get_e2e_device_keys_for_federation_query_check", - None, - sql, - now_stream_id, - user_id, + # We need to be careful with the caching here, as we need to always + # return *all* persisted devices, however there may be a lag between a + # new device being persisted and the cache being invalidated. + cached_results = ( + self._get_e2e_device_keys_for_federation_query_inner.cache.get_immediate( + user_id, None + ) ) - if rows: - # There has, so let's invalidate the cache and run again. - self._get_e2e_device_keys_for_federation_query_inner.invalidate((user_id,)) - results = await self._get_e2e_device_keys_for_federation_query_inner( - user_id + if cached_results is not None: + # Check that there have been no new devices added by another worker + # after the cache. This should be quick as there should be few rows + # with a higher stream ordering. + sql = """ + SELECT user_id FROM device_lists_stream + WHERE stream_id >= ? AND user_id = ? + """ + rows = await self.db_pool.execute( + "get_e2e_device_keys_for_federation_query_check", + None, + sql, + now_stream_id, + user_id, ) + if not rows: + # No new rows, so cache is still valid. + return now_stream_id, cached_results + + # There has, so let's invalidate the cache and run the query. + self._get_e2e_device_keys_for_federation_query_inner.invalidate((user_id,)) + + results = await self._get_e2e_device_keys_for_federation_query_inner(user_id) return now_stream_id, results From 68e8260561039e1d89b93e7f40e8148540e00bb5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 26 May 2023 14:27:08 +0100 Subject: [PATCH 4/5] Also invaliate on local worker --- synapse/storage/databases/main/devices.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index a67fdb3c22ce..f677d048aafb 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1941,6 +1941,10 @@ def _add_device_change_to_stream_txn( user_id, stream_ids[-1], ) + txn.call_after( + self._get_e2e_device_keys_for_federation_query_inner.invalidate, + (user_id,), + ) min_stream_id = stream_ids[0] From a4a61d1dcec221318d58b6421622d884817ac301 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2023 13:40:26 +0100 Subject: [PATCH 5/5] comment --- synapse/storage/databases/main/end_to_end_keys.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index cef9000250f6..91ae9c457d78 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -145,6 +145,10 @@ async def get_e2e_device_keys_for_federation_query( # Check that there have been no new devices added by another worker # after the cache. This should be quick as there should be few rows # with a higher stream ordering. + # + # Note that we invalidate based on the device stream, so we only + # have to check for potential invalidations after the + # `now_stream_id`. sql = """ SELECT user_id FROM device_lists_stream WHERE stream_id >= ? AND user_id = ?