Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Handle race between persisting an event and un-partial stating a room #13100

Merged
merged 18 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13100.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster room joins: Handle race between persisting an event and un-partial stating a room.
18 changes: 15 additions & 3 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.lock import Lock
from synapse.types import JsonDict, StateMap, get_domain_from_id
from synapse.util import json_decoder, unwrapFirstError
Expand Down Expand Up @@ -882,9 +883,20 @@ async def _on_send_membership_event(
logger.warning("%s", errmsg)
raise SynapseError(403, errmsg, Codes.FORBIDDEN)

return await self._federation_event_handler.on_send_membership_event(
origin, event
)
try:
return await self._federation_event_handler.on_send_membership_event(
origin, event
)
except PartialStateConflictError:
# The room was un-partial stated while we were persisting the event.
# Try once more, with full state this time.
logger.info(
"Room %s was un-partial stated during `on_send_membership_event`, trying again.",
room_id,
)
return await self._federation_event_handler.on_send_membership_event(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so to clarify, there should only ever be one retry needed because once it's un-partial-stated, it can't conflict anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, a room can only be un-partial-stated once.
Unless we leave it or purge it, but I don't know what happens in that case, even in the absence of faster room joins.

origin, event
)

async def on_event_auth(
self, origin: str, room_id: str, event_id: str
Expand Down
39 changes: 25 additions & 14 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
FederationDeniedError,
FederationError,
HttpResponseException,
LimitExceededError,
NotFoundError,
RequestSendFailed,
SynapseError,
Expand All @@ -64,6 +65,7 @@
ReplicationCleanRoomRestServlet,
ReplicationStoreRoomOnOutlierMembershipRestServlet,
)
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import JsonDict, StateMap, get_domain_from_id
Expand Down Expand Up @@ -549,15 +551,29 @@ async def do_invite_join(
# https://github.com/matrix-org/synapse/issues/12998
await self.store.store_partial_state_room(room_id, ret.servers_in_room)

max_stream_id = await self._federation_event_handler.process_remote_join(
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
)
try:
max_stream_id = (
await self._federation_event_handler.process_remote_join(
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
)
)
except PartialStateConflictError as e:
# The homeserver was already in the room and it is no longer partial
# stated. We ought to be doing a local join instead. Turn the error into
# a 429, as a hint to the client to try again.
# TODO(faster_joins): `_should_perform_remote_join` suggests that we may
# do a remote join for restricted rooms even if we have full state.
logger.error(
"Room %s was un-partial stated while processing remote join.",
room_id,
)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we consider opening an issue as to talk about whether we want to make this better?
I see what you're saying but it still feels to me that this is worth thinking about (but I don't want to block this PR on that).
(For sending messages, some clients seem to prompt you to retry sending the message if it fails, I'm not sure about the exact circumstances but leaving it like this means we'll want to check that, so perhaps defer it to an issue regardless)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fair. I've filed it as #13173.


if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
Expand Down Expand Up @@ -1567,11 +1583,6 @@ async def _sync_partial_state_room(

# we raced against more events arriving with partial state. Go round
# the loop again. We've already logged a warning, so no need for more.
# TODO(faster_joins): there is still a race here, whereby incoming events which raced
# with us will fail to be persisted after the call to `clear_partial_state_room` due to
# having partial state.
# https://github.com/matrix-org/synapse/issues/12988
#
continue

events = await self.store.get_events_as_list(
Expand Down
51 changes: 41 additions & 10 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
ReplicationFederationSendEventsRestServlet,
)
from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
Expand Down Expand Up @@ -275,7 +276,16 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
affected=pdu.event_id,
)

await self._process_received_pdu(origin, pdu, state_ids=None)
try:
await self._process_received_pdu(origin, pdu, state_ids=None)
except PartialStateConflictError:
# The room was un-partial stated while we were processing the PDU.
# Try once more, with full state this time.
logger.info(
"Room %s was un-partial stated while processing the PDU, trying again.",
room_id,
)
await self._process_received_pdu(origin, pdu, state_ids=None)

async def on_send_membership_event(
self, origin: str, event: EventBase
Expand Down Expand Up @@ -306,6 +316,9 @@ async def on_send_membership_event(

Raises:
SynapseError if the event is not accepted into the room
PartialStateConflictError if the room was un-partial stated in between
computing the state at the event and persisting it. The caller should
retry exactly once in this case.
"""
logger.debug(
"on_send_membership_event: Got event: %s, signatures: %s",
Expand Down Expand Up @@ -423,6 +436,8 @@ async def process_remote_join(

Raises:
SynapseError if the response is in some way invalid.
PartialStateConflictError if the homeserver is already in the room and it
has been un-partial stated.
"""
create_event = None
for e in state:
Expand Down Expand Up @@ -1084,10 +1099,14 @@ async def _process_received_pdu(

state_ids: Normally None, but if we are handling a gap in the graph
(ie, we are missing one or more prev_events), the resolved state at the
event
event. Must not be partial state.

backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)

PartialStateConflictError: if the room was un-partial stated in between
computing the state at the event and persisting it. The caller should retry
exactly once in this case. Will never be raised if `state_ids` is provided.
"""
logger.debug("Processing event: %s", event)
assert not event.internal_metadata.outlier
Expand Down Expand Up @@ -1934,6 +1953,9 @@ async def _run_push_actions_and_persist_event(
event: The event itself.
context: The event context.
backfilled: True if the event was backfilled.

PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
# this method should not be called on outliers (those code paths call
# persist_events_and_notify directly.)
Expand Down Expand Up @@ -1986,6 +2008,10 @@ async def persist_events_and_notify(

Returns:
The stream ID after which all events have been persisted.

Raises:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
if not event_and_contexts:
return self._store.get_room_max_stream_ordering()
Expand All @@ -1994,14 +2020,19 @@ async def persist_events_and_notify(
if instance != self._instance_name:
# Limit the number of events sent over replication. We choose 200
# here as that is what we default to in `max_request_body_size(..)`
for batch in batch_iter(event_and_contexts, 200):
result = await self._send_events(
instance_name=instance,
store=self._store,
room_id=room_id,
event_and_contexts=batch,
backfilled=backfilled,
)
try:
for batch in batch_iter(event_and_contexts, 200):
result = await self._send_events(
instance_name=instance,
store=self._store,
room_id=room_id,
event_and_contexts=batch,
backfilled=backfilled,
)
except SynapseError as e:
if e.code == HTTPStatus.CONFLICT:
raise PartialStateConflictError()
raise
return result["max_stream_id"]
else:
assert self._storage_controllers.persistence
Expand Down
79 changes: 53 additions & 26 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
AuthError,
Codes,
ConsentNotGivenError,
LimitExceededError,
NotFoundError,
ShadowBanError,
SynapseError,
Expand All @@ -53,6 +54,7 @@
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
Expand Down Expand Up @@ -1247,6 +1249,8 @@ async def handle_new_client_event(

Raises:
ShadowBanError if the requester has been shadow-banned.
SynapseError(503) if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
extra_users = extra_users or []

Expand Down Expand Up @@ -1297,24 +1301,35 @@ async def handle_new_client_event(

# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
result, _ = await make_deferred_yieldable(
gather_results(
(
run_in_background(
self._persist_event,
requester=requester,
event=event,
context=context,
ratelimit=ratelimit,
extra_users=extra_users,
try:
result, _ = await make_deferred_yieldable(
gather_results(
(
run_in_background(
self._persist_event,
requester=requester,
event=event,
context=context,
ratelimit=ratelimit,
extra_users=extra_users,
),
run_in_background(
self.cache_joined_hosts_for_event, event, context
).addErrback(
log_failure, "cache_joined_hosts_for_event failed"
),
),
run_in_background(
self.cache_joined_hosts_for_event, event, context
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
),
consumeErrors=True,
consumeErrors=True,
)
).addErrback(unwrapFirstError)
except PartialStateConflictError as e:
# The event context needs to be recomputed.
# Turn the error into a 429, as a hint to the client to try again.
logger.info(
"Room %s was un-partial stated while persisting client event.",
event.room_id,
)
).addErrback(unwrapFirstError)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)

return result

Expand All @@ -1329,6 +1344,9 @@ async def _persist_event(
"""Actually persists the event. Should only be called by
`handle_new_client_event`, and see its docstring for documentation of
the arguments.

PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""

# Skip push notification actions for historical messages
Expand All @@ -1345,16 +1363,21 @@ async def _persist_event(
# If we're a worker we need to hit out to the master.
writer_instance = self._events_shard_config.get_instance(event.room_id)
if writer_instance != self._instance_name:
result = await self.send_event(
instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
event=event,
context=context,
ratelimit=ratelimit,
extra_users=extra_users,
)
try:
result = await self.send_event(
instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
event=event,
context=context,
ratelimit=ratelimit,
extra_users=extra_users,
)
except SynapseError as e:
if e.code == HTTPStatus.CONFLICT:
raise PartialStateConflictError()
raise
Comment on lines +1380 to +1383
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way we transport the PartialStateConflictError across replication is pretty ugly. I'm open to alternative suggestions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it feels ugly but it also is straight forward so it has that going for it. I think it's fine and we can always change it later

stream_id = result["stream_id"]
event_id = result["event_id"]
if event_id != event.event_id:
Expand Down Expand Up @@ -1482,6 +1505,10 @@ async def persist_and_notify_client_event(
The persisted event. This may be different than the given event if
it was de-duplicated (e.g. because we had already persisted an
event with the same transaction ID.)

Raises:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
extra_users = extra_users or []

Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
{
"max_stream_id": 32443,
}

Responds with a 409 when a `PartialStateConflictError` is raised due to an event
context that needs to be recomputed due to the un-partial stating of a room.
"""

NAME = "fed_send_events"
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):

{ "stream_id": 12345, "event_id": "$abcdef..." }

Responds with a 409 when a `PartialStateConflictError` is raised due to an event
context that needs to be recomputed due to the un-partial stating of a room.

The returned event ID may not match the sent event if it was deduplicated.
"""

Expand Down
12 changes: 12 additions & 0 deletions synapse/storage/controllers/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ async def persist_events(
if they were deduplicated due to an event already existing that
matched the transaction ID; the existing event is returned in such
a case.

Raises:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
for event, ctx in events_and_contexts:
Expand Down Expand Up @@ -363,6 +367,10 @@ async def persist_event(
latest persisted event. The returned event may not match the given
event if it was deduplicated due to an existing event matching the
transaction ID.

Raises:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
# add_to_queue returns a map from event ID to existing event ID if the
# event was deduplicated. (The dict may also include other entries if
Expand Down Expand Up @@ -453,6 +461,10 @@ async def _persist_event_batch(
Returns:
A dictionary of event ID to event ID we didn't persist as we already
had another event persisted with the same TXN ID.

Raises:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
replaced_events: Dict[str, str] = {}
if not events_and_contexts:
Expand Down
Loading