Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a storage method for returning all current presence from all users #9650

Merged
merged 4 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9650.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a storage method for pulling all current user presence state from the database.
11 changes: 9 additions & 2 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,7 @@ def simple_select_list_paginate_txn(
retcols: Iterable[str],
filters: Optional[Dict[str, Any]] = None,
keyvalues: Optional[Dict[str, Any]] = None,
exclude_keyvalues: Optional[Dict[str, Any]] = None,
order_direction: str = "ASC",
) -> List[Dict[str, Any]]:
"""
Expand All @@ -1929,7 +1930,10 @@ def simple_select_list_paginate_txn(
apply a WHERE ? LIKE ? clause.
keyvalues:
column names and values to select the rows with, or None to not
apply a WHERE clause.
apply a WHERE key = value clause.
exclude_keyvalues:
column names and values to exclude rows with, or None to not
apply a WHERE key != value clause.
order_direction: Whether the results should be ordered "ASC" or "DESC".

Returns:
Expand All @@ -1938,7 +1942,7 @@ def simple_select_list_paginate_txn(
if order_direction not in ["ASC", "DESC"]:
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")

where_clause = "WHERE " if filters or keyvalues else ""
where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
arg_list = [] # type: List[Any]
if filters:
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
Expand All @@ -1947,6 +1951,9 @@ def simple_select_list_paginate_txn(
if keyvalues:
where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
arg_list += list(keyvalues.values())
if exclude_keyvalues:
where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues)
arg_list += list(exclude_keyvalues.values())

sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
", ".join(retcols),
Expand Down
60 changes: 59 additions & 1 deletion synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Tuple
from typing import Dict, List, Tuple

from synapse.api.presence import UserPresenceState
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
Expand Down Expand Up @@ -157,5 +157,63 @@ async def get_presence_for_users(self, user_ids):

return {row["user_id"]: UserPresenceState(**row) for row in rows}

async def get_presence_for_all_users(
self,
include_offline: bool = True,
) -> Dict[str, UserPresenceState]:
"""Retrieve the current presence state for all users.

Note that the presence_stream table is culled frequently, so it should only
contain the latest presence state for each user.

Args:
include_offline: Whether to include offline presence states

Returns:
A dict of user IDs to their current UserPresenceState.
"""
users_to_state = {}

exclude_keyvalues = None
if not include_offline:
# Exclude offline presence state
exclude_keyvalues = {"state": "offline"}

# This may be a very heavy database query.
# We paginate in order to not block a database connection.
limit = 100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100 seems kind of low, I think we usually batch by 1000?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only other usage of this function I see uses a limit of 100:

def _get_statistics_for_subject_txn(
self, txn, stats_type, stats_id, start, size=100
):
"""
Transaction-bound version of L{get_statistics_for_subject}.
"""
table, id_col = TYPE_TO_TABLE[stats_type]
selected_columns = list(
ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
)
slice_list = self.db_pool.simple_select_list_paginate_txn(
txn,
table + "_historical",
"end_ts",
start,
size,
retcols=selected_columns + ["bucket_size", "end_ts"],
keyvalues={id_col: stats_id},
order_direction="DESC",
)
return slice_list

But intuitively 1000 is probably also fine, given the overhead of spinning up another query?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. We can tweak it if we need to.

offset = 0
while True:
rows = await self.db_pool.runInteraction(
"get_presence_for_all_users",
self.db_pool.simple_select_list_paginate_txn,
"presence_stream",
orderby="stream_id",
start=offset,
limit=limit,
exclude_keyvalues=exclude_keyvalues,
retcols=(
"user_id",
"state",
"last_active_ts",
"last_federation_update_ts",
"last_user_sync_ts",
"status_msg",
"currently_active",
),
order_direction="ASC",
)

for row in rows:
users_to_state[row["user_id"]] = UserPresenceState(**row)

# We've run out of updates to query
if len(rows) < limit:
break

offset += limit

return users_to_state

def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()