From e22c941c6016528b303ac5fdb595651a7f9d609a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 13 Oct 2021 08:09:57 -0400 Subject: [PATCH 1/9] Add a thread relation type. --- synapse/api/constants.py | 1 + synapse/storage/databases/main/events.py | 1 + 2 files changed, 2 insertions(+) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index a31f037748a3..a33ac341614a 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -176,6 +176,7 @@ class RelationTypes: ANNOTATION = "m.annotation" REPLACE = "m.replace" REFERENCE = "m.reference" + THREAD = "io.element.thread" class LimitBlockingTypes: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 37439f85628e..1b26dec9d1e9 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1710,6 +1710,7 @@ def _handle_event_relations(self, txn, event): RelationTypes.ANNOTATION, RelationTypes.REFERENCE, RelationTypes.REPLACE, + RelationTypes.THREAD, ): # Unknown relation type return From 4f64841ab99a9e1585767d0196eaf33e60255eb0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 13 Oct 2021 11:51:23 -0400 Subject: [PATCH 2/9] Only include an aggregation key if one is provided. --- synapse/rest/client/relations.py | 3 ++- tests/rest/client/test_relations.py | 11 ++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index d695c18be2a4..58f669907399 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -128,9 +128,10 @@ async def on_PUT_or_POST( content["m.relates_to"] = { "event_id": parent_id, - "key": aggregation_key, "rel_type": relation_type, } + if aggregation_key is not None: + content["m.relates_to"]["key"] = aggregation_key event_dict = { "type": event_type, diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 3c7d49f0b464..d6319d21139a 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -101,10 +101,10 @@ def test_deny_double_react(self): def test_basic_paginate_relations(self): """Tests that calling pagination API correctly the latest relations.""" - channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction") + channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "a") self.assertEquals(200, channel.code, channel.json_body) - channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction") + channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "b") self.assertEquals(200, channel.code, channel.json_body) annotation_id = channel.json_body["event_id"] @@ -141,8 +141,10 @@ def test_repeated_paginate_relations(self): """ expected_event_ids = [] - for _ in range(10): - channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction") + for idx in range(10): + channel = self._send_relation( + RelationTypes.ANNOTATION, "m.reaction", chr(ord("a") + idx) + ) self.assertEquals(200, channel.code, channel.json_body) expected_event_ids.append(channel.json_body["event_id"]) @@ -559,7 +561,6 @@ def test_edit_reply(self): { "m.relates_to": { "event_id": self.parent_id, - "key": None, "rel_type": "m.reference", } }, From 02f021b0f4bc3a8d91bd307a09dbf7a055f4797c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 7 Oct 2021 14:23:04 -0400 Subject: [PATCH 3/9] Include a thread-summary for bundled relations. --- synapse/events/utils.py | 17 ++++++ synapse/storage/databases/main/events.py | 3 + synapse/storage/databases/main/relations.py | 63 ++++++++++++++++++++- tests/rest/client/test_relations.py | 29 +++++++++- 4 files changed, 110 insertions(+), 2 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 23bd24d96394..e531d58ab3ea 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -464,6 +464,23 @@ async def serialize_event( "sender": edit.sender, } + # If this event is the start of a thread, include a summary of the replies. + ( + thread_count, + thread_senders, + latest_thread_event, + ) = await self.store.get_thread_summary(event_id) + if latest_thread_event: + r = serialized_event["unsigned"].setdefault("m.relations", {}) + r[RelationTypes.THREAD] = { + # Don't bundle aggregations as this could recurse forever. + "latest_event": await self.serialize_event( + latest_thread_event, time_now, bundle_aggregations=False + ), + "senders": thread_senders, + "count": thread_count, + } + return serialized_event async def serialize_events( diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1b26dec9d1e9..8d9086ecf0a1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1741,6 +1741,9 @@ def _handle_event_relations(self, txn, event): if rel_type == RelationTypes.REPLACE: txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) + if rel_type == RelationTypes.THREAD: + txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,)) + def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase): """Handles keeping track of insertion events and edges/connections. Part of MSC2716. diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 2bbf6d6a95ed..614c0a947898 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Optional +from typing import Iterable, Optional, Tuple import attr @@ -269,6 +269,67 @@ def _get_applicable_edit_txn(txn): return await self.get_event(edit_id, allow_none=True) + @cached() + async def get_thread_summary( + self, event_id: str + ) -> Tuple[int, Iterable[str], Optional[EventBase]]: + """Get the number of threaded replies, the senders of those replies, and + the latest reply (if any) for the given event. + + Args: + event_id: The original event ID + + Returns: + The number of items in the thread and the most recent response, if any. + """ + + def _get_thread_summary_txn(txn) -> Tuple[int, Iterable[str], Optional[str]]: + # Fetch the count of threaded events and the latest event ID. + # TODO Should this only allow m.room.message events. + sql = """ + SELECT COUNT(DISTINCT event_id), event_id + FROM event_relations + INNER JOIN events USING (event_id) + WHERE + relates_to_id = ? + AND relation_type = ? + ORDER BY MAX(stream_ordering) + LIMIT 1 + """ + + txn.execute(sql, (event_id, RelationTypes.THREAD)) + row = txn.fetchone() + if row is None: + return 0, (), None + + count, latest_event_id = row + + # Fetch the threaded event senders. + sql = """ + SELECT DISTINCT sender + FROM event_relations + INNER JOIN events USING (event_id) + WHERE + relates_to_id = ? + AND relation_type = ? + """ + txn.execute(sql, (event_id, RelationTypes.THREAD)) + + # There must be at least one result. + senders = [row[0] for row in txn.fetchall()] + + return count, senders, latest_event_id + + count, senders, latest_event_id = await self.db_pool.runInteraction( + "get_thread_summary", _get_thread_summary_txn + ) + + latest_event = None + if latest_event_id: + latest_event = await self.get_event(latest_event_id, allow_none=True) + + return count, senders, latest_event + async def has_user_annotated_event( self, parent_id: str, event_type: str, aggregation_key: str, sender: str ) -> bool: diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index d6319d21139a..b6194a38a3b5 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -389,7 +389,7 @@ def test_aggregation_must_be_annotation(self): self.assertEquals(400, channel.code, channel.json_body) def test_aggregation_get_event(self): - """Test that annotations and references get correctly bundled when + """Test that annotations, references, and threads get correctly bundled when getting the parent event. """ @@ -412,6 +412,13 @@ def test_aggregation_get_event(self): self.assertEquals(200, channel.code, channel.json_body) reply_2 = channel.json_body["event_id"] + channel = self._send_relation(RelationTypes.THREAD, "m.room.test") + self.assertEquals(200, channel.code, channel.json_body) + + channel = self._send_relation(RelationTypes.THREAD, "m.room.test") + self.assertEquals(200, channel.code, channel.json_body) + thread_2 = channel.json_body["event_id"] + channel = self.make_request( "GET", "/rooms/%s/event/%s" % (self.room, self.parent_id), @@ -431,6 +438,26 @@ def test_aggregation_get_event(self): RelationTypes.REFERENCE: { "chunk": [{"event_id": reply_1}, {"event_id": reply_2}] }, + RelationTypes.THREAD: { + "count": 2, + "senders": [self.user_id], + "latest_event": { + "age": 100, + "content": { + "m.relates_to": { + "event_id": self.parent_id, + "rel_type": RelationTypes.THREAD, + } + }, + "event_id": thread_2, + "origin_server_ts": 1600, + "room_id": self.room, + "sender": self.user_id, + "type": "m.room.test", + "unsigned": {"age": 100}, + "user_id": self.user_id, + }, + }, }, ) From cf387c293382a7149b650858bb402e683902728e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 14 Oct 2021 10:37:39 -0400 Subject: [PATCH 4/9] Add an experimental configuration flag. --- synapse/config/experimental.py | 3 +++ synapse/events/utils.py | 32 +++++++++++++++-------------- tests/rest/client/test_relations.py | 1 + 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 7b0381c06a27..7a76716e6a2a 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -41,3 +41,6 @@ def read_config(self, config: JsonDict, **kwargs): # MSC3266 (room summary api) self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False) + + # MSC3440 (thread relation) + self.msc3440_enabled: bool = experimental.get("msc3440_enabled", False) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index e531d58ab3ea..bfdea8f5c64a 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -388,6 +388,7 @@ def __init__(self, hs: "HomeServer"): self.experimental_msc1849_support_enabled = ( hs.config.server.experimental_msc1849_support_enabled ) + self._msc3440_enabled = hs.config.experimental.msc3440_enabled async def serialize_event( self, @@ -465,21 +466,22 @@ async def serialize_event( } # If this event is the start of a thread, include a summary of the replies. - ( - thread_count, - thread_senders, - latest_thread_event, - ) = await self.store.get_thread_summary(event_id) - if latest_thread_event: - r = serialized_event["unsigned"].setdefault("m.relations", {}) - r[RelationTypes.THREAD] = { - # Don't bundle aggregations as this could recurse forever. - "latest_event": await self.serialize_event( - latest_thread_event, time_now, bundle_aggregations=False - ), - "senders": thread_senders, - "count": thread_count, - } + if self._msc3440_enabled: + ( + thread_count, + thread_senders, + latest_thread_event, + ) = await self.store.get_thread_summary(event_id) + if latest_thread_event: + r = serialized_event["unsigned"].setdefault("m.relations", {}) + r[RelationTypes.THREAD] = { + # Don't bundle aggregations as this could recurse forever. + "latest_event": await self.serialize_event( + latest_thread_event, time_now, bundle_aggregations=False + ), + "senders": thread_senders, + "count": thread_count, + } return serialized_event diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index b6194a38a3b5..9673b3eb6760 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -388,6 +388,7 @@ def test_aggregation_must_be_annotation(self): ) self.assertEquals(400, channel.code, channel.json_body) + @unittest.override_config({"experimental_features": {"msc3440_enabled": True}}) def test_aggregation_get_event(self): """Test that annotations, references, and threads get correctly bundled when getting the parent event. From 8ff22d086c0b0be9d12afa3713d164f4c97a1167 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 14 Oct 2021 10:38:58 -0400 Subject: [PATCH 5/9] Newsfragment --- changelog.d/11088.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11088.feature diff --git a/changelog.d/11088.feature b/changelog.d/11088.feature new file mode 100644 index 000000000000..76b0d280845e --- /dev/null +++ b/changelog.d/11088.feature @@ -0,0 +1 @@ +Experimental support for the thread relation defined in [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). From 7569579fb684651285fb4caf3dcf6618fd0a1d29 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 14 Oct 2021 13:29:17 -0400 Subject: [PATCH 6/9] Fix queries on Postgres. --- synapse/storage/databases/main/relations.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 614c0a947898..ed8a74b9dbec 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -287,13 +287,13 @@ def _get_thread_summary_txn(txn) -> Tuple[int, Iterable[str], Optional[str]]: # Fetch the count of threaded events and the latest event ID. # TODO Should this only allow m.room.message events. sql = """ - SELECT COUNT(DISTINCT event_id), event_id + SELECT event_id FROM event_relations INNER JOIN events USING (event_id) WHERE relates_to_id = ? AND relation_type = ? - ORDER BY MAX(stream_ordering) + ORDER BY stream_ordering DESC LIMIT 1 """ @@ -302,7 +302,17 @@ def _get_thread_summary_txn(txn) -> Tuple[int, Iterable[str], Optional[str]]: if row is None: return 0, (), None - count, latest_event_id = row + latest_event_id = row[0] + + sql = """ + SELECT COUNT(event_id) + FROM event_relations + WHERE + relates_to_id = ? + AND relation_type = ? + """ + txn.execute(sql, (event_id, RelationTypes.THREAD)) + count = txn.fetchone()[0] # Fetch the threaded event senders. sql = """ From f93f3ad4a3a7e551a1daaec3b6e324b5ddefc1fe Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 19 Oct 2021 08:56:54 -0400 Subject: [PATCH 7/9] Review feedback. --- synapse/storage/databases/main/relations.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index ed8a74b9dbec..18b8afa85031 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -293,7 +293,7 @@ def _get_thread_summary_txn(txn) -> Tuple[int, Iterable[str], Optional[str]]: WHERE relates_to_id = ? AND relation_type = ? - ORDER BY stream_ordering DESC + ORDER BY depth DESC, stream_ordering DESC LIMIT 1 """ @@ -305,7 +305,7 @@ def _get_thread_summary_txn(txn) -> Tuple[int, Iterable[str], Optional[str]]: latest_event_id = row[0] sql = """ - SELECT COUNT(event_id) + SELECT COALESCE(COUNT(event_id), 0) FROM event_relations WHERE relates_to_id = ? From 8b245b061fb1b6e30c8def2ce7234df1ae1c9d6e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 19 Oct 2021 12:40:09 -0400 Subject: [PATCH 8/9] Do not include the thread senders in the thread. --- synapse/events/utils.py | 2 -- synapse/storage/databases/main/relations.py | 28 ++++++--------------- tests/rest/client/test_relations.py | 1 - 3 files changed, 7 insertions(+), 24 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index a8bdcae6dcd4..6fa631aa1d4d 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -467,7 +467,6 @@ async def serialize_event( if self._msc3440_enabled: ( thread_count, - thread_senders, latest_thread_event, ) = await self.store.get_thread_summary(event_id) if latest_thread_event: @@ -477,7 +476,6 @@ async def serialize_event( "latest_event": await self.serialize_event( latest_thread_event, time_now, bundle_aggregations=False ), - "senders": thread_senders, "count": thread_count, } diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 18b8afa85031..da8d8635d736 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Iterable, Optional, Tuple +from typing import Optional, Tuple import attr @@ -272,7 +272,7 @@ def _get_applicable_edit_txn(txn): @cached() async def get_thread_summary( self, event_id: str - ) -> Tuple[int, Iterable[str], Optional[EventBase]]: + ) -> Tuple[int, Optional[EventBase]]: """Get the number of threaded replies, the senders of those replies, and the latest reply (if any) for the given event. @@ -283,7 +283,7 @@ async def get_thread_summary( The number of items in the thread and the most recent response, if any. """ - def _get_thread_summary_txn(txn) -> Tuple[int, Iterable[str], Optional[str]]: + def _get_thread_summary_txn(txn) -> Tuple[int, Optional[str]]: # Fetch the count of threaded events and the latest event ID. # TODO Should this only allow m.room.message events. sql = """ @@ -300,7 +300,7 @@ def _get_thread_summary_txn(txn) -> Tuple[int, Iterable[str], Optional[str]]: txn.execute(sql, (event_id, RelationTypes.THREAD)) row = txn.fetchone() if row is None: - return 0, (), None + return 0, None latest_event_id = row[0] @@ -314,23 +314,9 @@ def _get_thread_summary_txn(txn) -> Tuple[int, Iterable[str], Optional[str]]: txn.execute(sql, (event_id, RelationTypes.THREAD)) count = txn.fetchone()[0] - # Fetch the threaded event senders. - sql = """ - SELECT DISTINCT sender - FROM event_relations - INNER JOIN events USING (event_id) - WHERE - relates_to_id = ? - AND relation_type = ? - """ - txn.execute(sql, (event_id, RelationTypes.THREAD)) - - # There must be at least one result. - senders = [row[0] for row in txn.fetchall()] - - return count, senders, latest_event_id + return count, latest_event_id - count, senders, latest_event_id = await self.db_pool.runInteraction( + count, latest_event_id = await self.db_pool.runInteraction( "get_thread_summary", _get_thread_summary_txn ) @@ -338,7 +324,7 @@ def _get_thread_summary_txn(txn) -> Tuple[int, Iterable[str], Optional[str]]: if latest_event_id: latest_event = await self.get_event(latest_event_id, allow_none=True) - return count, senders, latest_event + return count, latest_event async def has_user_annotated_event( self, parent_id: str, event_type: str, aggregation_key: str, sender: str diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 9673b3eb6760..78c2fb86b983 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -441,7 +441,6 @@ def test_aggregation_get_event(self): }, RelationTypes.THREAD: { "count": 2, - "senders": [self.user_id], "latest_event": { "age": 100, "content": { From 4da5294a23b46737b352206ebe306790e5a74990 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 20 Oct 2021 07:28:37 -0400 Subject: [PATCH 9/9] Review comments. Co-authored-by: Erik Johnston --- synapse/storage/databases/main/relations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index da8d8635d736..40760fbd1b36 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -293,7 +293,7 @@ def _get_thread_summary_txn(txn) -> Tuple[int, Optional[str]]: WHERE relates_to_id = ? AND relation_type = ? - ORDER BY depth DESC, stream_ordering DESC + ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT 1 """