From 00404b3ab43582c2c589f3f0c5aa46377796b78f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 24 Jul 2024 20:56:50 -0500 Subject: [PATCH 01/14] Better standardize `find_relevant_room_ids_for_extension(...)` --- synapse/handlers/sliding_sync.py | 193 +++++++++++++++++++------- synapse/types/handlers/__init__.py | 18 ++- synapse/types/rest/client/__init__.py | 18 +++ tests/rest/client/test_sync.py | 100 +++++-------- 4 files changed, 213 insertions(+), 116 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 32315744021..103ebfbbdfa 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -630,7 +630,8 @@ async def handle_room(room_id: str) -> None: extensions = await self.get_extensions_response( sync_config=sync_config, - lists=lists, + actual_lists=lists, + actual_room_ids=rooms.keys(), from_token=from_token, to_token=to_token, ) @@ -1800,7 +1801,8 @@ async def get_room_sync_data( async def get_extensions_response( self, sync_config: SlidingSyncConfig, - lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], to_token: StreamToken, from_token: Optional[SlidingSyncStreamToken], ) -> SlidingSyncResult.Extensions: @@ -1808,7 +1810,9 @@ async def get_extensions_response( Args: sync_config: Sync configuration - lists: Sliding window API. A map of list key to list results. + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. """ @@ -1837,18 +1841,100 @@ async def get_extensions_response( if sync_config.extensions.account_data is not None: account_data_response = await self.get_account_data_extension_response( sync_config=sync_config, - lists=lists, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, account_data_request=sync_config.extensions.account_data, to_token=to_token, from_token=from_token, ) + receipts_response = None + if sync_config.extensions.receipts is not None: + receipts_response = await self.get_receipts_extension_response( + sync_config=sync_config, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + receipts_request=sync_config.extensions.receipts, + to_token=to_token, + from_token=from_token, + ) + return SlidingSyncResult.Extensions( to_device=to_device_response, e2ee=e2ee_response, account_data=account_data_response, + receipts=receipts_response, ) + def find_relevant_room_ids_for_extension( + self, + requested_lists: Optional[List[str]], + requested_rooms: Optional[List[str]], + actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], + ) -> Set[str]: + """ + Handle the reserved `lists`/`rooms` keys for extensions. + + {"lists": []} // Do not process any lists. + {"lists": ["rooms", "dms"]} // Process only a subset of lists. + {"lists": ["*"]} // Process all lists defined in the Sliding Window API. (This is the default.) + + {"rooms": []} // Do not process any specific rooms. + {"rooms": ["!a:b", "!c:d"]} // Process only a subset of room subscriptions. + {"rooms": ["*"]} // Process all room subscriptions defined in the Room Subscription API. (This is the default.) + + Args: + requested_lists: The `lists` from the extension request. + requested_rooms: The `rooms` from the extension request. + actual_lists: The actual lists from the Sliding Sync response. + actual_room_subscriptions: The actual room subscriptions from the Sliding Sync request. + """ + + # We only want to include account data for rooms that are already in the sliding + # sync response AND that were requested in the account data request. + relevant_room_ids: Set[str] = set() + + # See what rooms from the room subscriptions we should get account data for + if requested_rooms is not None: + for room_id in requested_rooms: + # A wildcard means we process all rooms from the room subscriptions + if room_id == "*": + relevant_room_ids.update(actual_room_ids) + break + + if room_id in actual_room_ids: + relevant_room_ids.add(room_id) + + # See what rooms from the sliding window lists we should get account data for + if requested_lists is not None: + for list_key in requested_lists: + # Just some typing because we share the variable name in multiple places + actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None + + # A wildcard means we process rooms from all lists + if list_key == "*": + for actual_list in actual_lists.values(): + # We only expect a single SYNC operation for any list + assert len(actual_list.ops) == 1 + sync_op = actual_list.ops[0] + assert sync_op.op == OperationType.SYNC + + relevant_room_ids.update(sync_op.room_ids) + + break + + actual_list = actual_lists.get(list_key) + if actual_list is not None: + # We only expect a single SYNC operation for any list + assert len(actual_list.ops) == 1 + sync_op = actual_list.ops[0] + assert sync_op.op == OperationType.SYNC + + relevant_room_ids.update(sync_op.room_ids) + + return relevant_room_ids + async def get_to_device_extension_response( self, sync_config: SlidingSyncConfig, @@ -1976,7 +2062,8 @@ async def get_e2ee_extension_response( async def get_account_data_extension_response( self, sync_config: SlidingSyncConfig, - lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension, to_token: StreamToken, from_token: Optional[SlidingSyncStreamToken], @@ -1985,7 +2072,9 @@ async def get_account_data_extension_response( Args: sync_config: Sync configuration - lists: Sliding window API. A map of list key to list results. + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. account_data_request: The account_data extension from the request to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. @@ -2022,55 +2111,14 @@ async def get_account_data_extension_response( await self.push_rules_handler.push_rules_for_user(sync_config.user) ) - # We only want to include account data for rooms that are already in the sliding - # sync response AND that were requested in the account data request. - relevant_room_ids: Set[str] = set() - - # See what rooms from the room subscriptions we should get account data for - if ( - account_data_request.rooms is not None - and sync_config.room_subscriptions is not None - ): - actual_room_ids = sync_config.room_subscriptions.keys() - - for room_id in account_data_request.rooms: - # A wildcard means we process all rooms from the room subscriptions - if room_id == "*": - relevant_room_ids.update(sync_config.room_subscriptions.keys()) - break - - if room_id in actual_room_ids: - relevant_room_ids.add(room_id) - - # See what rooms from the sliding window lists we should get account data for - if account_data_request.lists is not None: - for list_key in account_data_request.lists: - # Just some typing because we share the variable name in multiple places - actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None - - # A wildcard means we process rooms from all lists - if list_key == "*": - for actual_list in lists.values(): - # We only expect a single SYNC operation for any list - assert len(actual_list.ops) == 1 - sync_op = actual_list.ops[0] - assert sync_op.op == OperationType.SYNC - - relevant_room_ids.update(sync_op.room_ids) - - break - - actual_list = lists.get(list_key) - if actual_list is not None: - # We only expect a single SYNC operation for any list - assert len(actual_list.ops) == 1 - sync_op = actual_list.ops[0] - assert sync_op.op == OperationType.SYNC - - relevant_room_ids.update(sync_op.room_ids) - # Fetch room account data account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {} + relevant_room_ids = self.find_relevant_room_ids_for_extension( + requested_lists=account_data_request.lists, + requested_rooms=account_data_request.rooms, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + ) if len(relevant_room_ids) > 0: if from_token is not None: account_data_by_room_map = ( @@ -2094,3 +2142,42 @@ async def get_account_data_extension_response( global_account_data_map=global_account_data_map, account_data_by_room_map=account_data_by_room_map, ) + + async def get_receipts_extension_response( + self, + sync_config: SlidingSyncConfig, + actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], + receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension, + to_token: StreamToken, + from_token: Optional[SlidingSyncStreamToken], + ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]: + """Handle Receipts extension (MSC3960) + + Args: + sync_config: Sync configuration + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. + account_data_request: The account_data extension from the request + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. + """ + user_id = sync_config.user.to_string() + + # Skip if the extension is not enabled + if not receipts_request.enabled: + return None + + # receipt_source = self.event_sources.sources.receipt + # receipts, receipt_key = await receipt_source.get_new_events( + # user=sync_config.user, + # from_key=receipt_key, + # limit=sync_config.filter_collection.ephemeral_limit(), + # room_ids=room_ids, + # is_guest=sync_config.is_guest, + # ) + + return SlidingSyncResult.Extensions.ReceiptsExtension( + room_id_to_receipt_map=TODO, + ) diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 479222a18dc..654c50ef617 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -349,12 +349,28 @@ def __bool__(self) -> bool: self.global_account_data_map or self.account_data_by_room_map ) + @attr.s(slots=True, frozen=True, auto_attribs=True) + class ReceiptsExtension: + """The Receipts extension (MSC3960) + + Attributes: + room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content) + """ + + room_id_to_receipt_map: Mapping[str, Mapping[str, JsonMapping]] + + def __bool__(self) -> bool: + return bool(self.room_id_to_receipt_map) + to_device: Optional[ToDeviceExtension] = None e2ee: Optional[E2eeExtension] = None account_data: Optional[AccountDataExtension] = None + receipts: Optional[ReceiptsExtension] = None def __bool__(self) -> bool: - return bool(self.to_device or self.e2ee or self.account_data) + return bool( + self.to_device or self.e2ee or self.account_data or self.receipts + ) next_pos: SlidingSyncStreamToken lists: Dict[str, SlidingWindowList] diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index 34e07ddac5c..ae8b7d81442 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -339,9 +339,27 @@ class AccountDataExtension(RequestBodyModel): # Process all room subscriptions defined in the Room Subscription API. (This is the default.) rooms: Optional[List[StrictStr]] = ["*"] + class ReceiptsExtension(RequestBodyModel): + """The Receipts extension (MSC3960) + + Attributes: + enabled + lists: List of list keys (from the Sliding Window API) to apply this + extension to. + rooms: List of room IDs (from the Room Subscription API) to apply this + extension to. + """ + + enabled: Optional[StrictBool] = False + # Process all lists defined in the Sliding Window API. (This is the default.) + lists: Optional[List[StrictStr]] = ["*"] + # Process all room subscriptions defined in the Room Subscription API. (This is the default.) + rooms: Optional[List[StrictStr]] = ["*"] + to_device: Optional[ToDeviceExtension] = None e2ee: Optional[E2eeExtension] = None account_data: Optional[AccountDataExtension] = None + receipts: Optional[ReceiptsExtension] = None # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 if TYPE_CHECKING: diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 135b677bad3..276588ad2f6 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -5941,8 +5941,7 @@ def test_room_account_data_relevant_rooms(self) -> None: room_id5: "room5", } - # Mix lists and rooms - sync_body = { + main_sync_body = { "lists": { # We expect this list range to include room5 and room4 "foo-list": { @@ -5963,6 +5962,11 @@ def test_room_account_data_relevant_rooms(self) -> None: "timeline_limit": 0, } }, + } + + # Mix lists and rooms + sync_body = { + **main_sync_body, "extensions": { "account_data": { "enabled": True, @@ -5991,26 +5995,7 @@ def test_room_account_data_relevant_rooms(self) -> None: # Try wildcards (this is the default) sync_body = { - "lists": { - # We expect this list range to include room5 and room4 - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - }, - # We expect this list range to include room5, room4, room3 - "bar-list": { - "ranges": [[0, 2]], - "required_state": [], - "timeline_limit": 0, - }, - }, - "room_subscriptions": { - room_id1: { - "required_state": [], - "timeline_limit": 0, - } - }, + **main_sync_body, "extensions": { "account_data": { "enabled": True, @@ -6039,26 +6024,7 @@ def test_room_account_data_relevant_rooms(self) -> None: # Empty list will return nothing sync_body = { - "lists": { - # We expect this list range to include room5 and room4 - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - }, - # We expect this list range to include room5, room4, room3 - "bar-list": { - "ranges": [[0, 2]], - "required_state": [], - "timeline_limit": 0, - }, - }, - "room_subscriptions": { - room_id1: { - "required_state": [], - "timeline_limit": 0, - } - }, + **main_sync_body, "extensions": { "account_data": { "enabled": True, @@ -6087,26 +6053,7 @@ def test_room_account_data_relevant_rooms(self) -> None: # Try wildcard and none sync_body = { - "lists": { - # We expect this list range to include room5 and room4 - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - }, - # We expect this list range to include room5, room4, room3 - "bar-list": { - "ranges": [[0, 2]], - "required_state": [], - "timeline_limit": 0, - }, - }, - "room_subscriptions": { - room_id1: { - "required_state": [], - "timeline_limit": 0, - } - }, + **main_sync_body, "extensions": { "account_data": { "enabled": True, @@ -6133,6 +6080,35 @@ def test_room_account_data_relevant_rooms(self) -> None: exact=True, ) + # Try requesting a room that is only in a list + sync_body = { + **main_sync_body, + "extensions": { + "account_data": { + "enabled": True, + "lists": [], + "rooms": [room_id5], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ❌ Not requested + # room2: ❌ Not requested + # room3: ❌ Not requested + # room4: ❌ Not requested + # room5: ✅ Requested via `rooms` and is in a list + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"]["account_data"] + .get("rooms") + .keys() + }, + {"room5"}, + exact=True, + ) + def test_wait_for_new_data(self) -> None: """ Test to make sure that the Sliding Sync request waits for new data to arrive. From 2631c245ee902da968f12941664dacb8624cdfed Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jul 2024 00:28:51 -0500 Subject: [PATCH 02/14] Iterate more --- synapse/handlers/account_data.py | 1 + synapse/handlers/receipts.py | 4 +- synapse/handlers/sliding_sync.py | 67 ++- synapse/types/handlers/__init__.py | 2 +- tests/rest/client/test_sync.py | 735 +++++++++++++++++++---------- 5 files changed, 546 insertions(+), 263 deletions(-) diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 97a463d8d0d..8041326cd50 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -327,6 +327,7 @@ async def get_new_events( explicit_room_id: Optional[str] = None, ) -> Tuple[List[JsonDict], int]: user_id = user.to_string() + # TODO: Take `to_key` into account last_stream_id = from_key current_stream_id = self.store.get_max_account_data_stream_id() diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 8674a8fcdda..d04c76be2a2 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -286,8 +286,10 @@ async def get_new_events( room_ids: Iterable[str], is_guest: bool, explicit_room_id: Optional[str] = None, + to_key: Optional[MultiWriterStreamToken] = None, ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]: - to_key = self.get_current_key() + if to_key is None: + to_key = self.get_current_key() if from_key == to_key: return [], to_key diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 103ebfbbdfa..eb475694026 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -47,6 +47,7 @@ DeviceListUpdates, JsonDict, JsonMapping, + MultiWriterStreamToken, PersistedEventPosition, Requester, RoomStreamToken, @@ -631,7 +632,7 @@ async def handle_room(room_id: str) -> None: extensions = await self.get_extensions_response( sync_config=sync_config, actual_lists=lists, - actual_room_ids=rooms.keys(), + actual_room_ids=set(rooms.keys()), from_token=from_token, to_token=to_token, ) @@ -1869,12 +1870,14 @@ async def get_extensions_response( def find_relevant_room_ids_for_extension( self, requested_lists: Optional[List[str]], - requested_rooms: Optional[List[str]], + requested_room_ids: Optional[List[str]], actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], ) -> Set[str]: """ - Handle the reserved `lists`/`rooms` keys for extensions. + Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only + return results for rooms in the Sliding Sync response. This matches up the + requested rooms/lists with the actual lists/rooms in the Sliding Sync response. {"lists": []} // Do not process any lists. {"lists": ["rooms", "dms"]} // Process only a subset of lists. @@ -1886,9 +1889,9 @@ def find_relevant_room_ids_for_extension( Args: requested_lists: The `lists` from the extension request. - requested_rooms: The `rooms` from the extension request. + requested_room_ids: The `rooms` from the extension request. actual_lists: The actual lists from the Sliding Sync response. - actual_room_subscriptions: The actual room subscriptions from the Sliding Sync request. + actual_room_ids: The actual room subscriptions from the Sliding Sync request. """ # We only want to include account data for rooms that are already in the sliding @@ -1896,8 +1899,8 @@ def find_relevant_room_ids_for_extension( relevant_room_ids: Set[str] = set() # See what rooms from the room subscriptions we should get account data for - if requested_rooms is not None: - for room_id in requested_rooms: + if requested_room_ids is not None: + for room_id in requested_room_ids: # A wildcard means we process all rooms from the room subscriptions if room_id == "*": relevant_room_ids.update(actual_room_ids) @@ -2087,6 +2090,7 @@ async def get_account_data_extension_response( global_account_data_map: Mapping[str, JsonMapping] = {} if from_token is not None: + # TODO: This should take into account the `from_token` and `to_token` global_account_data_map = ( await self.store.get_updated_global_account_data_for_user( user_id, from_token.stream_token.account_data_key @@ -2098,15 +2102,18 @@ async def get_account_data_extension_response( ) if have_push_rules_changed: global_account_data_map = dict(global_account_data_map) + # TODO: This should take into account the `from_token` and `to_token` global_account_data_map[AccountDataTypes.PUSH_RULES] = ( await self.push_rules_handler.push_rules_for_user(sync_config.user) ) else: + # TODO: This should take into account the `to_token` all_global_account_data = await self.store.get_global_account_data_for_user( user_id ) global_account_data_map = dict(all_global_account_data) + # TODO: This should take into account the `to_token` global_account_data_map[AccountDataTypes.PUSH_RULES] = ( await self.push_rules_handler.push_rules_for_user(sync_config.user) ) @@ -2115,18 +2122,20 @@ async def get_account_data_extension_response( account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {} relevant_room_ids = self.find_relevant_room_ids_for_extension( requested_lists=account_data_request.lists, - requested_rooms=account_data_request.rooms, + requested_room_ids=account_data_request.rooms, actual_lists=actual_lists, actual_room_ids=actual_room_ids, ) if len(relevant_room_ids) > 0: if from_token is not None: + # TODO: This should take into account the `from_token` and `to_token` account_data_by_room_map = ( await self.store.get_updated_room_account_data_for_user( user_id, from_token.stream_token.account_data_key ) ) else: + # TODO: This should take into account the `to_token` account_data_by_room_map = ( await self.store.get_room_account_data_for_user(user_id) ) @@ -2163,21 +2172,41 @@ async def get_receipts_extension_response( to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. """ - user_id = sync_config.user.to_string() - # Skip if the extension is not enabled if not receipts_request.enabled: return None - # receipt_source = self.event_sources.sources.receipt - # receipts, receipt_key = await receipt_source.get_new_events( - # user=sync_config.user, - # from_key=receipt_key, - # limit=sync_config.filter_collection.ephemeral_limit(), - # room_ids=room_ids, - # is_guest=sync_config.is_guest, - # ) + relevant_room_ids = self.find_relevant_room_ids_for_extension( + requested_lists=receipts_request.lists, + requested_room_ids=receipts_request.rooms, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + ) + + room_id_to_receipt_map: Mapping[str, JsonMapping] = {} + if len(relevant_room_ids) > 0: + receipt_source = self.event_sources.sources.receipt + receipts, _ = await receipt_source.get_new_events( + user=sync_config.user, + from_key=( + from_token.stream_token.receipt_key + if from_token + else MultiWriterStreamToken(stream=0) + ), + to_key=to_token.receipt_key, + # This is a dummy value and isn't used in the function + limit=0, + room_ids=relevant_room_ids, + is_guest=False, + ) + + for receipt in receipts: + # These fields should exist for every receipt + room_id = receipt["room_id"] + type = receipt["type"] + content = receipt["content"] + room_id_to_receipt_map[room_id] = {type: type, content: content} return SlidingSyncResult.Extensions.ReceiptsExtension( - room_id_to_receipt_map=TODO, + room_id_to_receipt_map=room_id_to_receipt_map, ) diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 654c50ef617..488ebd8365c 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -357,7 +357,7 @@ class ReceiptsExtension: room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content) """ - room_id_to_receipt_map: Mapping[str, Mapping[str, JsonMapping]] + room_id_to_receipt_map: Mapping[str, JsonMapping] def __bool__(self) -> bool: return bool(self.room_id_to_receipt_map) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 276588ad2f6..50473139415 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -4625,6 +4625,179 @@ def test_room_subscriptions_world_readable(self) -> None: channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"] ) + @parameterized.expand([("account_data",), ("receipts",)]) + def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> None: + """ + Test out different variations of `lists`/`rooms` we are requesting extensions for. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create some rooms + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok) + + room_id_to_human_name_map = { + room_id1: "room1", + room_id2: "room2", + room_id3: "room3", + room_id4: "room4", + room_id5: "room5", + } + + for room_id in room_id_to_human_name_map.keys(): + # Add some account data to each room + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + main_sync_body = { + "lists": { + # We expect this list range to include room5 and room4 + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + }, + # We expect this list range to include room5, room4, room3 + "bar-list": { + "ranges": [[0, 2]], + "required_state": [], + "timeline_limit": 0, + }, + }, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + } + + # Mix lists and rooms + sync_body = { + **main_sync_body, + "extensions": { + "account_data": { + "enabled": True, + "lists": ["foo-list", "non-existent-list"], + "rooms": [room_id1, room_id2, "!non-existent-room"], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ✅ Requested via `rooms` and a room subscription exists + # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions) + # room3: ❌ Not requested + # room4: ✅ Shows up because requested via `lists` and list exists in the response + # room5: ✅ Shows up because requested via `lists` and list exists in the response + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"]["account_data"] + .get("rooms") + .keys() + }, + {"room1", "room4", "room5"}, + exact=True, + ) + + # Try wildcards (this is the default) + sync_body = { + **main_sync_body, + "extensions": { + "account_data": { + "enabled": True, + # "lists": ["*"], + # "rooms": ["*"], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions + # room2: ❌ Not requested + # room3: ✅ Shows up because of default `lists` wildcard and is in a list + # room4: ✅ Shows up because of default `lists` wildcard and is in a list + # room5: ✅ Shows up because of default `lists` wildcard and is in a list + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"]["account_data"] + .get("rooms") + .keys() + }, + {"room1", "room3", "room4", "room5"}, + exact=True, + ) + + # Empty list will return nothing + sync_body = { + **main_sync_body, + "extensions": { + "account_data": { + "enabled": True, + "lists": [], + "rooms": [], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ❌ Not requested + # room2: ❌ Not requested + # room3: ❌ Not requested + # room4: ❌ Not requested + # room5: ❌ Not requested + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"]["account_data"] + .get("rooms") + .keys() + }, + set(), + exact=True, + ) + + # Try wildcard and none + sync_body = { + **main_sync_body, + "extensions": { + "account_data": { + "enabled": True, + "lists": ["*"], + "rooms": [], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ❌ Not requested + # room2: ❌ Not requested + # room3: ✅ Shows up because of default `lists` wildcard and is in a list + # room4: ✅ Shows up because of default `lists` wildcard and is in a list + # room5: ✅ Shows up because of default `lists` wildcard and is in a list + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"]["account_data"] + .get("rooms") + .keys() + }, + {"room3", "room4", "room5"}, + exact=True, + ) + class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension""" @@ -5871,242 +6044,289 @@ def test_room_account_data_incremental_sync(self) -> None: exact=True, ) - def test_room_account_data_relevant_rooms(self) -> None: + def test_wait_for_new_data(self) -> None: """ - Test out different variations of `lists`/`rooms` we are requesting account data for. + Test to make sure that the Sliding Sync request waits for new data to arrive. + + (Only applies to incremental syncs with a `timeout` specified) """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") - # Create a room and add some room account data - room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id1, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) - ) + room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id, user1_id, tok=user1_tok) - # Create another room with some room account data - room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id2, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) - ) + sync_body = { + "lists": {}, + "extensions": { + "account_data": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) - # Create another room with some room account data - room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id3, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) + # Make an incremental Sliding Sync request with the account_data extension enabled + channel = self.make_request( + "POST", + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, + access_token=user1_tok, + await_result=False, ) - - # Create another room with some room account data - room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok) + # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)` + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=5000) + # Bump the global account data to trigger new results self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id4, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, + self.account_data_handler.add_account_data_for_user( + user1_id, + "org.matrix.foobarbaz", + {"foo": "bar"}, ) ) + # Should respond before the 10 second timeout + channel.await_result(timeout_ms=3000) + self.assertEqual(channel.code, 200, channel.json_body) - # Create another room with some room account data - room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id5, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) + # We should see the global account data update + self.assertIncludes( + { + global_event["type"] + for global_event in channel.json_body["extensions"]["account_data"].get( + "global" + ) + }, + {"org.matrix.foobarbaz"}, + exact=True, + ) + self.assertIncludes( + channel.json_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, ) - room_id_to_human_name_map = { - room_id1: "room1", - room_id2: "room2", - room_id3: "room3", - room_id4: "room4", - room_id5: "room5", - } - - main_sync_body = { - "lists": { - # We expect this list range to include room5 and room4 - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - }, - # We expect this list range to include room5, room4, room3 - "bar-list": { - "ranges": [[0, 2]], - "required_state": [], - "timeline_limit": 0, - }, - }, - "room_subscriptions": { - room_id1: { - "required_state": [], - "timeline_limit": 0, - } - }, - } + def test_wait_for_new_data_timeout(self) -> None: + """ + Test to make sure that the Sliding Sync request waits for new data to arrive but + no data ever arrives so we timeout. We're also making sure that the default data + from the account_data extension doesn't trigger a false-positive for new data. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") - # Mix lists and rooms sync_body = { - **main_sync_body, + "lists": {}, "extensions": { "account_data": { "enabled": True, - "lists": ["foo-list", "non-existent-list"], - "rooms": [room_id1, room_id2, "!non-existent-room"], } }, } - response_body, _ = self.do_sync(sync_body, tok=user1_tok) + _, from_token = self.do_sync(sync_body, tok=user1_tok) - # room1: ✅ Requested via `rooms` and a room subscription exists - # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions) - # room3: ❌ Not requested - # room4: ✅ Shows up because requested via `lists` and list exists in the response - # room5: ✅ Shows up because requested via `lists` and list exists in the response - self.assertIncludes( - { - room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] - .get("rooms") - .keys() - }, - {"room1", "room4", "room5"}, - exact=True, + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, + access_token=user1_tok, + await_result=False, ) + # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)` + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=5000) + # Wake-up `notifier.wait_for_events(...)` that will cause us test + # `SlidingSyncResult.__bool__` for new results. + self._bump_notifier_wait_for_events(user1_id) + # Block for a little bit more to ensure we don't see any new results. + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=4000) + # Wait for the sync to complete (wait for the rest of the 10 second timeout, + # 5000 + 4000 + 1200 > 10000) + channel.await_result(timeout_ms=1200) + self.assertEqual(channel.code, 200, channel.json_body) - # Try wildcards (this is the default) - sync_body = { - **main_sync_body, - "extensions": { - "account_data": { - "enabled": True, - # "lists": ["*"], - # "rooms": ["*"], - } - }, - } - response_body, _ = self.do_sync(sync_body, tok=user1_tok) + self.assertIsNotNone( + channel.json_body["extensions"]["account_data"].get("global") + ) + self.assertIsNotNone( + channel.json_body["extensions"]["account_data"].get("rooms") + ) - # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions - # room2: ❌ Not requested - # room3: ✅ Shows up because of default `lists` wildcard and is in a list - # room4: ✅ Shows up because of default `lists` wildcard and is in a list - # room5: ✅ Shows up because of default `lists` wildcard and is in a list - self.assertIncludes( +class SlidingSyncReceiptsExtensionTestCase(unittest.HomeserverTestCase): + """Tests for the receipts sliding sync extension""" + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + sync.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.event_sources = hs.get_event_sources() + self.e2e_keys_handler = hs.get_e2e_keys_handler() + self.account_data_handler = hs.get_account_data_handler() + self.notifier = hs.get_notifier() + self.sync_endpoint = ( + "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" + ) + + # TODO: Remove once https://github.com/element-hq/synapse/pull/17481 lands + def _bump_notifier_wait_for_events(self, user_id: str) -> None: + """ + Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding + Sync results. + """ + # We're expecting some new activity from this point onwards + from_token = self.event_sources.get_current_token() + + triggered_notifier_wait_for_events = False + + async def _on_new_acivity( + before_token: StreamToken, after_token: StreamToken + ) -> bool: + nonlocal triggered_notifier_wait_for_events + triggered_notifier_wait_for_events = True + return True + + # Listen for some new activity for the user. We're just trying to confirm that + # our bump below actually does what we think it does (triggers new activity for + # the user). + result_awaitable = self.notifier.wait_for_events( + user_id, + 1000, + _on_new_acivity, + from_token=from_token, + ) + + # Update the account data so that `notifier.wait_for_events(...)` wakes up. + # We're bumping account data because it won't show up in the Sliding Sync + # response so it won't affect whether we have results. + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id, + "org.matrix.foobarbaz", + {"foo": "bar"}, + ) + ) + + # Wait for our notifier result + self.get_success(result_awaitable) + + if not triggered_notifier_wait_for_events: + raise AssertionError( + "Expected `notifier.wait_for_events(...)` to be triggered" + ) + + def test_no_data_initial_sync(self) -> None: + """ + Test that enabling e2ee extension works during an intitial sync, even if there + is no-data + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Make an initial Sliding Sync request with the e2ee extension enabled + channel = self.make_request( + "POST", + self.sync_endpoint, { - room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] - .get("rooms") - .keys() + "lists": {}, + "extensions": { + "receipts": { + "enabled": True, + } + }, }, - {"room1", "room3", "room4", "room5"}, - exact=True, + access_token=user1_tok, ) + self.assertEqual(channel.code, 200, channel.json_body) - # Empty list will return nothing - sync_body = { - **main_sync_body, - "extensions": { - "account_data": { - "enabled": True, - "lists": [], - "rooms": [], - } - }, - } - response_body, _ = self.do_sync(sync_body, tok=user1_tok) + # Device list updates are only present for incremental syncs + self.assertIsNone(channel.json_body["extensions"]["e2ee"].get("device_lists")) - # room1: ❌ Not requested - # room2: ❌ Not requested - # room3: ❌ Not requested - # room4: ❌ Not requested - # room5: ❌ Not requested - self.assertIncludes( + # Both of these should be present even when empty + self.assertEqual( + channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"], { - room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] - .get("rooms") - .keys() + # This is always present because of + # https://github.com/element-hq/element-android/issues/3725 and + # https://github.com/matrix-org/synapse/issues/10456 + "signed_curve25519": 0 }, - set(), - exact=True, + ) + self.assertEqual( + channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], + [], ) - # Try wildcard and none - sync_body = { - **main_sync_body, - "extensions": { - "account_data": { - "enabled": True, - "lists": ["*"], - "rooms": [], - } - }, - } - response_body, _ = self.do_sync(sync_body, tok=user1_tok) + def test_no_data_incremental_sync(self) -> None: + """ + Test that enabling e2ee extension works during an incremental sync, even if + there is no-data + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") - # room1: ❌ Not requested - # room2: ❌ Not requested - # room3: ✅ Shows up because of default `lists` wildcard and is in a list - # room4: ✅ Shows up because of default `lists` wildcard and is in a list - # room5: ✅ Shows up because of default `lists` wildcard and is in a list - self.assertIncludes( + from_token = self.event_sources.get_current_token() + + # Make an incremental Sliding Sync request with the e2ee extension enabled + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success(from_token.to_string(self.store))}", { - room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] - .get("rooms") - .keys() + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } + }, }, - {"room3", "room4", "room5"}, - exact=True, + access_token=user1_tok, ) + self.assertEqual(channel.code, 200, channel.json_body) - # Try requesting a room that is only in a list - sync_body = { - **main_sync_body, - "extensions": { - "account_data": { - "enabled": True, - "lists": [], - "rooms": [room_id5], - } - }, - } - response_body, _ = self.do_sync(sync_body, tok=user1_tok) + # Device list shows up for incremental syncs + self.assertEqual( + channel.json_body["extensions"]["e2ee"] + .get("device_lists", {}) + .get("changed"), + [], + ) + self.assertEqual( + channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), + [], + ) - # room1: ❌ Not requested - # room2: ❌ Not requested - # room3: ❌ Not requested - # room4: ❌ Not requested - # room5: ✅ Requested via `rooms` and is in a list - self.assertIncludes( + # Both of these should be present even when empty + self.assertEqual( + channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"], { - room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] - .get("rooms") - .keys() + # Note that "signed_curve25519" is always returned in key count responses + # regardless of whether we uploaded any keys for it. This is necessary until + # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed. + # + # Also related: + # https://github.com/element-hq/element-android/issues/3725 and + # https://github.com/matrix-org/synapse/issues/10456 + "signed_curve25519": 0 }, - {"room5"}, - exact=True, + ) + self.assertEqual( + channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], + [], ) def test_wait_for_new_data(self) -> None: @@ -6119,84 +6339,91 @@ def test_wait_for_new_data(self) -> None: user1_tok = self.login(user1_id, "pass") user2_id = self.register_user("user2", "pass") user2_tok = self.login(user2_id, "pass") + test_device_id = "TESTDEVICE" + user3_id = self.register_user("user3", "pass") + user3_tok = self.login(user3_id, "pass", device_id=test_device_id) room_id = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id, user1_id, tok=user1_tok) + self.helper.join(room_id, user3_id, tok=user3_tok) - sync_body = { - "lists": {}, - "extensions": { - "account_data": { - "enabled": True, - } - }, - } - _, from_token = self.do_sync(sync_body, tok=user1_tok) + from_token = self.event_sources.get_current_token() - # Make an incremental Sliding Sync request with the account_data extension enabled + # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint + f"?timeout=10000&pos={from_token}", - content=sync_body, + self.sync_endpoint + + "?timeout=10000" + + f"&pos={self.get_success(from_token.to_string(self.store))}", + { + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } + }, + }, access_token=user1_tok, await_result=False, ) # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)` with self.assertRaises(TimedOutException): channel.await_result(timeout_ms=5000) - # Bump the global account data to trigger new results - self.get_success( - self.account_data_handler.add_account_data_for_user( - user1_id, - "org.matrix.foobarbaz", - {"foo": "bar"}, - ) + # Bump the device lists to trigger new results + # Have user3 update their device list + device_update_channel = self.make_request( + "PUT", + f"/devices/{test_device_id}", + { + "display_name": "New Device Name", + }, + access_token=user3_tok, + ) + self.assertEqual( + device_update_channel.code, 200, device_update_channel.json_body ) # Should respond before the 10 second timeout channel.await_result(timeout_ms=3000) self.assertEqual(channel.code, 200, channel.json_body) - # We should see the global account data update - self.assertIncludes( - { - global_event["type"] - for global_event in channel.json_body["extensions"]["account_data"].get( - "global" - ) - }, - {"org.matrix.foobarbaz"}, - exact=True, + # We should see the device list update + self.assertEqual( + channel.json_body["extensions"]["e2ee"] + .get("device_lists", {}) + .get("changed"), + [user3_id], ) - self.assertIncludes( - channel.json_body["extensions"]["account_data"].get("rooms").keys(), - set(), - exact=True, + self.assertEqual( + channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), + [], ) def test_wait_for_new_data_timeout(self) -> None: """ Test to make sure that the Sliding Sync request waits for new data to arrive but no data ever arrives so we timeout. We're also making sure that the default data - from the account_data extension doesn't trigger a false-positive for new data. + from the E2EE extension doesn't trigger a false-positive for new data (see + `device_one_time_keys_count.signed_curve25519`). """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - sync_body = { - "lists": {}, - "extensions": { - "account_data": { - "enabled": True, - } - }, - } - _, from_token = self.do_sync(sync_body, tok=user1_tok) + from_token = self.event_sources.get_current_token() # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint + f"?timeout=10000&pos={from_token}", - content=sync_body, + self.sync_endpoint + + "?timeout=10000" + + f"&pos={self.get_success(from_token.to_string(self.store))}", + { + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } + }, + }, access_token=user1_tok, await_result=False, ) @@ -6214,9 +6441,33 @@ def test_wait_for_new_data_timeout(self) -> None: channel.await_result(timeout_ms=1200) self.assertEqual(channel.code, 200, channel.json_body) - self.assertIsNotNone( - channel.json_body["extensions"]["account_data"].get("global") + # Device lists are present for incremental syncs but empty because no device changes + self.assertEqual( + channel.json_body["extensions"]["e2ee"] + .get("device_lists", {}) + .get("changed"), + [], ) - self.assertIsNotNone( - channel.json_body["extensions"]["account_data"].get("rooms") + self.assertEqual( + channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), + [], + ) + + # Both of these should be present even when empty + self.assertEqual( + channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"], + { + # Note that "signed_curve25519" is always returned in key count responses + # regardless of whether we uploaded any keys for it. This is necessary until + # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed. + # + # Also related: + # https://github.com/element-hq/element-android/issues/3725 and + # https://github.com/matrix-org/synapse/issues/10456 + "signed_curve25519": 0 + }, + ) + self.assertEqual( + channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], + [], ) From a294b4196ac97662adb8ecd2021449ba653889d6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jul 2024 00:44:18 -0500 Subject: [PATCH 03/14] Generalize extension test --- synapse/handlers/sliding_sync.py | 2 +- synapse/rest/client/sync.py | 6 +++ tests/rest/client/test_sync.py | 82 +++++++++++++++++++++++++------- 3 files changed, 72 insertions(+), 18 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index eb475694026..0c7299137d2 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2205,7 +2205,7 @@ async def get_receipts_extension_response( room_id = receipt["room_id"] type = receipt["type"] content = receipt["content"] - room_id_to_receipt_map[room_id] = {type: type, content: content} + room_id_to_receipt_map[room_id] = {"type": type, "content": content} return SlidingSyncResult.Extensions.ReceiptsExtension( room_id_to_receipt_map=room_id_to_receipt_map, diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 7cf1f564358..268c6521e05 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1134,6 +1134,12 @@ async def encode_extensions( }, } + if extensions.receipts is not None: + serialized_extensions["receipts"] = { + # Same as the the top-level `account_data.events` field in Sync v2. + "rooms": extensions.receipts.room_id_to_receipt_map, + } + return serialized_extensions diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 50473139415..2581b58b5cc 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -1280,6 +1280,7 @@ class SlidingSyncTestCase(SlidingSyncBase): room.register_servlets, sync.register_servlets, devices.register_servlets, + receipts.register_servlets, ] def default_config(self) -> JsonDict: @@ -4625,10 +4626,12 @@ def test_room_subscriptions_world_readable(self) -> None: channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"] ) + # Any extensions that use `lists`/`rooms` should be tested here @parameterized.expand([("account_data",), ("receipts",)]) def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> None: """ - Test out different variations of `lists`/`rooms` we are requesting extensions for. + With various extensions, test out requesting different variations of + `lists`/`rooms`. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -4649,15 +4652,30 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non } for room_id in room_id_to_human_name_map.keys(): - # Add some account data to each room - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, + if extension_name == "account_data": + # Add some account data to each room + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) ) - ) + elif extension_name == "receipts": + event_response = self.helper.send( + room_id, body="new event", tok=user1_tok + ) + # Read last event + channel = self.make_request( + "POST", + f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response["event_id"]}", + {}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + else: + raise AssertionError(f"Unknown extension name: {extension_name}") main_sync_body = { "lists": { @@ -4686,7 +4704,7 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non sync_body = { **main_sync_body, "extensions": { - "account_data": { + extension_name: { "enabled": True, "lists": ["foo-list", "non-existent-list"], "rooms": [room_id1, room_id2, "!non-existent-room"], @@ -4703,7 +4721,7 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non self.assertIncludes( { room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] + for room_id in response_body["extensions"][extension_name] .get("rooms") .keys() }, @@ -4715,7 +4733,7 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non sync_body = { **main_sync_body, "extensions": { - "account_data": { + extension_name: { "enabled": True, # "lists": ["*"], # "rooms": ["*"], @@ -4732,7 +4750,7 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non self.assertIncludes( { room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] + for room_id in response_body["extensions"][extension_name] .get("rooms") .keys() }, @@ -4744,7 +4762,7 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non sync_body = { **main_sync_body, "extensions": { - "account_data": { + extension_name: { "enabled": True, "lists": [], "rooms": [], @@ -4761,7 +4779,7 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non self.assertIncludes( { room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] + for room_id in response_body["extensions"][extension_name] .get("rooms") .keys() }, @@ -4773,7 +4791,7 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non sync_body = { **main_sync_body, "extensions": { - "account_data": { + extension_name: { "enabled": True, "lists": ["*"], "rooms": [], @@ -4790,7 +4808,7 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non self.assertIncludes( { room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] + for room_id in response_body["extensions"][extension_name] .get("rooms") .keys() }, @@ -4798,6 +4816,35 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non exact=True, ) + # Try requesting a room that is only in a list + sync_body = { + **main_sync_body, + "extensions": { + extension_name: { + "enabled": True, + "lists": [], + "rooms": [room_id5], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ❌ Not requested + # room2: ❌ Not requested + # room3: ❌ Not requested + # room4: ❌ Not requested + # room5: ✅ Requested via `rooms` and is in a list + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"][extension_name] + .get("rooms") + .keys() + }, + {"room5"}, + exact=True, + ) + class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension""" @@ -6156,6 +6203,7 @@ def test_wait_for_new_data_timeout(self) -> None: channel.json_body["extensions"]["account_data"].get("rooms") ) + class SlidingSyncReceiptsExtensionTestCase(unittest.HomeserverTestCase): """Tests for the receipts sliding sync extension""" From 67943931fb19a034369efdba1014a82b504d89bc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jul 2024 00:48:03 -0500 Subject: [PATCH 04/14] Use more up to date base --- tests/rest/client/test_sync.py | 494 +++++++++++++++++++++++---------- 1 file changed, 348 insertions(+), 146 deletions(-) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 2581b58b5cc..a6acdc6443a 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -6279,102 +6279,335 @@ async def _on_new_acivity( def test_no_data_initial_sync(self) -> None: """ - Test that enabling e2ee extension works during an intitial sync, even if there - is no-data + Test that enabling the account_data extension works during an intitial sync, + even if there is no-data. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - # Make an initial Sliding Sync request with the e2ee extension enabled - channel = self.make_request( - "POST", - self.sync_endpoint, + # Make an initial Sliding Sync request with the account_data extension enabled + sync_body = { + "lists": {}, + "extensions": { + "receipts": { + "enabled": True, + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + self.assertIncludes( { - "lists": {}, - "extensions": { - "receipts": { - "enabled": True, - } - }, + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) }, - access_token=user1_tok, + # Even though we don't have any global account data set, Synapse saves some + # default push rules for us. + {AccountDataTypes.PUSH_RULES}, + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, ) - self.assertEqual(channel.code, 200, channel.json_body) - # Device list updates are only present for incremental syncs - self.assertIsNone(channel.json_body["extensions"]["e2ee"].get("device_lists")) + def test_no_data_incremental_sync(self) -> None: + """ + Test that enabling account_data extension works during an incremental sync, even + if there is no-data. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") - # Both of these should be present even when empty - self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"], + sync_body = { + "lists": {}, + "extensions": { + "receipts": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # There has been no account data changes since the `from_token` so we shouldn't + # see any account data here. + self.assertIncludes( { - # This is always present because of - # https://github.com/element-hq/element-android/issues/3725 and - # https://github.com/matrix-org/synapse/issues/10456 - "signed_curve25519": 0 + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) }, + set(), + exact=True, ) - self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], - [], + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, ) - def test_no_data_incremental_sync(self) -> None: + def test_global_account_data_initial_sync(self) -> None: """ - Test that enabling e2ee extension works during an incremental sync, even if - there is no-data + On initial sync, we should return all global account data on initial sync. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - from_token = self.event_sources.get_current_token() + # Update the global account data + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id=user1_id, + account_data_type="org.matrix.foobarbaz", + content={"foo": "bar"}, + ) + ) - # Make an incremental Sliding Sync request with the e2ee extension enabled - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", + # Make an initial Sliding Sync request with the account_data extension enabled + sync_body = { + "lists": {}, + "extensions": { + "receipts": { + "enabled": True, + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # It should show us all of the global account data + self.assertIncludes( { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) }, - access_token=user1_tok, + {AccountDataTypes.PUSH_RULES, "org.matrix.foobarbaz"}, + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, ) - self.assertEqual(channel.code, 200, channel.json_body) - # Device list shows up for incremental syncs - self.assertEqual( - channel.json_body["extensions"]["e2ee"] - .get("device_lists", {}) - .get("changed"), - [], + def test_global_account_data_incremental_sync(self) -> None: + """ + On incremental sync, we should only account data that has changed since the + `from_token`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Add some global account data + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id=user1_id, + account_data_type="org.matrix.foobarbaz", + content={"foo": "bar"}, + ) ) - self.assertEqual( - channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), - [], + + sync_body = { + "lists": {}, + "extensions": { + "receipts": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Add some other global account data + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id=user1_id, + account_data_type="org.matrix.doodardaz", + content={"doo": "dar"}, + ) ) - # Both of these should be present even when empty - self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"], + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertIncludes( { - # Note that "signed_curve25519" is always returned in key count responses - # regardless of whether we uploaded any keys for it. This is necessary until - # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed. - # - # Also related: - # https://github.com/element-hq/element-android/issues/3725 and - # https://github.com/matrix-org/synapse/issues/10456 - "signed_curve25519": 0 + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) }, + # We should only see the new global account data that happened after the `from_token` + {"org.matrix.doodardaz"}, + exact=True, ) - self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], - [], + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, + ) + + def test_room_account_data_initial_sync(self) -> None: + """ + On initial sync, we return all account data for a given room but only for + rooms that we request and are being returned in the Sliding Sync response. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a room and add some room account data + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Make an initial Sliding Sync request with the account_data extension enabled + sync_body = { + "lists": {}, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + "extensions": { + "receipts": { + "enabled": True, + "rooms": [room_id1, room_id2], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) + # Even though we requested room2, we only expect room1 to show up because that's + # the only room in the Sliding Sync response (room2 is not one of our room + # subscriptions or in a sliding window list). + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id1}, + exact=True, + ) + self.assertIncludes( + { + event["type"] + for event in response_body["extensions"]["account_data"] + .get("rooms") + .get(room_id1) + }, + {"org.matrix.roorarraz"}, + exact=True, + ) + + def test_room_account_data_incremental_sync(self) -> None: + """ + On incremental sync, we return all account data for a given room but only for + rooms that we request and are being returned in the Sliding Sync response. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a room and add some room account data + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + sync_body = { + "lists": {}, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + "extensions": { + "receipts": { + "enabled": True, + "rooms": [room_id1, room_id2], + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Add some other room account data + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) + # Even though we requested room2, we only expect room1 to show up because that's + # the only room in the Sliding Sync response (room2 is not one of our room + # subscriptions or in a sliding window list). + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id1}, + exact=True, + ) + # We should only see the new room account data that happened after the `from_token` + self.assertIncludes( + { + event["type"] + for event in response_body["extensions"]["account_data"] + .get("rooms") + .get(room_id1) + }, + {"org.matrix.roorarraz2"}, + exact=True, ) def test_wait_for_new_data(self) -> None: @@ -6387,91 +6620,84 @@ def test_wait_for_new_data(self) -> None: user1_tok = self.login(user1_id, "pass") user2_id = self.register_user("user2", "pass") user2_tok = self.login(user2_id, "pass") - test_device_id = "TESTDEVICE" - user3_id = self.register_user("user3", "pass") - user3_tok = self.login(user3_id, "pass", device_id=test_device_id) room_id = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id, user1_id, tok=user1_tok) - self.helper.join(room_id, user3_id, tok=user3_tok) - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": {}, + "extensions": { + "receipts": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) - # Make the Sliding Sync request + # Make an incremental Sliding Sync request with the account_data extension enabled channel = self.make_request( "POST", - self.sync_endpoint - + "?timeout=10000" - + f"&pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, - }, + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, access_token=user1_tok, await_result=False, ) # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)` with self.assertRaises(TimedOutException): channel.await_result(timeout_ms=5000) - # Bump the device lists to trigger new results - # Have user3 update their device list - device_update_channel = self.make_request( - "PUT", - f"/devices/{test_device_id}", - { - "display_name": "New Device Name", - }, - access_token=user3_tok, - ) - self.assertEqual( - device_update_channel.code, 200, device_update_channel.json_body + # Bump the global account data to trigger new results + self.get_success( + self.account_data_handler.add_account_data_for_user( + user1_id, + "org.matrix.foobarbaz", + {"foo": "bar"}, + ) ) # Should respond before the 10 second timeout channel.await_result(timeout_ms=3000) self.assertEqual(channel.code, 200, channel.json_body) - # We should see the device list update - self.assertEqual( - channel.json_body["extensions"]["e2ee"] - .get("device_lists", {}) - .get("changed"), - [user3_id], + # We should see the global account data update + self.assertIncludes( + { + global_event["type"] + for global_event in channel.json_body["extensions"]["account_data"].get( + "global" + ) + }, + {"org.matrix.foobarbaz"}, + exact=True, ) - self.assertEqual( - channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), - [], + self.assertIncludes( + channel.json_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, ) def test_wait_for_new_data_timeout(self) -> None: """ Test to make sure that the Sliding Sync request waits for new data to arrive but no data ever arrives so we timeout. We're also making sure that the default data - from the E2EE extension doesn't trigger a false-positive for new data (see - `device_one_time_keys_count.signed_curve25519`). + from the account_data extension doesn't trigger a false-positive for new data. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": {}, + "extensions": { + "receipts": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + "?timeout=10000" - + f"&pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, - }, + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, access_token=user1_tok, await_result=False, ) @@ -6489,33 +6715,9 @@ def test_wait_for_new_data_timeout(self) -> None: channel.await_result(timeout_ms=1200) self.assertEqual(channel.code, 200, channel.json_body) - # Device lists are present for incremental syncs but empty because no device changes - self.assertEqual( - channel.json_body["extensions"]["e2ee"] - .get("device_lists", {}) - .get("changed"), - [], - ) - self.assertEqual( - channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), - [], - ) - - # Both of these should be present even when empty - self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"], - { - # Note that "signed_curve25519" is always returned in key count responses - # regardless of whether we uploaded any keys for it. This is necessary until - # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed. - # - # Also related: - # https://github.com/element-hq/element-android/issues/3725 and - # https://github.com/matrix-org/synapse/issues/10456 - "signed_curve25519": 0 - }, + self.assertIsNotNone( + channel.json_body["extensions"]["account_data"].get("global") ) - self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], - [], + self.assertIsNotNone( + channel.json_body["extensions"]["account_data"].get("rooms") ) From f1db02e31b628ea505ff942529c508746dc0764e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jul 2024 13:43:53 -0500 Subject: [PATCH 05/14] Remove tests that aren't necessary --- synapse/handlers/account_data.py | 1 - tests/rest/client/test_sync.py | 100 ------------------------------- 2 files changed, 101 deletions(-) diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 8041326cd50..97a463d8d0d 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -327,7 +327,6 @@ async def get_new_events( explicit_room_id: Optional[str] = None, ) -> Tuple[List[JsonDict], int]: user_id = user.to_string() - # TODO: Take `to_key` into account last_stream_id = from_key current_stream_id = self.store.get_max_account_data_stream_id() diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 4422deb884d..c60d0e1d25b 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -5935,106 +5935,6 @@ def test_no_data_incremental_sync(self) -> None: exact=True, ) - def test_global_account_data_initial_sync(self) -> None: - """ - On initial sync, we should return all global account data on initial sync. - """ - user1_id = self.register_user("user1", "pass") - user1_tok = self.login(user1_id, "pass") - - # Update the global account data - self.get_success( - self.account_data_handler.add_account_data_for_user( - user_id=user1_id, - account_data_type="org.matrix.foobarbaz", - content={"foo": "bar"}, - ) - ) - - # Make an initial Sliding Sync request with the account_data extension enabled - sync_body = { - "lists": {}, - "extensions": { - "receipts": { - "enabled": True, - } - }, - } - response_body, _ = self.do_sync(sync_body, tok=user1_tok) - - # It should show us all of the global account data - self.assertIncludes( - { - global_event["type"] - for global_event in response_body["extensions"]["account_data"].get( - "global" - ) - }, - {AccountDataTypes.PUSH_RULES, "org.matrix.foobarbaz"}, - exact=True, - ) - self.assertIncludes( - response_body["extensions"]["account_data"].get("rooms").keys(), - set(), - exact=True, - ) - - def test_global_account_data_incremental_sync(self) -> None: - """ - On incremental sync, we should only account data that has changed since the - `from_token`. - """ - user1_id = self.register_user("user1", "pass") - user1_tok = self.login(user1_id, "pass") - - # Add some global account data - self.get_success( - self.account_data_handler.add_account_data_for_user( - user_id=user1_id, - account_data_type="org.matrix.foobarbaz", - content={"foo": "bar"}, - ) - ) - - sync_body = { - "lists": {}, - "extensions": { - "receipts": { - "enabled": True, - } - }, - } - _, from_token = self.do_sync(sync_body, tok=user1_tok) - - # Add some other global account data - self.get_success( - self.account_data_handler.add_account_data_for_user( - user_id=user1_id, - account_data_type="org.matrix.doodardaz", - content={"doo": "dar"}, - ) - ) - - # Make an incremental Sliding Sync request with the account_data extension enabled - response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) - - self.assertIncludes( - { - global_event["type"] - for global_event in response_body["extensions"]["account_data"].get( - "global" - ) - }, - # We should only see the new global account data that happened after the `from_token` - {"org.matrix.doodardaz"}, - exact=True, - ) - self.assertIncludes( - response_body["extensions"]["account_data"].get("rooms").keys(), - set(), - exact=True, - ) - def test_room_account_data_initial_sync(self) -> None: """ On initial sync, we return all account data for a given room but only for From 2a0704cfb49b03678a25a3bd626c68841af289e8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jul 2024 16:14:43 -0500 Subject: [PATCH 06/14] Add tests --- synapse/handlers/sliding_sync.py | 2 +- tests/rest/client/test_sync.py | 458 +++++++++++++++++++------------ 2 files changed, 278 insertions(+), 182 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 0c7299137d2..465df96ebb3 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2183,7 +2183,7 @@ async def get_receipts_extension_response( actual_room_ids=actual_room_ids, ) - room_id_to_receipt_map: Mapping[str, JsonMapping] = {} + room_id_to_receipt_map: Dict[str, JsonMapping] = {} if len(relevant_room_ids) > 0: receipt_source = self.event_sources.sources.receipt receipts, _ = await receipt_source.get_new_events( diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index c60d0e1d25b..1fc0d18fdfa 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -30,6 +30,7 @@ import synapse.rest.admin from synapse.api.constants import ( AccountDataTypes, + EduTypes, EventContentFields, EventTypes, HistoryVisibility, @@ -1376,6 +1377,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.event_sources = hs.get_event_sources() self.storage_controllers = hs.get_storage_controllers() + self.account_data_handler = hs.get_account_data_handler() def _assertRequiredStateIncludes( self, @@ -5786,7 +5788,7 @@ def test_wait_for_new_data_timeout(self) -> None: ) -class SlidingSyncReceiptsExtensionTestCase(unittest.HomeserverTestCase): +class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase): """Tests for the receipts sliding sync extension""" servlets = [ @@ -5794,6 +5796,7 @@ class SlidingSyncReceiptsExtensionTestCase(unittest.HomeserverTestCase): login.register_servlets, room.register_servlets, sync.register_servlets, + receipts.register_servlets, ] def default_config(self) -> JsonDict: @@ -5812,62 +5815,15 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" ) - # TODO: Remove once https://github.com/element-hq/synapse/pull/17481 lands - def _bump_notifier_wait_for_events(self, user_id: str) -> None: - """ - Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding - Sync results. - """ - # We're expecting some new activity from this point onwards - from_token = self.event_sources.get_current_token() - - triggered_notifier_wait_for_events = False - - async def _on_new_acivity( - before_token: StreamToken, after_token: StreamToken - ) -> bool: - nonlocal triggered_notifier_wait_for_events - triggered_notifier_wait_for_events = True - return True - - # Listen for some new activity for the user. We're just trying to confirm that - # our bump below actually does what we think it does (triggers new activity for - # the user). - result_awaitable = self.notifier.wait_for_events( - user_id, - 1000, - _on_new_acivity, - from_token=from_token, - ) - - # Update the account data so that `notifier.wait_for_events(...)` wakes up. - # We're bumping account data because it won't show up in the Sliding Sync - # response so it won't affect whether we have results. - self.get_success( - self.account_data_handler.add_account_data_for_user( - user_id, - "org.matrix.foobarbaz", - {"foo": "bar"}, - ) - ) - - # Wait for our notifier result - self.get_success(result_awaitable) - - if not triggered_notifier_wait_for_events: - raise AssertionError( - "Expected `notifier.wait_for_events(...)` to be triggered" - ) - def test_no_data_initial_sync(self) -> None: """ - Test that enabling the account_data extension works during an intitial sync, + Test that enabling the receipts extension works during an intitial sync, even if there is no-data. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - # Make an initial Sliding Sync request with the account_data extension enabled + # Make an initial Sliding Sync request with the receipts extension enabled sync_body = { "lists": {}, "extensions": { @@ -5879,26 +5835,14 @@ def test_no_data_initial_sync(self) -> None: response_body, _ = self.do_sync(sync_body, tok=user1_tok) self.assertIncludes( - { - global_event["type"] - for global_event in response_body["extensions"]["account_data"].get( - "global" - ) - }, - # Even though we don't have any global account data set, Synapse saves some - # default push rules for us. - {AccountDataTypes.PUSH_RULES}, - exact=True, - ) - self.assertIncludes( - response_body["extensions"]["account_data"].get("rooms").keys(), + response_body["extensions"]["receipts"].get("rooms").keys(), set(), exact=True, ) def test_no_data_incremental_sync(self) -> None: """ - Test that enabling account_data extension works during an incremental sync, even + Test that enabling receipts extension works during an incremental sync, even if there is no-data. """ user1_id = self.register_user("user1", "pass") @@ -5914,58 +5858,88 @@ def test_no_data_incremental_sync(self) -> None: } _, from_token = self.do_sync(sync_body, tok=user1_tok) - # Make an incremental Sliding Sync request with the account_data extension enabled + # Make an incremental Sliding Sync request with the receipts extension enabled response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) - # There has been no account data changes since the `from_token` so we shouldn't - # see any account data here. self.assertIncludes( - { - global_event["type"] - for global_event in response_body["extensions"]["account_data"].get( - "global" - ) - }, - set(), - exact=True, - ) - self.assertIncludes( - response_body["extensions"]["account_data"].get("rooms").keys(), + response_body["extensions"]["receipts"].get("rooms").keys(), set(), exact=True, ) - def test_room_account_data_initial_sync(self) -> None: + def test_receipts_initial_sync(self) -> None: """ - On initial sync, we return all account data for a given room but only for + On initial sync, we return all receipts for a given room but only for rooms that we request and are being returned in the Sliding Sync response. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user3_id = self.register_user("user3", "pass") + user3_tok = self.login(user3_id, "pass") - # Create a room and add some room account data - room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id1, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) + # Create a room + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user3_id, tok=user3_tok) + event_response1 = self.helper.send(room_id1, body="new event", tok=user2_tok) + # User1 reads the last event + channel = self.make_request( + "POST", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + {}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + # User2 reads the last event + channel = self.make_request( + "POST", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + {}, + access_token=user2_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + # User3 privately reads the last event (make sure this doesn't leak to the other users) + channel = self.make_request( + "POST", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response1['event_id']}", + {}, + access_token=user3_tok, ) + self.assertEqual(channel.code, 200, channel.json_body) - # Create another room with some room account data - room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id2, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) + # Create another room + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id2, user1_id, tok=user1_tok) + self.helper.join(room_id2, user3_id, tok=user3_tok) + event_response2 = self.helper.send(room_id2, body="new event", tok=user2_tok) + # User1 reads the last event + channel = self.make_request( + "POST", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response2['event_id']}", + {}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + # User2 reads the last event + channel = self.make_request( + "POST", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + {}, + access_token=user2_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + # User3 privately reads the last event (make sure this doesn't leak to the other users) + channel = self.make_request( + "POST", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response2['event_id']}", + {}, + access_token=user3_tok, ) + self.assertEqual(channel.code, 200, channel.json_body) - # Make an initial Sliding Sync request with the account_data extension enabled + # Make an initial Sliding Sync request with the receipts extension enabled sync_body = { "lists": {}, "room_subscriptions": { @@ -5983,55 +5957,99 @@ def test_room_account_data_initial_sync(self) -> None: } response_body, _ = self.do_sync(sync_body, tok=user1_tok) - self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) # Even though we requested room2, we only expect room1 to show up because that's # the only room in the Sliding Sync response (room2 is not one of our room # subscriptions or in a sliding window list). self.assertIncludes( - response_body["extensions"]["account_data"].get("rooms").keys(), + response_body["extensions"]["receipts"].get("rooms").keys(), {room_id1}, exact=True, ) + # Sanity check that it's the correct ephemeral event type + self.assertEqual( + response_body["extensions"]["receipts"]["rooms"][room_id1]["type"], + EduTypes.RECEIPT, + ) + # We can see user1 and user2 read receipts self.assertIncludes( - { - event["type"] - for event in response_body["extensions"]["account_data"] - .get("rooms") - .get(room_id1) - }, - {"org.matrix.roorarraz"}, + response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ + event_response1["event_id"] + ][ReceiptTypes.READ].keys(), + {user1_id, user2_id}, + exact=True, + ) + # User1 did not have a private read receipt and we shouldn't leak others' + # private read receipts + self.assertIncludes( + response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ + event_response1["event_id"] + ] + .get(ReceiptTypes.READ_PRIVATE, {}) + .keys(), + set(), exact=True, ) - def test_room_account_data_incremental_sync(self) -> None: + def test_receipts_incremental_sync(self) -> None: """ - On incremental sync, we return all account data for a given room but only for + On incremental sync, we return all receipts for a given room but only for rooms that we request and are being returned in the Sliding Sync response. """ + user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user3_id = self.register_user("user3", "pass") + user3_tok = self.login(user3_id, "pass") - # Create a room and add some room account data - room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id1, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) + # Create room1 + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user3_id, tok=user3_tok) + event_response1 = self.helper.send(room_id1, body="new event", tok=user2_tok) + # User2 reads the last event (before the `from_token`) + channel = self.make_request( + "POST", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + {}, + access_token=user2_tok, ) + self.assertEqual(channel.code, 200, channel.json_body) - # Create another room with some room account data - room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id2, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) + # Create room2 + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id2, user1_id, tok=user1_tok) + event_response2 = self.helper.send(room_id2, body="new event", tok=user2_tok) + # User1 reads the last event (before the `from_token`) + channel = self.make_request( + "POST", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response2['event_id']}", + {}, + access_token=user1_tok, ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Create room3 + room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id3, user1_id, tok=user1_tok) + self.helper.join(room_id3, user3_id, tok=user3_tok) + event_response3 = self.helper.send(room_id3, body="new event", tok=user2_tok) + + # Create room4 + room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id4, user1_id, tok=user1_tok) + self.helper.join(room_id4, user3_id, tok=user3_tok) + event_response4 = self.helper.send(room_id4, body="new event", tok=user2_tok) + # User1 reads the last event (before the `from_token`) + channel = self.make_request( + "POST", + f"/rooms/{room_id4}/receipt/{ReceiptTypes.READ}/{event_response4['event_id']}", + {}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + sync_body = { "lists": {}, @@ -6039,56 +6057,116 @@ def test_room_account_data_incremental_sync(self) -> None: room_id1: { "required_state": [], "timeline_limit": 0, + }, + room_id3: { + "required_state": [], + "timeline_limit": 0, + }, + room_id4: { + "required_state": [], + "timeline_limit": 0, } }, "extensions": { "receipts": { "enabled": True, - "rooms": [room_id1, room_id2], + "rooms": [room_id1, room_id2, room_id3, room_id4], } }, } _, from_token = self.do_sync(sync_body, tok=user1_tok) - # Add some other room account data - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id1, - account_data_type="org.matrix.roorarraz2", - content={"roo": "rar"}, - ) + # Add some more read receipts after the `from_token` + # + # User1 reads room1 + channel = self.make_request( + "POST", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + {}, + access_token=user1_tok, ) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id2, - account_data_type="org.matrix.roorarraz2", - content={"roo": "rar"}, - ) + self.assertEqual(channel.code, 200, channel.json_body) + # User1 privately reads room2 + channel = self.make_request( + "POST", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response2['event_id']}", + {}, + access_token=user1_tok, ) + self.assertEqual(channel.code, 200, channel.json_body) + # User3 reads room3 + channel = self.make_request( + "POST", + f"/rooms/{room_id3}/receipt/{ReceiptTypes.READ}/{event_response3['event_id']}", + {}, + access_token=user3_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + # No activity for room4 after the `from_token` # Make an incremental Sliding Sync request with the account_data extension enabled response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) - self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) - # Even though we requested room2, we only expect room1 to show up because that's - # the only room in the Sliding Sync response (room2 is not one of our room - # subscriptions or in a sliding window list). + # Even though we requested room2, we only expect rooms to show up if they are + # already in the Sliding Sync response. room4 doesn't show up because there is + # no activity after the `from_token`. self.assertIncludes( - response_body["extensions"]["account_data"].get("rooms").keys(), - {room_id1}, + response_body["extensions"]["receipts"].get("rooms").keys(), + {room_id1, room_id3}, exact=True, ) - # We should only see the new room account data that happened after the `from_token` + + # Check room1: + # + # Sanity check that it's the correct ephemeral event type + self.assertEqual( + response_body["extensions"]["receipts"]["rooms"][room_id1]["type"], + EduTypes.RECEIPT, + ) + # We only see that user1 has read something in room1 since the `from_token` self.assertIncludes( - { - event["type"] - for event in response_body["extensions"]["account_data"] - .get("rooms") - .get(room_id1) - }, - {"org.matrix.roorarraz2"}, + response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ + event_response1["event_id"] + ][ReceiptTypes.READ].keys(), + {user1_id}, + exact=True, + ) + # User1 did not send a private read receipt in this room and we shouldn't leak + # others' private read receipts + self.assertIncludes( + response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ + event_response1["event_id"] + ] + .get(ReceiptTypes.READ_PRIVATE, {}) + .keys(), + set(), + exact=True, + ) + + # Check room3: + # + # Sanity check that it's the correct ephemeral event type + self.assertEqual( + response_body["extensions"]["receipts"]["rooms"][room_id3]["type"], + EduTypes.RECEIPT, + ) + # We only see that user3 has read something in room1 since the `from_token` + self.assertIncludes( + response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][ + event_response3["event_id"] + ][ReceiptTypes.READ].keys(), + {user3_id}, + exact=True, + ) + # User1 did not send a private read receipt in this room and we shouldn't leak + # others' private read receipts + self.assertIncludes( + response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][ + event_response3["event_id"] + ] + .get(ReceiptTypes.READ_PRIVATE, {}) + .keys(), + set(), exact=True, ) @@ -6105,12 +6183,20 @@ def test_wait_for_new_data(self) -> None: room_id = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id, user1_id, tok=user1_tok) + event_response = self.helper.send(room_id, body="new event", tok=user2_tok) sync_body = { "lists": {}, + "room_subscriptions": { + room_id: { + "required_state": [], + "timeline_limit": 0, + }, + }, "extensions": { "receipts": { "enabled": True, + "rooms": [room_id], } }, } @@ -6127,31 +6213,40 @@ def test_wait_for_new_data(self) -> None: # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)` with self.assertRaises(TimedOutException): channel.await_result(timeout_ms=5000) - # Bump the global account data to trigger new results - self.get_success( - self.account_data_handler.add_account_data_for_user( - user1_id, - "org.matrix.foobarbaz", - {"foo": "bar"}, - ) + # Bump the receipts to trigger new results + receipt_channel = self.make_request( + "POST", + f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response['event_id']}", + {}, + access_token=user2_tok, ) + self.assertEqual(receipt_channel.code, 200, receipt_channel.json_body) # Should respond before the 10 second timeout channel.await_result(timeout_ms=3000) self.assertEqual(channel.code, 200, channel.json_body) - # We should see the global account data update + # We should see the new receipt self.assertIncludes( - { - global_event["type"] - for global_event in channel.json_body["extensions"]["account_data"].get( - "global" - ) - }, - {"org.matrix.foobarbaz"}, + channel.json_body.get("extensions", {}).get("receipts", {}).get("rooms", {}).keys(), + {room_id}, exact=True, + message=channel.json_body, ) self.assertIncludes( - channel.json_body["extensions"]["account_data"].get("rooms").keys(), + channel.json_body["extensions"]["receipts"]["rooms"][room_id]["content"][ + event_response["event_id"] + ][ReceiptTypes.READ].keys(), + {user2_id}, + exact=True, + ) + # User1 did not send a private read receipt in this room and we shouldn't leak + # others' private read receipts + self.assertIncludes( + channel.json_body["extensions"]["receipts"]["rooms"][room_id]["content"][ + event_response["event_id"] + ] + .get(ReceiptTypes.READ_PRIVATE, {}) + .keys(), set(), exact=True, ) @@ -6160,7 +6255,7 @@ def test_wait_for_new_data_timeout(self) -> None: """ Test to make sure that the Sliding Sync request waits for new data to arrive but no data ever arrives so we timeout. We're also making sure that the default data - from the account_data extension doesn't trigger a false-positive for new data. + from the receipts extension doesn't trigger a false-positive for new data. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -6188,7 +6283,9 @@ def test_wait_for_new_data_timeout(self) -> None: channel.await_result(timeout_ms=5000) # Wake-up `notifier.wait_for_events(...)` that will cause us test # `SlidingSyncResult.__bool__` for new results. - self._bump_notifier_wait_for_events(user1_id) + self._bump_notifier_wait_for_events( + user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA + ) # Block for a little bit more to ensure we don't see any new results. with self.assertRaises(TimedOutException): channel.await_result(timeout_ms=4000) @@ -6197,9 +6294,8 @@ def test_wait_for_new_data_timeout(self) -> None: channel.await_result(timeout_ms=1200) self.assertEqual(channel.code, 200, channel.json_body) - self.assertIsNotNone( - channel.json_body["extensions"]["account_data"].get("global") - ) - self.assertIsNotNone( - channel.json_body["extensions"]["account_data"].get("rooms") + self.assertIncludes( + channel.json_body["extensions"]["receipts"].get("rooms").keys(), + set(), + exact=True, ) From 8e5d804a1213f473f9227639f86bdba1eefeb387 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jul 2024 16:17:03 -0500 Subject: [PATCH 07/14] Fix lints --- tests/rest/client/test_sync.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 1fc0d18fdfa..a7bd3e71bc6 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -5995,7 +5995,7 @@ def test_receipts_incremental_sync(self) -> None: On incremental sync, we return all receipts for a given room but only for rooms that we request and are being returned in the Sliding Sync response. """ - + user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") user2_id = self.register_user("user2", "pass") @@ -6050,7 +6050,6 @@ def test_receipts_incremental_sync(self) -> None: ) self.assertEqual(channel.code, 200, channel.json_body) - sync_body = { "lists": {}, "room_subscriptions": { @@ -6065,7 +6064,7 @@ def test_receipts_incremental_sync(self) -> None: room_id4: { "required_state": [], "timeline_limit": 0, - } + }, }, "extensions": { "receipts": { @@ -6227,10 +6226,13 @@ def test_wait_for_new_data(self) -> None: # We should see the new receipt self.assertIncludes( - channel.json_body.get("extensions", {}).get("receipts", {}).get("rooms", {}).keys(), + channel.json_body.get("extensions", {}) + .get("receipts", {}) + .get("rooms", {}) + .keys(), {room_id}, exact=True, - message=channel.json_body, + message=str(channel.json_body), ) self.assertIncludes( channel.json_body["extensions"]["receipts"]["rooms"][room_id]["content"][ From ed4d970834bfd8e79e6526ef0172d00918f77dd8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jul 2024 16:18:15 -0500 Subject: [PATCH 08/14] Add changelog --- changelog.d/17489.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17489.feature diff --git a/changelog.d/17489.feature b/changelog.d/17489.feature new file mode 100644 index 00000000000..5ace1e675e6 --- /dev/null +++ b/changelog.d/17489.feature @@ -0,0 +1 @@ +Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. From 80f6ac54b641dd6dcf0d929472433b4d76f9b2e0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jul 2024 16:24:40 -0500 Subject: [PATCH 09/14] Some clean-up --- tests/rest/client/test_sync.py | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index a7bd3e71bc6..33b4edf49be 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -5799,21 +5799,8 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase): receipts.register_servlets, ] - def default_config(self) -> JsonDict: - config = super().default_config() - # Enable sliding sync - config["experimental_features"] = {"msc3575_enabled": True} - return config - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main - self.event_sources = hs.get_event_sources() - self.e2e_keys_handler = hs.get_e2e_keys_handler() - self.account_data_handler = hs.get_account_data_handler() - self.notifier = hs.get_notifier() - self.sync_endpoint = ( - "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" - ) def test_no_data_initial_sync(self) -> None: """ @@ -6103,7 +6090,7 @@ def test_receipts_incremental_sync(self) -> None: self.assertEqual(channel.code, 200, channel.json_body) # No activity for room4 after the `from_token` - # Make an incremental Sliding Sync request with the account_data extension enabled + # Make an incremental Sliding Sync request with the receipts extension enabled response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # Even though we requested room2, we only expect rooms to show up if they are @@ -6201,7 +6188,7 @@ def test_wait_for_new_data(self) -> None: } _, from_token = self.do_sync(sync_body, tok=user1_tok) - # Make an incremental Sliding Sync request with the account_data extension enabled + # Make an incremental Sliding Sync request with the receipts extension enabled channel = self.make_request( "POST", self.sync_endpoint + f"?timeout=10000&pos={from_token}", From b5b273190cc6b158abb3ca7785b8c96d3d1daac3 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jul 2024 16:51:14 -0500 Subject: [PATCH 10/14] Fix nested quote syntax --- tests/rest/client/test_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 33b4edf49be..7ef254f7f81 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -4480,7 +4480,7 @@ def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> Non # Read last event channel = self.make_request( "POST", - f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response["event_id"]}", + f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response['event_id']}", {}, access_token=user1_tok, ) From 5ad59496175d2649aa37c35ec3d1e7e063fe275b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Jul 2024 20:56:37 -0500 Subject: [PATCH 11/14] Add TODO --- synapse/handlers/sliding_sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index bad9fbbaeba..ed294b8380f 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -642,6 +642,9 @@ async def handle_room(room_id: str) -> None: extensions = await self.get_extensions_response( sync_config=sync_config, actual_lists=lists, + # TODO: Once https://github.com/element-hq/synapse/pull/17479 merges, this + # will need to be updated to make sure it includes everything before the + # pre-filter on `relevant_room_map`. actual_room_ids=set(rooms.keys()), from_token=from_token, to_token=to_token, From 93ad8d64b30d29c3d2b70ce9c2e5742d9dc09148 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Jul 2024 22:18:38 -0500 Subject: [PATCH 12/14] Only timeline receipts on initial sync See https://github.com/element-hq/synapse/pull/17489#discussion_r1694927410 --- synapse/handlers/sliding_sync.py | 27 ++++ synapse/types/handlers/__init__.py | 2 +- tests/rest/client/test_sync.py | 227 +++++++++++++++++++++++++---- 3 files changed, 224 insertions(+), 32 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index ed294b8380f..26153c46b3e 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -646,6 +646,7 @@ async def handle_room(room_id: str) -> None: # will need to be updated to make sure it includes everything before the # pre-filter on `relevant_room_map`. actual_room_ids=set(rooms.keys()), + actual_room_response_map=rooms, from_token=from_token, to_token=to_token, ) @@ -1853,6 +1854,7 @@ async def get_extensions_response( sync_config: SlidingSyncConfig, actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], + actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], to_token: StreamToken, from_token: Optional[SlidingSyncStreamToken], ) -> SlidingSyncResult.Extensions: @@ -1863,6 +1865,8 @@ async def get_extensions_response( actual_lists: Sliding window API. A map of list key to list results in the Sliding Sync response. actual_room_ids: The actual room IDs in the the Sliding Sync response. + actual_room_response_map: A map of room ID to room results in the the + Sliding Sync response. to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. """ @@ -1904,6 +1908,7 @@ async def get_extensions_response( sync_config=sync_config, actual_lists=actual_lists, actual_room_ids=actual_room_ids, + actual_room_response_map=actual_room_response_map, receipts_request=sync_config.extensions.receipts, to_token=to_token, from_token=from_token, @@ -2206,6 +2211,7 @@ async def get_receipts_extension_response( sync_config: SlidingSyncConfig, actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], + actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension, to_token: StreamToken, from_token: Optional[SlidingSyncStreamToken], @@ -2217,6 +2223,8 @@ async def get_receipts_extension_response( actual_lists: Sliding window API. A map of list key to list results in the Sliding Sync response. actual_room_ids: The actual room IDs in the the Sliding Sync response. + actual_room_response_map: A map of room ID to room results in the the + Sliding Sync response. account_data_request: The account_data extension from the request to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. @@ -2234,6 +2242,7 @@ async def get_receipts_extension_response( room_id_to_receipt_map: Dict[str, JsonMapping] = {} if len(relevant_room_ids) > 0: + receipt_source = self.event_sources.sources.receipt receipts, _ = await receipt_source.get_new_events( user=sync_config.user, @@ -2254,6 +2263,24 @@ async def get_receipts_extension_response( room_id = receipt["room_id"] type = receipt["type"] content = receipt["content"] + + room_result = actual_room_response_map.get(room_id) + if room_result is not None: + if room_result.initial: + # TODO: In the future, it would be good to fetch less receipts + # out of the database in the first place but we would need to + # add a new `event_id` index to `receipts_linearized`. + relevant_event_ids = [ + event.event_id for event in room_result.timeline_events + ] + + assert isinstance(content, dict) + content = { + event_id: content_value + for event_id, content_value in content.items() + if event_id in relevant_event_ids + } + room_id_to_receipt_map[room_id] = {"type": type, "content": content} return SlidingSyncResult.Extensions.ReceiptsExtension( diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index ea360cfdad3..2bef4e2db41 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -152,7 +152,7 @@ class SlidingSyncResult: Attributes: next_pos: The next position token in the sliding window to request (next_batch). lists: Sliding window API. A map of list key to list results. - rooms: Room subscription API. A map of room ID to room subscription to room results. + rooms: Room subscription API. A map of room ID to room results. extensions: Extensions API. A map of extension key to extension results. """ diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 227b74bec46..a375af16699 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -6300,10 +6300,12 @@ def test_no_data_incremental_sync(self) -> None: exact=True, ) - def test_receipts_initial_sync(self) -> None: + def test_receipts_initial_sync_with_timeline(self) -> None: """ - On initial sync, we return all receipts for a given room but only for - rooms that we request and are being returned in the Sliding Sync response. + On initial sync, we only return receipts for events in a given room's timeline. + + We also make sure that we only return receipts for rooms that we request and are + already being returned in the Sliding Sync response. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -6311,16 +6313,24 @@ def test_receipts_initial_sync(self) -> None: user2_tok = self.login(user2_id, "pass") user3_id = self.register_user("user3", "pass") user3_tok = self.login(user3_id, "pass") + user4_id = self.register_user("user4", "pass") + user4_tok = self.login(user4_id, "pass") # Create a room room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id1, user1_id, tok=user1_tok) self.helper.join(room_id1, user3_id, tok=user3_tok) - event_response1 = self.helper.send(room_id1, body="new event", tok=user2_tok) + self.helper.join(room_id1, user4_id, tok=user4_tok) + room1_event_response1 = self.helper.send( + room_id1, body="new event1", tok=user2_tok + ) + room1_event_response2 = self.helper.send( + room_id1, body="new event2", tok=user2_tok + ) # User1 reads the last event channel = self.make_request( "POST", - f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response2['event_id']}", {}, access_token=user1_tok, ) @@ -6328,29 +6338,40 @@ def test_receipts_initial_sync(self) -> None: # User2 reads the last event channel = self.make_request( "POST", - f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response2['event_id']}", {}, access_token=user2_tok, ) self.assertEqual(channel.code, 200, channel.json_body) - # User3 privately reads the last event (make sure this doesn't leak to the other users) + # User3 reads the first event channel = self.make_request( "POST", - f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response1['event_id']}", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}", {}, access_token=user3_tok, ) self.assertEqual(channel.code, 200, channel.json_body) + # User4 privately reads the last event (make sure this doesn't leak to the other users) + channel = self.make_request( + "POST", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ_PRIVATE}/{room1_event_response2['event_id']}", + {}, + access_token=user4_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) # Create another room room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id2, user1_id, tok=user1_tok) self.helper.join(room_id2, user3_id, tok=user3_tok) - event_response2 = self.helper.send(room_id2, body="new event", tok=user2_tok) + self.helper.join(room_id2, user4_id, tok=user4_tok) + room2_event_response1 = self.helper.send( + room_id2, body="new event2", tok=user2_tok + ) # User1 reads the last event channel = self.make_request( "POST", - f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response2['event_id']}", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{room2_event_response1['event_id']}", {}, access_token=user1_tok, ) @@ -6358,17 +6379,17 @@ def test_receipts_initial_sync(self) -> None: # User2 reads the last event channel = self.make_request( "POST", - f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{room2_event_response1['event_id']}", {}, access_token=user2_tok, ) self.assertEqual(channel.code, 200, channel.json_body) - # User3 privately reads the last event (make sure this doesn't leak to the other users) + # User4 privately reads the last event (make sure this doesn't leak to the other users) channel = self.make_request( "POST", - f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response2['event_id']}", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{room2_event_response1['event_id']}", {}, - access_token=user3_tok, + access_token=user4_tok, ) self.assertEqual(channel.code, 200, channel.json_body) @@ -6378,7 +6399,8 @@ def test_receipts_initial_sync(self) -> None: "room_subscriptions": { room_id1: { "required_state": [], - "timeline_limit": 0, + # On initial sync, we only have receipts for events in the timeline + "timeline_limit": 1, } }, "extensions": { @@ -6390,6 +6412,17 @@ def test_receipts_initial_sync(self) -> None: } response_body, _ = self.do_sync(sync_body, tok=user1_tok) + # Only the latest event in the room is in the timelie because the `timeline_limit` is 1 + self.assertIncludes( + { + event["event_id"] + for event in response_body["rooms"][room_id1].get("timeline", []) + }, + {room1_event_response2["event_id"]}, + exact=True, + message=str(response_body["rooms"][room_id1]), + ) + # Even though we requested room2, we only expect room1 to show up because that's # the only room in the Sliding Sync response (room2 is not one of our room # subscriptions or in a sliding window list). @@ -6406,7 +6439,7 @@ def test_receipts_initial_sync(self) -> None: # We can see user1 and user2 read receipts self.assertIncludes( response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ - event_response1["event_id"] + room1_event_response2["event_id"] ][ReceiptTypes.READ].keys(), {user1_id, user2_id}, exact=True, @@ -6415,7 +6448,7 @@ def test_receipts_initial_sync(self) -> None: # private read receipts self.assertIncludes( response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ - event_response1["event_id"] + room1_event_response2["event_id"] ] .get(ReceiptTypes.READ_PRIVATE, {}) .keys(), @@ -6423,10 +6456,18 @@ def test_receipts_initial_sync(self) -> None: exact=True, ) + # We shouldn't see receipts for event2 since it wasn't in the timeline and this is an initial sync + self.assertIsNone( + response_body["extensions"]["receipts"]["rooms"][room_id1]["content"].get( + room1_event_response1["event_id"] + ) + ) + def test_receipts_incremental_sync(self) -> None: """ - On incremental sync, we return all receipts for a given room but only for - rooms that we request and are being returned in the Sliding Sync response. + On incremental sync, we return all receipts in the token range for a given room + but only for rooms that we request and are being returned in the Sliding Sync + response. """ user1_id = self.register_user("user1", "pass") @@ -6440,11 +6481,13 @@ def test_receipts_incremental_sync(self) -> None: room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id1, user1_id, tok=user1_tok) self.helper.join(room_id1, user3_id, tok=user3_tok) - event_response1 = self.helper.send(room_id1, body="new event", tok=user2_tok) + room1_event_response1 = self.helper.send( + room_id1, body="new event2", tok=user2_tok + ) # User2 reads the last event (before the `from_token`) channel = self.make_request( "POST", - f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}", {}, access_token=user2_tok, ) @@ -6453,11 +6496,13 @@ def test_receipts_incremental_sync(self) -> None: # Create room2 room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id2, user1_id, tok=user1_tok) - event_response2 = self.helper.send(room_id2, body="new event", tok=user2_tok) + room2_event_response1 = self.helper.send( + room_id2, body="new event2", tok=user2_tok + ) # User1 reads the last event (before the `from_token`) channel = self.make_request( "POST", - f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response2['event_id']}", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{room2_event_response1['event_id']}", {}, access_token=user1_tok, ) @@ -6467,7 +6512,9 @@ def test_receipts_incremental_sync(self) -> None: room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id3, user1_id, tok=user1_tok) self.helper.join(room_id3, user3_id, tok=user3_tok) - event_response3 = self.helper.send(room_id3, body="new event", tok=user2_tok) + room3_event_response1 = self.helper.send( + room_id3, body="new event", tok=user2_tok + ) # Create room4 room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok) @@ -6513,7 +6560,7 @@ def test_receipts_incremental_sync(self) -> None: # User1 reads room1 channel = self.make_request( "POST", - f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}", {}, access_token=user1_tok, ) @@ -6521,7 +6568,7 @@ def test_receipts_incremental_sync(self) -> None: # User1 privately reads room2 channel = self.make_request( "POST", - f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response2['event_id']}", + f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{room2_event_response1['event_id']}", {}, access_token=user1_tok, ) @@ -6529,7 +6576,7 @@ def test_receipts_incremental_sync(self) -> None: # User3 reads room3 channel = self.make_request( "POST", - f"/rooms/{room_id3}/receipt/{ReceiptTypes.READ}/{event_response3['event_id']}", + f"/rooms/{room_id3}/receipt/{ReceiptTypes.READ}/{room3_event_response1['event_id']}", {}, access_token=user3_tok, ) @@ -6558,7 +6605,7 @@ def test_receipts_incremental_sync(self) -> None: # We only see that user1 has read something in room1 since the `from_token` self.assertIncludes( response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ - event_response1["event_id"] + room1_event_response1["event_id"] ][ReceiptTypes.READ].keys(), {user1_id}, exact=True, @@ -6567,13 +6614,23 @@ def test_receipts_incremental_sync(self) -> None: # others' private read receipts self.assertIncludes( response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ - event_response1["event_id"] + room1_event_response1["event_id"] ] .get(ReceiptTypes.READ_PRIVATE, {}) .keys(), set(), exact=True, ) + # No events in the timeline since they were sent before the `from_token` + self.assertIncludes( + { + event["event_id"] + for event in response_body["rooms"][room_id1].get("timeline", []) + }, + set(), + exact=True, + message=str(response_body["rooms"][room_id1]), + ) # Check room3: # @@ -6585,7 +6642,7 @@ def test_receipts_incremental_sync(self) -> None: # We only see that user3 has read something in room1 since the `from_token` self.assertIncludes( response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][ - event_response3["event_id"] + room3_event_response1["event_id"] ][ReceiptTypes.READ].keys(), {user3_id}, exact=True, @@ -6594,13 +6651,121 @@ def test_receipts_incremental_sync(self) -> None: # others' private read receipts self.assertIncludes( response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][ - event_response3["event_id"] + room3_event_response1["event_id"] ] .get(ReceiptTypes.READ_PRIVATE, {}) .keys(), set(), exact=True, ) + # No events in the timeline since they were sent before the `from_token` + self.assertIncludes( + { + event["event_id"] + for event in response_body["rooms"][room_id3].get("timeline", []) + }, + set(), + exact=True, + message=str(response_body["rooms"][room_id3]), + ) + + def test_receipts_incremental_sync_all_live_receipts(self) -> None: + """ + On incremental sync, we return all receipts in the token range for a given room + even if they are not in the timeline. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Create room1 + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + sync_body = { + "lists": {}, + "room_subscriptions": { + room_id1: { + "required_state": [], + # The timeline will only include event2 + "timeline_limit": 1, + }, + }, + "extensions": { + "receipts": { + "enabled": True, + "rooms": [room_id1], + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + room1_event_response1 = self.helper.send( + room_id1, body="new event1", tok=user2_tok + ) + room1_event_response2 = self.helper.send( + room_id1, body="new event2", tok=user2_tok + ) + + # User1 reads event1 + channel = self.make_request( + "POST", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}", + {}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + # User2 reads event2 + channel = self.make_request( + "POST", + f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response2['event_id']}", + {}, + access_token=user2_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make an incremental Sliding Sync request with the receipts extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # We should see room1 because it has receipts in the token range + self.assertIncludes( + response_body["extensions"]["receipts"].get("rooms").keys(), + {room_id1}, + exact=True, + ) + # Sanity check that it's the correct ephemeral event type + self.assertEqual( + response_body["extensions"]["receipts"]["rooms"][room_id1]["type"], + EduTypes.RECEIPT, + ) + # We should see all receipts in the token range regardless of whether the events + # are in the timeline + self.assertIncludes( + response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ + room1_event_response1["event_id"] + ][ReceiptTypes.READ].keys(), + {user1_id}, + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][ + room1_event_response2["event_id"] + ][ReceiptTypes.READ].keys(), + {user2_id}, + exact=True, + ) + # Only the latest event in the timeline because the `timeline_limit` is 1 + self.assertIncludes( + { + event["event_id"] + for event in response_body["rooms"][room_id1].get("timeline", []) + }, + {room1_event_response2["event_id"]}, + exact=True, + message=str(response_body["rooms"][room_id1]), + ) def test_wait_for_new_data(self) -> None: """ From e3c0a600aa9fafce52be3eb2bcae6e8c4a25243d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Jul 2024 22:19:57 -0500 Subject: [PATCH 13/14] Remove extra newline --- synapse/handlers/sliding_sync.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 26153c46b3e..b5f59c4c16f 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2242,7 +2242,6 @@ async def get_receipts_extension_response( room_id_to_receipt_map: Dict[str, JsonMapping] = {} if len(relevant_room_ids) > 0: - receipt_source = self.event_sources.sources.receipt receipts, _ = await receipt_source.get_new_events( user=sync_config.user, From 8e8341dc3d1056cb1ffa19fbcb4a1a00390d042a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Jul 2024 11:21:01 -0500 Subject: [PATCH 14/14] Fix using the correct room list after #17479 merged See https://github.com/element-hq/synapse/pull/17489#discussion_r1694902525 Fixes after https://github.com/element-hq/synapse/pull/17479 merged --- synapse/handlers/sliding_sync.py | 25 ++++++++++++++----------- tests/rest/client/test_sync.py | 20 ++------------------ 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 778dddb4001..7a734f6712f 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -494,8 +494,7 @@ async def current_sync_for_user( # Assemble sliding window lists lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} - # Keep track of the rooms that we're going to display and need to fetch more - # info about + # Keep track of the rooms that we can display and need to fetch more info about relevant_room_map: Dict[str, RoomSyncConfig] = {} if has_lists and sync_config.lists is not None: sync_room_map = await self.filter_rooms_relevant_for_sync( @@ -623,6 +622,8 @@ async def current_sync_for_user( # Filter out rooms that haven't received updates and we've sent down # previously. + # Keep track of the rooms that we're going to display and need to fetch more info about + relevant_rooms_to_send_map = relevant_room_map if from_token: rooms_should_send = set() @@ -660,7 +661,7 @@ async def current_sync_for_user( relevant_room_map.keys(), from_token.stream_token.room_key ) rooms_should_send.update(rooms_that_have_updates) - relevant_room_map = { + relevant_rooms_to_send_map = { room_id: room_sync_config for room_id, room_sync_config in relevant_room_map.items() if room_id in rooms_should_send @@ -672,7 +673,7 @@ async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( sync_config=sync_config, room_id=room_id, - room_sync_config=relevant_room_map[room_id], + room_sync_config=relevant_rooms_to_send_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ room_id ], @@ -684,17 +685,19 @@ async def handle_room(room_id: str) -> None: if room_sync_result or not from_token: rooms[room_id] = room_sync_result - if relevant_room_map: + if relevant_rooms_to_send_map: with start_active_span("sliding_sync.generate_room_entries"): - await concurrently_execute(handle_room, relevant_room_map, 10) + await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10) extensions = await self.get_extensions_response( sync_config=sync_config, actual_lists=lists, - # TODO: Once https://github.com/element-hq/synapse/pull/17479 merges, this - # will need to be updated to make sure it includes everything before the - # pre-filter on `relevant_room_map`. - actual_room_ids=set(rooms.keys()), + # We're purposely using `relevant_room_map` instead of + # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could + # send regardless of whether they have an event update or not. The + # extensions care about more than just normal events in the rooms (like + # account data, read receipts, typing indicators, to-device messages, etc). + actual_room_ids=set(relevant_room_map.keys()), actual_room_response_map=rooms, from_token=from_token, to_token=to_token, @@ -704,7 +707,7 @@ async def handle_room(room_id: str) -> None: connection_position = await self.connection_store.record_rooms( sync_config=sync_config, from_token=from_token, - sent_room_ids=relevant_room_map.keys(), + sent_room_ids=relevant_rooms_to_send_map.keys(), # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids` unsent_room_ids=[], ) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 9c3426b02cb..1184adde70f 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -6657,15 +6657,7 @@ def test_receipts_incremental_sync(self) -> None: exact=True, ) # No events in the timeline since they were sent before the `from_token` - self.assertIncludes( - { - event["event_id"] - for event in response_body["rooms"][room_id1].get("timeline", []) - }, - set(), - exact=True, - message=str(response_body["rooms"][room_id1]), - ) + self.assertNotIn(room_id1, response_body["rooms"]) # Check room3: # @@ -6694,15 +6686,7 @@ def test_receipts_incremental_sync(self) -> None: exact=True, ) # No events in the timeline since they were sent before the `from_token` - self.assertIncludes( - { - event["event_id"] - for event in response_body["rooms"][room_id3].get("timeline", []) - }, - set(), - exact=True, - message=str(response_body["rooms"][room_id3]), - ) + self.assertNotIn(room_id3, response_body["rooms"]) def test_receipts_incremental_sync_all_live_receipts(self) -> None: """