Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Properly handle unknown results for the stream change cache. #14592

Merged
merged 3 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14592.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where...
clokep marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1764,14 +1764,14 @@ async def _filter_all_presence_updates_for_user(
Returns:
A list of presence states for the given user to receive.
"""
updated_users = None
if from_key:
# Only return updates since the last sync
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
from_key
)
if not updated_users:
updated_users = []
Comment on lines -1772 to -1773
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_filter_app_presence_updates_for_user is only called if additional_users_interested_in == PresenceRouter.ALL_USERS, so it seems reasonable here to treated this the same as not having a from_key (and returning info on all users).


if updated_users is not None:
# Get the actual presence update for each change
users_to_state = await self.get_presence_handler().current_state_for_users(
updated_users
Expand Down
33 changes: 19 additions & 14 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,12 +842,11 @@ async def get_users_whose_devices_changed(
user_ids, from_key
)

if not user_ids_to_check:
# If an empty set was returned, there's nothing to do.
if user_ids_to_check is not None and not user_ids_to_check:
return set()

def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
changes: Set[str] = set()

stream_id_where_clause = "stream_id > ?"
sql_args = [from_key]

Expand All @@ -858,19 +857,25 @@ def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
sql = f"""
SELECT DISTINCT user_id FROM device_lists_stream
WHERE {stream_id_where_clause}
AND
"""

# Query device changes with a batch of users at a time
# Assertion for mypy's benefit; see also
# https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
assert user_ids_to_check is not None
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql + clause, sql_args + args)
changes.update(user_id for user_id, in txn)
# If the stream change cache gave us no information, fetch *all*
# users between the stream IDs.
if user_ids_to_check is None:
txn.execute(sql, sql_args)
return {user_id for user_id, in txn}

# Otherwise, fetch changes for the given users.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to additionally query the database if user_ids_check is non-None? Didn't the stream change cache give us the needed info?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't think we need to do that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit more complicated than initially thought as the database query is what is used to cap things at a max stream token (i.e. we only check the cache for if things have changed after from_key, but the changes might have been before or after to_key -- the cache doesn't know).

else:
changes: Set[str] = set()

# Query device changes with a batch of users at a time
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql + " AND " + clause, sql_args + args)
changes.update(user_id for user_id, in txn)

return changes

Expand Down