Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Avoid blocking lazy-loading /syncs during partial joins #13477

Merged
merged 12 commits into from
Aug 18, 2022
Merged
1 change: 1 addition & 0 deletions changelog.d/13477.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster room joins: Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state.
206 changes: 175 additions & 31 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,17 @@ async def _load_filtered_recents(
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `recents`, so partial state is only a problem when a membership
# event turns up in `recents` but has not made it into the current
# state.
current_state_ids_map = (
await self._state_storage_controller.get_current_state_ids(
room_id
)
await self.store.get_partial_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())

Expand Down Expand Up @@ -589,7 +596,13 @@ async def _load_filtered_recents(
if any(e.is_state() for e in loaded_recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Is this the correct way of doing it?
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `loaded_recents`, so partial state is only a problem when a
# membership event turns up in `loaded_recents` but has not made it
# into the current state.
current_state_ids_map = (
await self.store.get_partial_current_state_ids(room_id)
)
Expand Down Expand Up @@ -637,17 +650,25 @@ async def _load_filtered_recents(
)

async def get_state_after_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: Optional[bool] = None,
) -> StateMap[str]:
"""
Get the room state after the given event

Args:
event_id: event of interest
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event. Defaults to `True` unless `state_filter` can be completely
satisfied with partial state.
"""
state_ids = await self._state_storage_controller.get_state_ids_for_event(
event_id, state_filter=state_filter or StateFilter.all()
event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)

# using get_metadata_for_events here (instead of get_event) sidesteps an issue
Expand All @@ -670,13 +691,18 @@ async def get_state_at(
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
await_full_state: Optional[bool] = None,
) -> StateMap[str]:
"""Get the room state at a particular stream position

Args:
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the last event in the room before `stream_position`. Defaults to
`True` unless `state_filter` can be completely satisfied with partial
state.
"""
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
Expand All @@ -688,7 +714,9 @@ async def get_state_at(

if last_event_id:
state = await self.get_state_after_event(
last_event_id, state_filter=state_filter or StateFilter.all()
last_event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)

else:
Expand Down Expand Up @@ -890,8 +918,14 @@ async def compute_state_delta(

with Measure(self.clock, "compute_state_delta"):
# The memberships needed for events in the timeline.
# A dictionary with user IDs as keys and the first event in the timeline
# requiring each member as values.
# Only calculated when `lazy_load_members` is on.
members_to_fetch = None
members_to_fetch: Optional[Dict[str, Optional[EventBase]]] = None

# The contribution to the room state from state events in the timeline.
# Only contains the last event for any given state key.
timeline_state: StateMap[str]

lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
Expand All @@ -902,29 +936,38 @@ async def compute_state_delta(
# We only request state for the members needed to display the
# timeline:

members_to_fetch = {
event.sender # FIXME: we also care about invite targets etc.
for event in batch.events
}
timeline_state = {}

members_to_fetch = {}
for event in batch.events:
# We need the event's sender, unless their membership was in a
# previous timeline event.
if (
EventTypes.Member,
event.sender,
) not in timeline_state and event.sender not in members_to_fetch:
members_to_fetch[event.sender] = event
# FIXME: we also care about invite targets etc.

if event.is_state():
timeline_state[(event.type, event.state_key)] = event.event_id
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change calculates members_to_fetch more accurately, by excluding event senders whose membership appears previously in the timeline.


if full_state:
# always make sure we LL ourselves so we know we're in the room
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
# We only need apply this on full state syncs given we disabled
# LL for incr syncs in #3840.
members_to_fetch.add(sync_config.user.to_string())
members_to_fetch[sync_config.user.to_string()] = None

state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
else:
state_filter = StateFilter.all()
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events
if event.is_state()
}

# The contribution to the room state from state events in the timeline.
# Only contains the last event for any given state key.
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events
if event.is_state()
}
state_filter = StateFilter.all()

# Now calculate the state to return in the sync response for the room.
# This is more or less the change in state between the end of the previous
Expand All @@ -936,19 +979,26 @@ async def compute_state_delta(
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=not lazy_load_members,
richvdh marked this conversation as resolved.
Show resolved Hide resolved
)
)

state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
)

else:
state_at_timeline_end = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)

state_at_timeline_start = state_at_timeline_end
Expand All @@ -964,14 +1014,19 @@ async def compute_state_delta(
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)

# for now, we disable LL for gappy syncs - see
Expand All @@ -993,20 +1048,28 @@ async def compute_state_delta(
# is indeed the case.
assert since_token is not None
state_at_previous_sync = await self.get_state_at(
room_id, stream_position=since_token, state_filter=state_filter
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)

if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_end = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)

state_ids = _calculate_state(
Expand Down Expand Up @@ -1036,6 +1099,77 @@ async def compute_state_delta(
(EventTypes.Member, member)
for member in members_to_fetch
),
await_full_state=False,
)

# If we only have partial state for the room, `state_ids` may be missing the
# memberships we wanted. We attempt to find some by digging through the auth
# events of timeline events.
if lazy_load_members:
assert members_to_fetch is not None

is_partial_state = await self.store.is_partial_state_room(room_id)
if is_partial_state:
additional_state_ids: MutableStateMap[str] = {}

# Tracks the missing members for logging purposes.
missing_members = {}

# Pick out the auth events of timeline events whose sender
# memberships are missing.
auth_event_ids: Set[str] = set()
for member, first_referencing_event in members_to_fetch.items():
if (
first_referencing_event is None
richvdh marked this conversation as resolved.
Show resolved Hide resolved
or (EventTypes.Member, member) in state_ids
):
continue

missing_members[member] = first_referencing_event
auth_event_ids.update(first_referencing_event.auth_event_ids())

auth_events = await self.store.get_events(auth_event_ids)

# Run through the events with missing sender memberships once more,
# picking out the memberships from the pile of auth events we have
# just fetched.
for member, first_referencing_event in members_to_fetch.items():
if (
first_referencing_event is None
or (EventTypes.Member, member) in state_ids
):
continue

# Dig through the auth events to find the sender's membership.
for auth_event_id in first_referencing_event.auth_event_ids():
# We only store events once we have all their auth events,
# so the auth event must be in the pile we have just
# fetched.
auth_event = auth_events[auth_event_id]

if (
auth_event.type == EventTypes.Member
and auth_event.state_key == event.sender
):
missing_members.pop(member)
additional_state_ids[
(EventTypes.Member, event.sender)
] = auth_event.event_id
break

# Now merge in the state we have scrounged up.
state_ids = {**state_ids, **additional_state_ids}

if missing_members:
# There really shouldn't be any missing memberships now.
# For an event to appear in the timeline, we must have its auth
# events, which must include its sender's membership.
logger.error(
"Failed to find memberships for %s in partial state room "
"%s in the auth events of %s.",
list(missing_members.keys()),
room_id,
list(missing_members.values()),
)
Copy link
Contributor Author

@squahtx squahtx Aug 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all three branches (initial, gappy and incremental sync) above have been modified to return partial state, the property below (

# At this point, if `lazy_load_members` is enabled, `state_ids` includes
# the memberships of all event senders in the timeline

) no longer holds. We try to find membership events from auth events to restore the property.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this error to be accurate, we need to change the calculation of members_to_fetch to be more accurate (done above).


# At this point, if `lazy_load_members` is enabled, `state_ids` includes
Expand Down Expand Up @@ -1730,7 +1864,11 @@ async def _get_rooms_changed(
continue

if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
Comment on lines +1916 to 1921
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only ever pull this one local membership out of the state, so the state filter is okay to add. The same applies to the next instance below.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like a valuable optimisation irrespective of the faster-joins case

old_mem_ev = None
if old_mem_ev_id:
Expand All @@ -1756,7 +1894,13 @@ async def _get_rooms_changed(
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types(
[(EventTypes.Member, user_id)]
),
)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
)
Expand Down
Loading