From 7ff63f75fd3fc2e9555d52fb7bf2abfe5940f350 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 14:35:00 +0000 Subject: [PATCH 01/10] Properly type EventsStreamEventRow --- synapse/replication/tcp/streams/events.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 82e9e0d64ece..1ace7c6c3c20 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -15,7 +15,7 @@ # limitations under the License. import heapq from collections.abc import Iterable -from typing import List, Tuple, Type +from typing import List, Tuple, Type, Optional import attr @@ -81,12 +81,12 @@ def from_data(cls, data): class EventsStreamEventRow(BaseEventsStreamRow): TypeId = "ev" - event_id = attr.ib() # str - room_id = attr.ib() # str - type = attr.ib() # str - state_key = attr.ib() # str, optional - redacts = attr.ib() # str, optional - relates_to = attr.ib() # str, optional + event_id = attr.ib(type=str) + room_id = attr.ib(type=str) + type = attr.ib(type=str) + state_key = attr.ib(type=Optional[str]) + redacts = attr.ib(type=Optional[str]) + relates_to = attr.ib(type=Optional[str]) @attr.s(slots=True, frozen=True) From 040a8daa3c53ccbded892add3dec50ddccd08faf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 14:47:24 +0000 Subject: [PATCH 02/10] Include membership in EventsStreamEventRow --- synapse/replication/tcp/streams/events.py | 8 ++++++-- synapse/storage/databases/main/events_worker.py | 6 ++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 1ace7c6c3c20..2892a314f6e9 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -15,12 +15,15 @@ # limitations under the License. import heapq from collections.abc import Iterable -from typing import List, Tuple, Type, Optional +from typing import TYPE_CHECKING, List, Optional, Tuple, Type import attr from ._base import Stream, StreamUpdateResult, Token +if TYPE_CHECKING: + from synapse.server import HomeServer + """Handling of the 'events' replication stream This stream contains rows of various types. Each row therefore contains a 'type' @@ -87,6 +90,7 @@ class EventsStreamEventRow(BaseEventsStreamRow): state_key = attr.ib(type=Optional[str]) redacts = attr.ib(type=Optional[str]) relates_to = attr.ib(type=Optional[str]) + membership = attr.ib(type=Optional[str]) @attr.s(slots=True, frozen=True) @@ -113,7 +117,7 @@ class EventsStream(Stream): NAME = "events" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self._store = hs.get_datastore() super().__init__( hs.get_instance_name(), diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 6e7f16f39c05..6c2e6153b2fc 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1065,11 +1065,12 @@ async def get_all_new_forward_event_rows( def get_all_new_forward_event_rows(txn): sql = ( "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" + " state_key, redacts, relates_to_id, membership" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events USING (event_id)" " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" " WHERE ? < stream_ordering AND stream_ordering <= ?" " AND instance_name = ?" " ORDER BY stream_ordering ASC" @@ -1100,12 +1101,13 @@ async def get_ex_outlier_stream_rows( def get_ex_outlier_stream_rows_txn(txn): sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" + " state_key, redacts, relates_to_id, membership" " FROM events AS e" " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events USING (event_id)" " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" " WHERE ? < event_stream_ordering" " AND event_stream_ordering <= ?" " AND out.instance_name = ?" From 6df1d7046dae50535e348a3a4b9fe274414804e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 14:50:35 +0000 Subject: [PATCH 03/10] Include rejection state in EventsStreamEventRow --- synapse/replication/tcp/streams/events.py | 1 + synapse/storage/databases/main/events_worker.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 2892a314f6e9..86a62b71eb87 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -91,6 +91,7 @@ class EventsStreamEventRow(BaseEventsStreamRow): redacts = attr.ib(type=Optional[str]) relates_to = attr.ib(type=Optional[str]) membership = attr.ib(type=Optional[str]) + rejected = attr.ib(type=bool) @attr.s(slots=True, frozen=True) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 6c2e6153b2fc..9884607c49c2 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1065,12 +1065,13 @@ async def get_all_new_forward_event_rows( def get_all_new_forward_event_rows(txn): sql = ( "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id, membership" + " state_key, redacts, relates_to_id, membership, rej.reason IS NOT NULL" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " LEFT JOIN room_memberships USING (event_id)" + " LEFT JOIN rejections USING (event_id)" " WHERE ? < stream_ordering AND stream_ordering <= ?" " AND instance_name = ?" " ORDER BY stream_ordering ASC" @@ -1101,13 +1102,14 @@ async def get_ex_outlier_stream_rows( def get_ex_outlier_stream_rows_txn(txn): sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id, membership" + " state_key, redacts, relates_to_id, membership, rej.reason IS NOT NULL" " FROM events AS e" " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " LEFT JOIN room_memberships USING (event_id)" + " LEFT JOIN rejections USING (event_id)" " WHERE ? < event_stream_ordering" " AND event_stream_ordering <= ?" " AND out.instance_name = ?" From cd453d0d7fef9df2029f3769bfe3f965a83e2720 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 15:01:05 +0000 Subject: [PATCH 04/10] Shuffle notifier to not store event --- synapse/notifier.py | 44 ++++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index eb56b26f219c..967f561a4614 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -28,6 +28,7 @@ Union, ) +import attr from prometheus_client import Counter from twisted.internet import defer @@ -173,6 +174,17 @@ def __bool__(self): return bool(self.events) +@attr.s(slots=True, frozen=True) +class _PendingRoomEventEntry: + event_pos = attr.ib(type=PersistedEventPosition) + extra_users = attr.ib(type=Collection[UserID]) + + room_id = attr.ib(type=str) + type = attr.ib(type=str) + state_key = attr.ib(type=Optional[str]) + membership = attr.ib(type=Optional[str]) + + class Notifier: """ This class is responsible for notifying any listeners when there are new events available for it. @@ -190,9 +202,7 @@ def __init__(self, hs: "synapse.server.HomeServer"): self.storage = hs.get_storage() self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() - self.pending_new_room_events = ( - [] - ) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]] + self.pending_new_room_events = [] # type: List[_PendingRoomEventEntry] # Called when there are new things to stream over replication self.replication_callbacks = [] # type: List[Callable[[], None]] @@ -266,7 +276,16 @@ def on_new_room_event( until all previous events have been persisted before notifying the client streams. """ - self.pending_new_room_events.append((event_pos, event, extra_users)) + self.pending_new_room_events.append( + _PendingRoomEventEntry( + event_pos=event_pos, + extra_users=extra_users, + room_id=event.room_id, + type=event.type, + state_key=event.get("state_key"), + membership=event.get("membership"), + ) + ) self._notify_pending_new_room_events(max_room_stream_token) self.notify_replication() @@ -284,18 +303,19 @@ def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken users = set() # type: Set[UserID] rooms = set() # type: Set[str] - for event_pos, event, extra_users in pending: - if event_pos.persisted_after(max_room_stream_token): - self.pending_new_room_events.append((event_pos, event, extra_users)) + for entry in pending: + if entry.event_pos.persisted_after(max_room_stream_token): + self.pending_new_room_events.append(entry) else: if ( - event.type == EventTypes.Member - and event.membership == Membership.JOIN + entry.type == EventTypes.Member + and entry.membership == Membership.JOIN + and entry.state_key ): - self._user_joined_room(event.state_key, event.room_id) + self._user_joined_room(entry.state_key, entry.room_id) - users.update(extra_users) - rooms.add(event.room_id) + users.update(entry.extra_users) + rooms.add(entry.room_id) if users or rooms: self.on_new_event( From 25973e23fd5720b308b3cb2ef8ff12ac6903001b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 15:27:41 +0000 Subject: [PATCH 05/10] fixup! Include rejection state in EventsStreamEventRow --- synapse/storage/databases/main/events_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 9884607c49c2..810f8e0bbd31 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1065,7 +1065,7 @@ async def get_all_new_forward_event_rows( def get_all_new_forward_event_rows(txn): sql = ( "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id, membership, rej.reason IS NOT NULL" + " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events USING (event_id)" @@ -1102,7 +1102,7 @@ async def get_ex_outlier_stream_rows( def get_ex_outlier_stream_rows_txn(txn): sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id, membership, rej.reason IS NOT NULL" + " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" " FROM events AS e" " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" From 7cbbc4890f21acea976d32c5dd148d3d941d2adb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 15:28:25 +0000 Subject: [PATCH 06/10] Don't pull event from DB when handling replication traffic --- synapse/notifier.py | 32 ++++++++++++++++++++++++++----- synapse/replication/tcp/client.py | 21 ++++++++++---------- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 967f561a4614..c9d3398f2791 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -265,7 +265,29 @@ def on_new_room_event( max_room_stream_token: RoomStreamToken, extra_users: Collection[UserID] = [], ): - """ Used by handlers to inform the notifier something has happened + """Unrwaps event and calls `on_new_room_event_args`. + """ + self.on_new_room_event_args( + event_pos=event_pos, + room_id=event.room_id, + event_type=event.type, + state_key=event.get("state_key"), + membership=event.get("membership"), + max_room_stream_token=max_room_stream_token, + extra_users=extra_users, + ) + + def on_new_room_event_args( + self, + room_id: str, + event_type: str, + state_key: Optional[str], + membership: Optional[str], + event_pos: PersistedEventPosition, + max_room_stream_token: RoomStreamToken, + extra_users: Collection[UserID] = [], + ): + """Used by handlers to inform the notifier something has happened in the room, room event wise. This triggers the notifier to wake up any listeners that are @@ -280,10 +302,10 @@ def on_new_room_event( _PendingRoomEventEntry( event_pos=event_pos, extra_users=extra_users, - room_id=event.room_id, - type=event.type, - state_key=event.get("state_key"), - membership=event.get("membership"), + room_id=room_id, + type=event_type, + state_key=state_key, + membership=membership, ) ) self._notify_pending_new_room_events(max_room_stream_token) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e27ee216f076..c0b54195d41f 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -141,21 +141,22 @@ async def on_rdata( if row.type != EventsStreamEventRow.TypeId: continue assert isinstance(row, EventsStreamRow) - - event = await self.store.get_event( - row.data.event_id, allow_rejected=True - ) - if event.rejected_reason: - continue + assert isinstance(row.data, EventsStreamEventRow) extra_users = () # type: Tuple[UserID, ...] - if event.type == EventTypes.Member: - extra_users = (UserID.from_string(event.state_key),) + if row.data.type == EventTypes.Member and row.data.state_key: + extra_users = (UserID.from_string(row.data.state_key),) max_token = self.store.get_room_max_token() event_pos = PersistedEventPosition(instance_name, token) - self.notifier.on_new_room_event( - event, event_pos, max_token, extra_users + self.notifier.on_new_room_event_args( + event_pos=event_pos, + max_room_stream_token=max_token, + extra_users=extra_users, + room_id=row.data.room_id, + event_type=row.data.type, + state_key=row.data.state_key, + membership=row.data.membership, ) # Notify any waiting deferreds. The list is ordered by position so we From 75ff65938fae52f5846d9f5e388678c6ad348f71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 15:31:30 +0000 Subject: [PATCH 07/10] Newsfile --- changelog.d/8669.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8669.misc diff --git a/changelog.d/8669.misc b/changelog.d/8669.misc new file mode 100644 index 000000000000..5228105cd313 --- /dev/null +++ b/changelog.d/8669.misc @@ -0,0 +1 @@ +Don't pull event from DB when handling replication traffic. From a8628c7cef92ff63f16d798f55e9c895b3d036a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 16:46:53 +0000 Subject: [PATCH 08/10] Fix typo Co-authored-by: Patrick Cloke --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index c9d3398f2791..05ae5c213ade 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -265,7 +265,7 @@ def on_new_room_event( max_room_stream_token: RoomStreamToken, extra_users: Collection[UserID] = [], ): - """Unrwaps event and calls `on_new_room_event_args`. + """Unwraps event and calls `on_new_room_event_args`. """ self.on_new_room_event_args( event_pos=event_pos, From ece9a53fa39118c16803d584737df694d2d3a420 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 17:16:26 +0000 Subject: [PATCH 09/10] Fix tests --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 05ae5c213ade..a17352ef46aa 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -272,7 +272,7 @@ def on_new_room_event( room_id=event.room_id, event_type=event.type, state_key=event.get("state_key"), - membership=event.get("membership"), + membership=event.content.get("membership"), max_room_stream_token=max_room_stream_token, extra_users=extra_users, ) From 495665a6b9ca3ef11c983ea775d6db822b62fb2d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Oct 2020 11:46:34 +0000 Subject: [PATCH 10/10] Ignore rejected events --- synapse/replication/tcp/client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index c0b54195d41f..2618eb1e536c 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -143,6 +143,9 @@ async def on_rdata( assert isinstance(row, EventsStreamRow) assert isinstance(row.data, EventsStreamEventRow) + if row.data.rejected: + continue + extra_users = () # type: Tuple[UserID, ...] if row.data.type == EventTypes.Member and row.data.state_key: extra_users = (UserID.from_string(row.data.state_key),)