Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Wait for lazy join to complete when getting current state (#12872)
Browse files: browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Jun 1, 2022
1 parent: 782cb74 · commit: 888a29f
Show file tree
Hide file tree
Showing 33 changed files with 361 additions and 82 deletions.
1 change: 1 addition & 0 deletions changelog.d/12872.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster room joins: when querying the current state of the room, wait for state to be populated.
3 changes: 2 additions & 1 deletion synapse/events/third_party_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def __init__(self, hs: "HomeServer"):
self.third_party_rules = None

self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()

self._check_event_allowed_callbacks: List[CHECK_EVENT_ALLOWED_CALLBACK] = []
self._on_create_room_callbacks: List[ON_CREATE_ROOM_CALLBACK] = []
Expand Down Expand Up @@ -463,7 +464,7 @@ async def _get_state_map_for_room(self, room_id: str) -> StateMap[EventBase]:
Returns:
A dict mapping (event type, state key) to state event.
"""
state_ids = await self.store.get_filtered_current_state_ids(room_id)
state_ids = await self._storage_controllers.state.get_current_state_ids(room_id)
room_state_events = await self.store.get_events(state_ids.values())

state_events = {}
Expand Down
4 changes: 3 additions & 1 deletion synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ def __init__(self, hs: "HomeServer"):
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()

self._state_storage_controller = hs.get_storage_controllers().state

self.device_handler = hs.get_device_handler()

# Ensure the following handlers are loaded since they register callbacks
Expand Down Expand Up @@ -1221,7 +1223,7 @@ async def check_server_matches_acl(self, server_name: str, room_id: str) -> None
Raises:
AuthError if the server does not match the ACL
"""
state_ids = await self.store.get_current_state_ids(room_id)
state_ids = await self._state_storage_controller.get_current_state_ids(room_id)
acl_event_id = state_ids.get((EventTypes.ServerACL, ""))

if not acl_event_id:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async def get_user_ids_changed(
possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
current_state_ids = await self.store.get_current_state_ids(room_id)
current_state_ids = await self._state_storage.get_current_state_ids(room_id)

# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
Expand Down
7 changes: 6 additions & 1 deletion synapse/handlers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, hs: "HomeServer"):
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.config = hs.config
self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
self.require_membership = hs.config.server.require_membership_for_aliases
Expand Down Expand Up @@ -463,7 +464,11 @@ async def edit_published_room_list(
making_public = visibility == "public"
if making_public:
room_aliases = await self.store.get_aliases_for_room(room_id)
canonical_alias = await self.store.get_canonical_alias_for_room(room_id)
canonical_alias = (
await self._storage_controllers.state.get_canonical_alias_for_room(
room_id
)
)
if canonical_alias:
room_aliases.append(canonical_alias)

Expand Down
7 changes: 6 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,9 @@ async def on_make_join_request(
# Note that this requires the /send_join request to come back to the
# same server.
if room_version.msc3083_join_rules:
state_ids = await self.store.get_current_state_ids(room_id)
state_ids = await self._state_storage_controller.get_current_state_ids(
room_id
)
if await self._event_auth_handler.has_restricted_join_rules(
state_ids, room_version
):
Expand Down Expand Up @@ -1552,6 +1554,9 @@ async def _sync_partial_state_room(
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)

# TODO(faster_joins) update room stats and user directory?
return
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async def get_state_events(
)

if membership == Membership.JOIN:
state_ids = await self.store.get_filtered_current_state_ids(
state_ids = await self._state_storage_controller.get_current_state_ids(
room_id, state_filter=state_filter
)
room_state = await self.store.get_events(state_ids.values())
Expand Down
6 changes: 5 additions & 1 deletion synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class BasePresenceHandler(abc.ABC):
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.presence_router = hs.get_presence_router()
self.state = hs.get_state_handler()
self.is_mine_id = hs.is_mine_id
Expand Down Expand Up @@ -1348,7 +1349,10 @@ async def _unsafe_process(self) -> None:
self._event_pos,
room_max_stream_ordering,
)
max_pos, deltas = await self.store.get_current_state_deltas(
(
max_pos,
deltas,
) = await self._storage_controllers.state.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)

Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class LoginDict(TypedDict):
class RegistrationHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self.hs = hs
self.auth = hs.get_auth()
Expand Down Expand Up @@ -528,7 +529,7 @@ async def _join_rooms(self, user_id: str) -> None:

if requires_invite:
# If the server is in the room, check if the room is public.
state = await self.store.get_filtered_current_state_ids(
state = await self._storage_controllers.state.get_current_state_ids(
room_id, StateFilter.from_types([(EventTypes.JoinRules, "")])
)

Expand Down
13 changes: 9 additions & 4 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class EventContext:
class RoomCreationHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.hs = hs
Expand Down Expand Up @@ -480,8 +481,10 @@ async def clone_existing_room(
if room_type == RoomTypes.SPACE:
types_to_copy.append((EventTypes.SpaceChild, None))

old_room_state_ids = await self.store.get_filtered_current_state_ids(
old_room_id, StateFilter.from_types(types_to_copy)
old_room_state_ids = (
await self._storage_controllers.state.get_current_state_ids(
old_room_id, StateFilter.from_types(types_to_copy)
)
)
# map from event_id to BaseEvent
old_room_state_events = await self.store.get_events(old_room_state_ids.values())
Expand Down Expand Up @@ -558,8 +561,10 @@ async def clone_existing_room(
)

# Transfer membership events
old_room_member_state_ids = await self.store.get_filtered_current_state_ids(
old_room_id, StateFilter.from_types([(EventTypes.Member, None)])
old_room_member_state_ids = (
await self._storage_controllers.state.get_current_state_ids(
old_room_id, StateFilter.from_types([(EventTypes.Member, None)])
)
)

# map from event_id to BaseEvent
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/room_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
class RoomListHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.hs = hs
self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
self.response_cache: ResponseCache[
Expand Down Expand Up @@ -274,7 +275,7 @@ async def generate_room_entry(
if aliases:
result["aliases"] = aliases

current_state_ids = await self.store.get_current_state_ids(
current_state_ids = await self._storage_controllers.state.get_current_state_ids(
room_id, on_invalidate=cache_context.invalidate
)

Expand Down
5 changes: 4 additions & 1 deletion synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.auth = hs.get_auth()
self.state_handler = hs.get_state_handler()
self.config = hs.config
Expand Down Expand Up @@ -994,7 +995,9 @@ async def _should_perform_remote_join(
# If the host is in the room, but not one of the authorised hosts
# for restricted join rules, a remote join must be used.
room_version = await self.store.get_room_version(room_id)
current_state_ids = await self.store.get_current_state_ids(room_id)
current_state_ids = await self._storage_controllers.state.get_current_state_ids(
room_id
)

# If restricted join rules are not being used, a local join can always
# be used.
Expand Down
11 changes: 8 additions & 3 deletions synapse/handlers/room_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class RoomSummaryHandler:
def __init__(self, hs: "HomeServer"):
self._event_auth_handler = hs.get_event_auth_handler()
self._store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._event_serializer = hs.get_event_client_serializer()
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()
Expand Down Expand Up @@ -537,7 +538,7 @@ async def _is_local_room_accessible(
Returns:
True if the room is accessible to the requesting user or server.
"""
state_ids = await self._store.get_current_state_ids(room_id)
state_ids = await self._storage_controllers.state.get_current_state_ids(room_id)

# If there's no state for the room, it isn't known.
if not state_ids:
Expand Down Expand Up @@ -702,7 +703,9 @@ async def _build_room_entry(self, room_id: str, for_federation: bool) -> JsonDic
# there should always be an entry
assert stats is not None, "unable to retrieve stats for %s" % (room_id,)

current_state_ids = await self._store.get_current_state_ids(room_id)
current_state_ids = await self._storage_controllers.state.get_current_state_ids(
room_id
)
create_event = await self._store.get_event(
current_state_ids[(EventTypes.Create, "")]
)
Expand Down Expand Up @@ -760,7 +763,9 @@ async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
"""

# look for child rooms/spaces.
current_state_ids = await self._store.get_current_state_ids(room_id)
current_state_ids = await self._storage_controllers.state.get_current_state_ids(
room_id
)

events = await self._store.get_events_as_list(
[
Expand Down
6 changes: 5 additions & 1 deletion synapse/handlers/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class StatsHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
self.clock = hs.get_clock()
Expand Down Expand Up @@ -105,7 +106,10 @@ async def _unsafe_process(self) -> None:
logger.debug(
"Processing room stats %s->%s", self.pos, room_max_stream_ordering
)
max_pos, deltas = await self.store.get_current_state_deltas(
(
max_pos,
deltas,
) = await self._storage_controllers.state.get_current_state_deltas(
self.pos, room_max_stream_ordering
)

Expand Down
13 changes: 9 additions & 4 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,10 @@ async def _load_filtered_recents(
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
current_state_ids_map = await self.store.get_current_state_ids(
room_id
current_state_ids_map = (
await self._state_storage_controller.get_current_state_ids(
room_id
)
)
current_state_ids = frozenset(current_state_ids_map.values())

Expand Down Expand Up @@ -574,8 +576,11 @@ async def _load_filtered_recents(
# ensure that we always include current state in the timeline
current_state_ids = frozenset()
if any(e.is_state() for e in loaded_recents):
current_state_ids_map = await self.store.get_current_state_ids(
room_id
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Is this the correct way of doing it?
current_state_ids_map = (
await self.store.get_partial_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())

Expand Down
6 changes: 5 additions & 1 deletion synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
Expand Down Expand Up @@ -174,7 +175,10 @@ async def _unsafe_process(self) -> None:
logger.debug(
"Processing user stats %s->%s", self.pos, room_max_stream_ordering
)
max_pos, deltas = await self.store.get_current_state_deltas(
(
max_pos,
deltas,
) = await self._storage_controllers.state.get_current_state_deltas(
self.pos, room_max_stream_ordering
)

Expand Down
19 changes: 8 additions & 11 deletions synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def __init__(self, hs: "HomeServer", auth_handler: AuthHandler) -> None:
self._store: Union[
DataStore, "GenericWorkerSlavedStore"
] = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._auth = hs.get_auth()
self._auth_handler = auth_handler
self._server_name = hs.hostname
Expand Down Expand Up @@ -911,7 +912,7 @@ def get_state_events_in_room(
The filtered state events in the room.
"""
state_ids = yield defer.ensureDeferred(
self._store.get_filtered_current_state_ids(
self._storage_controllers.state.get_current_state_ids(
room_id=room_id, state_filter=StateFilter.from_types(types)
)
)
Expand Down Expand Up @@ -1289,20 +1290,16 @@ async def get_room_state(
# regardless of their state key
]
"""
state_filter = None
if event_filter:
# If a filter was provided, turn it into a StateFilter and retrieve a filtered
# view of the state.
state_filter = StateFilter.from_types(event_filter)
state_ids = await self._store.get_filtered_current_state_ids(
room_id,
state_filter,
)
else:
# If no filter was provided, get the whole state. We could also reuse the call
# to get_filtered_current_state_ids above, with `state_filter = StateFilter.all()`,
# but get_filtered_current_state_ids isn't cached and `get_current_state_ids`
# is, so using the latter when we can is better for perf.
state_ids = await self._store.get_current_state_ids(room_id)

state_ids = await self._storage_controllers.state.get_current_state_ids(
room_id,
state_filter,
)

state_events = await self._store.get_events(state_ids.values())

Expand Down
4 changes: 3 additions & 1 deletion synapse/push/mailer.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ async def send_notification_mail(
user_display_name = user_id

async def _fetch_room_state(room_id: str) -> None:
room_state = await self.store.get_current_state_ids(room_id)
room_state = await self._state_storage_controller.get_current_state_ids(
room_id
)
state_by_room[room_id] = room_state

# Run at most 3 of these at once: sync does 10 at a time but email
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ class RoomStateRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self._event_serializer = hs.get_event_client_serializer()

Expand All @@ -430,7 +431,7 @@ async def on_GET(
if not ret:
raise NotFoundError("Room not found")

event_ids = await self.store.get_current_state_ids(room_id)
event_ids = await self._storage_controllers.state.get_current_state_ids(room_id)
events = await self.store.get_events(event_ids.values())
now = self.clock.time_msec()
room_state = self._event_serializer.serialize_events(events.values(), now)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def _invalidate_state_caches(

# Purge other caches based on room state.
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))

def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]
Expand Down
Loading

0 comments on commit 888a29f

Please sign in to comment.