diff --git a/changelog.d/13100.misc b/changelog.d/13100.misc new file mode 100644 index 000000000000..28f2fe0349a9 --- /dev/null +++ b/changelog.d/13100.misc @@ -0,0 +1 @@ +Faster room joins: Handle race between persisting an event and un-partial stating a room. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 3e1518f1f60e..5dfdc86740d1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -67,6 +67,7 @@ ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, ) +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.lock import Lock from synapse.types import JsonDict, StateMap, get_domain_from_id from synapse.util import json_decoder, unwrapFirstError @@ -882,9 +883,20 @@ async def _on_send_membership_event( logger.warning("%s", errmsg) raise SynapseError(403, errmsg, Codes.FORBIDDEN) - return await self._federation_event_handler.on_send_membership_event( - origin, event - ) + try: + return await self._federation_event_handler.on_send_membership_event( + origin, event + ) + except PartialStateConflictError: + # The room was un-partial stated while we were persisting the event. + # Try once more, with full state this time. + logger.info( + "Room %s was un-partial stated during `on_send_membership_event`, trying again.", + room_id, + ) + return await self._federation_event_handler.on_send_membership_event( + origin, event + ) async def on_event_auth( self, origin: str, room_id: str, event_id: str diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 34cc5ecd1144..3c44b4bf8608 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -45,6 +45,7 @@ FederationDeniedError, FederationError, HttpResponseException, + LimitExceededError, NotFoundError, RequestSendFailed, SynapseError, @@ -64,6 +65,7 @@ ReplicationCleanRoomRestServlet, ReplicationStoreRoomOnOutlierMembershipRestServlet, ) +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import JsonDict, StateMap, get_domain_from_id @@ -549,15 +551,29 @@ async def do_invite_join( # https://github.com/matrix-org/synapse/issues/12998 await self.store.store_partial_state_room(room_id, ret.servers_in_room) - max_stream_id = await self._federation_event_handler.process_remote_join( - origin, - room_id, - auth_chain, - state, - event, - room_version_obj, - partial_state=ret.partial_state, - ) + try: + max_stream_id = ( + await self._federation_event_handler.process_remote_join( + origin, + room_id, + auth_chain, + state, + event, + room_version_obj, + partial_state=ret.partial_state, + ) + ) + except PartialStateConflictError as e: + # The homeserver was already in the room and it is no longer partial + # stated. We ought to be doing a local join instead. Turn the error into + # a 429, as a hint to the client to try again. + # TODO(faster_joins): `_should_perform_remote_join` suggests that we may + # do a remote join for restricted rooms even if we have full state. + logger.error( + "Room %s was un-partial stated while processing remote join.", + room_id, + ) + raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) if ret.partial_state: # Kick off the process of asynchronously fetching the state for this @@ -1567,11 +1583,6 @@ async def _sync_partial_state_room( # we raced against more events arriving with partial state. Go round # the loop again. We've already logged a warning, so no need for more. - # TODO(faster_joins): there is still a race here, whereby incoming events which raced - # with us will fail to be persisted after the call to `clear_partial_state_room` due to - # having partial state. - # https://github.com/matrix-org/synapse/issues/12988 - # continue events = await self.store.get_events_as_list( diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 479d936dc00b..c74117c19aae 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -64,6 +64,7 @@ ReplicationFederationSendEventsRestServlet, ) from synapse.state import StateResolutionStore +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import ( @@ -275,7 +276,16 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None: affected=pdu.event_id, ) - await self._process_received_pdu(origin, pdu, state_ids=None) + try: + await self._process_received_pdu(origin, pdu, state_ids=None) + except PartialStateConflictError: + # The room was un-partial stated while we were processing the PDU. + # Try once more, with full state this time. + logger.info( + "Room %s was un-partial stated while processing the PDU, trying again.", + room_id, + ) + await self._process_received_pdu(origin, pdu, state_ids=None) async def on_send_membership_event( self, origin: str, event: EventBase @@ -306,6 +316,9 @@ async def on_send_membership_event( Raises: SynapseError if the event is not accepted into the room + PartialStateConflictError if the room was un-partial stated in between + computing the state at the event and persisting it. The caller should + retry exactly once in this case. """ logger.debug( "on_send_membership_event: Got event: %s, signatures: %s", @@ -423,6 +436,8 @@ async def process_remote_join( Raises: SynapseError if the response is in some way invalid. + PartialStateConflictError if the homeserver is already in the room and it + has been un-partial stated. """ create_event = None for e in state: @@ -1084,10 +1099,14 @@ async def _process_received_pdu( state_ids: Normally None, but if we are handling a gap in the graph (ie, we are missing one or more prev_events), the resolved state at the - event + event. Must not be partial state. backfilled: True if this is part of a historical batch of events (inhibits notification to clients, and validation of device keys.) + + PartialStateConflictError: if the room was un-partial stated in between + computing the state at the event and persisting it. The caller should retry + exactly once in this case. Will never be raised if `state_ids` is provided. """ logger.debug("Processing event: %s", event) assert not event.internal_metadata.outlier @@ -1933,6 +1952,9 @@ async def _run_push_actions_and_persist_event( event: The event itself. context: The event context. backfilled: True if the event was backfilled. + + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # this method should not be called on outliers (those code paths call # persist_events_and_notify directly.) @@ -1985,6 +2007,10 @@ async def persist_events_and_notify( Returns: The stream ID after which all events have been persisted. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ if not event_and_contexts: return self._store.get_room_max_stream_ordering() @@ -1993,14 +2019,19 @@ async def persist_events_and_notify( if instance != self._instance_name: # Limit the number of events sent over replication. We choose 200 # here as that is what we default to in `max_request_body_size(..)` - for batch in batch_iter(event_and_contexts, 200): - result = await self._send_events( - instance_name=instance, - store=self._store, - room_id=room_id, - event_and_contexts=batch, - backfilled=backfilled, - ) + try: + for batch in batch_iter(event_and_contexts, 200): + result = await self._send_events( + instance_name=instance, + store=self._store, + room_id=room_id, + event_and_contexts=batch, + backfilled=backfilled, + ) + except SynapseError as e: + if e.code == HTTPStatus.CONFLICT: + raise PartialStateConflictError() + raise return result["max_stream_id"] else: assert self._storage_controllers.persistence diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c6b40a5b7a4d..1980e37daeca 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -37,6 +37,7 @@ AuthError, Codes, ConsentNotGivenError, + LimitExceededError, NotFoundError, ShadowBanError, SynapseError, @@ -53,6 +54,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import ( @@ -1250,6 +1252,8 @@ async def handle_new_client_event( Raises: ShadowBanError if the requester has been shadow-banned. + SynapseError(503) if attempting to persist a partial state event in + a room that has been un-partial stated. """ extra_users = extra_users or [] @@ -1300,24 +1304,35 @@ async def handle_new_client_event( # We now persist the event (and update the cache in parallel, since we # don't want to block on it). - result, _ = await make_deferred_yieldable( - gather_results( - ( - run_in_background( - self._persist_event, - requester=requester, - event=event, - context=context, - ratelimit=ratelimit, - extra_users=extra_users, + try: + result, _ = await make_deferred_yieldable( + gather_results( + ( + run_in_background( + self._persist_event, + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + extra_users=extra_users, + ), + run_in_background( + self.cache_joined_hosts_for_event, event, context + ).addErrback( + log_failure, "cache_joined_hosts_for_event failed" + ), ), - run_in_background( - self.cache_joined_hosts_for_event, event, context - ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), - ), - consumeErrors=True, + consumeErrors=True, + ) + ).addErrback(unwrapFirstError) + except PartialStateConflictError as e: + # The event context needs to be recomputed. + # Turn the error into a 429, as a hint to the client to try again. + logger.info( + "Room %s was un-partial stated while persisting client event.", + event.room_id, ) - ).addErrback(unwrapFirstError) + raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) return result @@ -1332,6 +1347,9 @@ async def _persist_event( """Actually persists the event. Should only be called by `handle_new_client_event`, and see its docstring for documentation of the arguments. + + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # Skip push notification actions for historical messages @@ -1348,16 +1366,21 @@ async def _persist_event( # If we're a worker we need to hit out to the master. writer_instance = self._events_shard_config.get_instance(event.room_id) if writer_instance != self._instance_name: - result = await self.send_event( - instance_name=writer_instance, - event_id=event.event_id, - store=self.store, - requester=requester, - event=event, - context=context, - ratelimit=ratelimit, - extra_users=extra_users, - ) + try: + result = await self.send_event( + instance_name=writer_instance, + event_id=event.event_id, + store=self.store, + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + extra_users=extra_users, + ) + except SynapseError as e: + if e.code == HTTPStatus.CONFLICT: + raise PartialStateConflictError() + raise stream_id = result["stream_id"] event_id = result["event_id"] if event_id != event.event_id: @@ -1485,6 +1508,10 @@ async def persist_and_notify_client_event( The persisted event. This may be different than the given event if it was de-duplicated (e.g. because we had already persisted an event with the same transaction ID.) + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ extra_users = extra_users or [] diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index eed29cd59739..d3abafed2871 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -60,6 +60,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): { "max_stream_id": 32443, } + + Responds with a 409 when a `PartialStateConflictError` is raised due to an event + context that needs to be recomputed due to the un-partial stating of a room. """ NAME = "fed_send_events" diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index c2b2588ea548..486f04723c89 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -59,6 +59,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): { "stream_id": 12345, "event_id": "$abcdef..." } + Responds with a 409 when a `PartialStateConflictError` is raised due to an event + context that needs to be recomputed due to the un-partial stating of a room. + The returned event ID may not match the sent event if it was deduplicated. """ diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 4bcb99d06eae..c248fccc8163 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -315,6 +315,10 @@ async def persist_events( if they were deduplicated due to an event already existing that matched the transaction ID; the existing event is returned in such a case. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: @@ -363,6 +367,10 @@ async def persist_event( latest persisted event. The returned event may not match the given event if it was deduplicated due to an existing event matching the transaction ID. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # add_to_queue returns a map from event ID to existing event ID if the # event was deduplicated. (The dict may also include other entries if @@ -453,6 +461,10 @@ async def _persist_event_batch( Returns: A dictionary of event ID to event ID we didn't persist as we already had another event persisted with the same TXN ID. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ replaced_events: Dict[str, str] = {} if not events_and_contexts: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a3e12f1e9be4..8a0e4e9589ad 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -16,6 +16,7 @@ import itertools import logging from collections import OrderedDict +from http import HTTPStatus from typing import ( TYPE_CHECKING, Any, @@ -35,6 +36,7 @@ import synapse.metrics from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext @@ -69,6 +71,24 @@ ) +class PartialStateConflictError(SynapseError): + """An internal error raised when attempting to persist an event with partial state + after the room containing the event has been un-partial stated. + + This error should be handled by recomputing the event context and trying again. + + This error has an HTTP status code so that it can be transported over replication. + It should not be exposed to clients. + """ + + def __init__(self) -> None: + super().__init__( + HTTPStatus.CONFLICT, + msg="Cannot persist partial state event in un-partial stated room", + errcode=Codes.UNKNOWN, + ) + + @attr.s(slots=True, auto_attribs=True) class DeltaState: """Deltas to use to update the `current_state_events` table. @@ -154,6 +174,10 @@ async def _persist_events_and_state_updates( Returns: Resolves when the events have been persisted + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # We want to calculate the stream orderings as late as possible, as @@ -354,6 +378,9 @@ def _persist_events_txn( For each room, a list of the event ids which are the forward extremities. + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ state_delta_for_room = state_delta_for_room or {} new_forward_extremities = new_forward_extremities or {} @@ -1304,6 +1331,10 @@ def _update_outliers_txn( Returns: new list, without events which are already in the events table. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ txn.execute( "SELECT event_id, outlier FROM events WHERE event_id in (%s)" @@ -2215,6 +2246,11 @@ def _store_event_state_mappings_txn( txn: LoggingTransaction, events_and_contexts: Collection[Tuple[EventBase, EventContext]], ) -> None: + """ + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. + """ state_groups = {} for event, context in events_and_contexts: if event.internal_metadata.is_outlier(): @@ -2239,19 +2275,37 @@ def _store_event_state_mappings_txn( # if we have partial state for these events, record the fact. (This happens # here rather than in _store_event_txn because it also needs to happen when # we de-outlier an event.) - self.db_pool.simple_insert_many_txn( - txn, - table="partial_state_events", - keys=("room_id", "event_id"), - values=[ - ( - event.room_id, - event.event_id, - ) - for event, ctx in events_and_contexts - if ctx.partial_state - ], - ) + try: + self.db_pool.simple_insert_many_txn( + txn, + table="partial_state_events", + keys=("room_id", "event_id"), + values=[ + ( + event.room_id, + event.event_id, + ) + for event, ctx in events_and_contexts + if ctx.partial_state + ], + ) + except self.db_pool.engine.module.IntegrityError: + logger.info( + "Cannot persist events %s in rooms %s: room has been un-partial stated", + [ + event.event_id + for event, ctx in events_and_contexts + if ctx.partial_state + ], + list( + { + event.room_id + for event, ctx in events_and_contexts + if ctx.partial_state + } + ), + ) + raise PartialStateConflictError() self.db_pool.simple_upsert_many_txn( txn, diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index d8026e3fac7f..13d6a1d5c0d9 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1156,19 +1156,25 @@ async def get_partial_state_rooms_and_servers( return room_servers async def clear_partial_state_room(self, room_id: str) -> bool: - # this can race with incoming events, so we watch out for FK errors. - # TODO(faster_joins): this still doesn't completely fix the race, since the persist process - # is not atomic. I fear we need an application-level lock. - # https://github.com/matrix-org/synapse/issues/12988 + """Clears the partial state flag for a room. + + Args: + room_id: The room whose partial state flag is to be cleared. + + Returns: + `True` if the partial state flag has been cleared successfully. + + `False` if the partial state flag could not be cleared because the room + still contains events with partial state. + """ try: await self.db_pool.runInteraction( "clear_partial_state_room", self._clear_partial_state_room_txn, room_id ) return True - except self.db_pool.engine.module.DatabaseError as e: - # TODO(faster_joins): how do we distinguish between FK errors and other errors? - # https://github.com/matrix-org/synapse/issues/12988 - logger.warning( + except self.db_pool.engine.module.IntegrityError as e: + # Assume that any `IntegrityError`s are due to partial state events. + logger.info( "Exception while clearing lazy partial-state-room %s, retrying: %s", room_id, e,