From d6b3feee9dff56a72a2d0c4b440ab1c525debe09 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 23 Nov 2021 19:30:02 -0600 Subject: [PATCH 1/7] Refactor backfilled into specific behavior function arguments (`_persist_events_and_state_updates`) Part of https://github.com/matrix-org/synapse/issues/11300 Call stack: - `_persist_events_and_state_updates` (added `use_negative_stream_ordering`) - `_persist_events_txn` - `_update_room_depths_txn` (added `update_room_forward_stream_ordering`) - `_update_metadata_tables_txn` - `_store_room_members_txn` (added `inhibit_local_membership_updates`) --- synapse/storage/databases/main/events.py | 84 +++++++++++++++++++----- 1 file changed, 66 insertions(+), 18 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 06832221adc2..68c150c1ec3d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -124,7 +124,9 @@ async def _persist_events_and_state_updates( current_state_for_room: Dict[str, StateMap[str]], state_delta_for_room: Dict[str, DeltaState], new_forward_extremeties: Dict[str, List[str]], - backfilled: bool = False, + *, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, ) -> None: """Persist a set of events alongside updates to the current state and forward extremities tables. @@ -137,7 +139,14 @@ async def _persist_events_and_state_updates( room state new_forward_extremities: Map from room_id to list of event IDs that are the new forward extremities of the room. - backfilled + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. 
This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. Returns: Resolves when the events have been persisted @@ -159,7 +168,7 @@ async def _persist_events_and_state_updates( # # Note: Multiple instances of this function cannot be in flight at # the same time for the same room. - if backfilled: + if use_negative_stream_ordering: stream_ordering_manager = self._backfill_id_gen.get_next_mult( len(events_and_contexts) ) @@ -176,13 +185,14 @@ async def _persist_events_and_state_updates( "persist_events", self._persist_events_txn, events_and_contexts=events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, + use_negative_stream_ordering=use_negative_stream_ordering, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc(len(events_and_contexts)) - if not backfilled: + if stream < 0: # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. synapse.metrics.event_persisted_position.set( @@ -316,8 +326,10 @@ def _get_prevs_before_rejected_txn(txn, batch): def _persist_events_txn( self, txn: LoggingTransaction, + *, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, + inhibit_local_membership_updates: bool = False, + use_negative_stream_ordering: bool = False, state_delta_for_room: Optional[Dict[str, DeltaState]] = None, new_forward_extremeties: Optional[Dict[str, List[str]]] = None, ): @@ -330,7 +342,14 @@ def _persist_events_txn( Args: txn events_and_contexts: events to persist - backfilled: True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. 
+ use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. delete_existing True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. @@ -364,7 +383,9 @@ def _persist_events_txn( ) self._update_room_depths_txn( - txn, events_and_contexts=events_and_contexts, backfilled=backfilled + txn, + events_and_contexts=events_and_contexts, + use_negative_stream_ordering=use_negative_stream_ordering, ) # _update_outliers_txn filters out any events which have already been @@ -398,7 +419,7 @@ def _persist_events_txn( txn, events_and_contexts=events_and_contexts, all_events_and_contexts=all_events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # We call this last as it assumes we've inserted the events into @@ -1200,7 +1221,8 @@ def _update_room_depths_txn( self, txn, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, + *, + use_negative_stream_ordering: bool = False, ): """Update min_depth for each room @@ -1208,13 +1230,21 @@ def _update_room_depths_txn( txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting - backfilled (bool): True if the events were backfilled + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. 
""" depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids txn.call_after(self.store._invalidate_get_event_cache, event.event_id) - if not backfilled: + # This will update the `stream_ordering` position to mark the latest + # event as the front of the room. This should not be done for + # backfilled events because backfilled events have negative + # stream_ordering and happened in the past so we know that we don't + # need to update the stream_ordering tip for the room. + if not use_negative_stream_ordering: txn.call_after( self.store._events_stream_cache.entity_has_changed, event.room_id, @@ -1427,7 +1457,11 @@ def _store_rejected_events_txn(self, txn, events_and_contexts): return [ec for ec in events_and_contexts if ec[0] not in to_remove] def _update_metadata_tables_txn( - self, txn, events_and_contexts, all_events_and_contexts, backfilled + self, + txn, + events_and_contexts, + all_events_and_contexts, + inhibit_local_membership_updates: bool = False, ): """Update all the miscellaneous tables for new events @@ -1439,7 +1473,10 @@ def _update_metadata_tables_txn( events that we were going to persist. This includes events we've already persisted, etc, that wouldn't appear in events_and_context. - backfilled (bool): True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. """ # Insert all the push actions into the event_push_actions table. @@ -1513,7 +1550,7 @@ def _update_metadata_tables_txn( for event, _ in events_and_contexts if event.type == EventTypes.Member ], - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # Insert event_reference_hashes table. 
@@ -1638,8 +1675,19 @@ def _store_event_reference_hashes_txn(self, txn, events): txn, table="event_reference_hashes", values=vals ) - def _store_room_members_txn(self, txn, events, backfilled): - """Store a room member in the database.""" + def _store_room_members_txn( + self, txn, events, *, inhibit_local_membership_updates: bool = False + ): + """ + Store a room member in the database. + Args: + txn: The transaction to use. + events: List of events to store. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + """ def non_null_str_or_none(val: Any) -> Optional[str]: return val if isinstance(val, str) and "\u0000" not in val else None @@ -1682,7 +1730,7 @@ def non_null_str_or_none(val: Any) -> Optional[str]: # band membership", like a remote invite or a rejection of a remote invite. if ( self.is_mine_id(event.state_key) - and not backfilled + and not inhibit_local_membership_updates and event.internal_metadata.is_outlier() and event.internal_metadata.is_out_of_band_membership() ): From c138f8cc9ec21d562fe172d234b495edcf932132 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 23 Nov 2021 19:46:32 -0600 Subject: [PATCH 2/7] Connect the backfilled dots --- synapse/storage/persist_events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 402f134d894b..428d66a617b1 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -583,7 +583,8 @@ async def _persist_event_batch( current_state_for_room=current_state_for_room, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, - backfilled=backfilled, + use_negative_stream_ordering=backfilled, + inhibit_local_membership_updates=backfilled, ) await 
self._handle_potentially_left_users(potentially_left_users) From 67393b8a5fac0e3decc9cc64e8088ac5a99a63f2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 23 Nov 2021 19:49:11 -0600 Subject: [PATCH 3/7] Add changelog --- changelog.d/11417.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11417.misc diff --git a/changelog.d/11417.misc b/changelog.d/11417.misc new file mode 100644 index 000000000000..88dc4722da29 --- /dev/null +++ b/changelog.d/11417.misc @@ -0,0 +1 @@ +Refactor `backfilled` into specific behavior function arguments (`_persist_events_and_state_updates` and downstream calls). From 52e3121f6c863e2c7fd22ed015f4d241fdb3527b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 23 Nov 2021 23:58:41 -0600 Subject: [PATCH 4/7] More keyword-only arguments higher up which matches existing usage anyway --- synapse/storage/databases/main/events.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 68c150c1ec3d..9847dd1ec27b 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -121,10 +121,10 @@ def __init__( async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], + *, current_state_for_room: Dict[str, StateMap[str]], state_delta_for_room: Dict[str, DeltaState], new_forward_extremeties: Dict[str, List[str]], - *, use_negative_stream_ordering: bool = False, inhibit_local_membership_updates: bool = False, ) -> None: @@ -1220,8 +1220,8 @@ def _filter_events_and_contexts_for_duplicates( def _update_room_depths_txn( self, txn, - events_and_contexts: List[Tuple[EventBase, EventContext]], *, + events_and_contexts: List[Tuple[EventBase, EventContext]], use_negative_stream_ordering: bool = False, ): """Update min_depth for each room @@ -1459,6 +1459,7 @@ def _store_rejected_events_txn(self, txn, events_and_contexts): def 
_update_metadata_tables_txn( self, txn, + *, events_and_contexts, all_events_and_contexts, inhibit_local_membership_updates: bool = False, From c83148ecbfbeb7c215483127fbf52dfabe1a8bc4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 24 Nov 2021 17:34:16 -0600 Subject: [PATCH 5/7] Derive negative stream_ordering from the actual value --- synapse/storage/databases/main/events.py | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 9847dd1ec27b..6c53350799b8 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -186,7 +186,6 @@ async def _persist_events_and_state_updates( self._persist_events_txn, events_and_contexts=events_and_contexts, inhibit_local_membership_updates=inhibit_local_membership_updates, - use_negative_stream_ordering=use_negative_stream_ordering, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) @@ -329,7 +328,6 @@ def _persist_events_txn( *, events_and_contexts: List[Tuple[EventBase, EventContext]], inhibit_local_membership_updates: bool = False, - use_negative_stream_ordering: bool = False, state_delta_for_room: Optional[Dict[str, DeltaState]] = None, new_forward_extremeties: Optional[Dict[str, List[str]]] = None, ): @@ -346,10 +344,6 @@ def _persist_events_txn( from being updated by these events. This should be set to True for backfilled events because backfilled events in the past do not affect the current local state. - use_negative_stream_ordering: Whether to start stream_ordering on - the negative side and decrement. This should be set as True - for backfilled events because backfilled events get a negative - stream ordering so they don't come down incremental `/sync`. delete_existing True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. 
@@ -382,11 +376,7 @@ def _persist_events_txn( events_and_contexts ) - self._update_room_depths_txn( - txn, - events_and_contexts=events_and_contexts, - use_negative_stream_ordering=use_negative_stream_ordering, - ) + self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts) # _update_outliers_txn filters out any events which have already been # persisted, and returns the filtered list. @@ -1220,9 +1210,7 @@ def _filter_events_and_contexts_for_duplicates( def _update_room_depths_txn( self, txn, - *, events_and_contexts: List[Tuple[EventBase, EventContext]], - use_negative_stream_ordering: bool = False, ): """Update min_depth for each room @@ -1230,21 +1218,17 @@ def _update_room_depths_txn( txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting - use_negative_stream_ordering: Whether to start stream_ordering on - the negative side and decrement. This should be set as True - for backfilled events because backfilled events get a negative - stream ordering so they don't come down incremental `/sync`. """ depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids txn.call_after(self.store._invalidate_get_event_cache, event.event_id) - # This will update the `stream_ordering` position to mark the latest + # Then update the `stream_ordering` position to mark the latest # event as the front of the room. This should not be done for # backfilled events because backfilled events have negative # stream_ordering and happened in the past so we know that we don't - # need to update the stream_ordering tip for the room. - if not use_negative_stream_ordering: + # need to update the stream_ordering tip/front for the room. 
+ if event.internal_metadata.stream_ordering >= 0: txn.call_after( self.store._events_stream_cache.entity_has_changed, event.room_id, From 96f1fe6d2211494e7d62636996dc8e14f61f088d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 24 Nov 2021 17:47:51 -0600 Subject: [PATCH 6/7] Fix lint --- synapse/storage/databases/main/events.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 6c53350799b8..c564f3ec4e7c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1228,7 +1228,10 @@ def _update_room_depths_txn( # backfilled events because backfilled events have negative # stream_ordering and happened in the past so we know that we don't # need to update the stream_ordering tip/front for the room. - if event.internal_metadata.stream_ordering >= 0: + if ( + event.internal_metadata.stream_ordering + and event.internal_metadata.stream_ordering >= 0 + ): txn.call_after( self.store._events_stream_cache.entity_has_changed, event.room_id, From 53e5d0b314512d26360207ff548d2b4ecda839de Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 15:25:57 -0600 Subject: [PATCH 7/7] Assert to appease typing See https://github.com/matrix-org/synapse/pull/11417#discussion_r756840908 --- synapse/storage/databases/main/events.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c564f3ec4e7c..9065bca59c45 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1228,10 +1228,8 @@ def _update_room_depths_txn( # backfilled events because backfilled events have negative # stream_ordering and happened in the past so we know that we don't # need to update the stream_ordering tip/front for the room. 
- if ( - event.internal_metadata.stream_ordering - and event.internal_metadata.stream_ordering >= 0 - ): + assert event.internal_metadata.stream_ordering is not None + if event.internal_metadata.stream_ordering >= 0: txn.call_after( self.store._events_stream_cache.entity_has_changed, event.room_id,