From b3458bc8ed7faf69bb13287ab61eb360db629d8e Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 17 Mar 2021 18:10:14 +0000 Subject: [PATCH 1/4] Add an exclude_keyvalues option to simple_select_list_paginate_txn Essentially does the same as the existing keyvalues option, but excludes rows with the given column and value. --- synapse/storage/database.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f1ba529a2d76..d56f3a93ed25 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1906,6 +1906,7 @@ def simple_select_list_paginate_txn( retcols: Iterable[str], filters: Optional[Dict[str, Any]] = None, keyvalues: Optional[Dict[str, Any]] = None, + exclude_keyvalues: Optional[Dict[str, Any]] = None, order_direction: str = "ASC", ) -> List[Dict[str, Any]]: """ @@ -1929,7 +1930,10 @@ def simple_select_list_paginate_txn( apply a WHERE ? LIKE ? clause. keyvalues: column names and values to select the rows with, or None to not - apply a WHERE clause. + apply a WHERE key = value clause. + exclude_keyvalues: + column names and values to exclude rows with, or None to not + apply a WHERE key != value clause. order_direction: Whether the results should be ordered "ASC" or "DESC". Returns: @@ -1938,7 +1942,7 @@ def simple_select_list_paginate_txn( if order_direction not in ["ASC", "DESC"]: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") - where_clause = "WHERE " if filters or keyvalues else "" + where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else "" arg_list = [] # type: List[Any] if filters: where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters) @@ -1947,6 +1951,9 @@ def simple_select_list_paginate_txn( if keyvalues: where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues) arg_list += list(keyvalues.values()) + if exclude_keyvalues: + where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues) + arg_list += list(exclude_keyvalues.values()) sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % ( ", ".join(retcols), From 5793fa857270353a25b6245d1587055d0fbde8b1 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 17 Mar 2021 18:11:47 +0000 Subject: [PATCH 2/4] Add a storage method to get the current presence state for all users This will be useful for when PresenceRouter.get_interested_user returns "ALL". It allows for querying all current local user presencee. Note that the `presence_stream` table is culled frequently, and doesn't just grow forever like other stream tables. --- synapse/storage/databases/main/presence.py | 61 +++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 29edab34d47d..eae7065fdc14 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Tuple +from typing import Dict, List, Tuple from synapse.api.presence import UserPresenceState from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause @@ -157,5 +157,64 @@ async def get_presence_for_users(self, user_ids): return {row["user_id"]: UserPresenceState(**row) for row in rows} + async def get_presence_for_all_users( + self, + include_offline: bool = True, + ) -> Dict[str, UserPresenceState]: + """Retrieve the current presence state for all users. + + Note that the presence_stream table is culled frequently, so it should only + contain the latest presence state for each user. + + Args: + include_offline: Whether to include offline presence states + + Returns: + A dict of user IDs to their current UserPresenceState. + """ + users_to_state = {} + + exclude_keyvalues = {} + if not include_offline: + # Exclude offline presence state + exclude_keyvalues = {"state": "offline"} + + # This may be a very heavy database query. + # We paginate in order to not block a database connection. + limit = 100 + offset = 0 + while True: + rows = await self.db_pool.runInteraction( + "get_presence_for_all_users", + self.db_pool.simple_select_list_paginate_txn, + "presence_stream", + orderby="stream_id", + start=offset, + limit=limit, + keyvalues={}, + exclude_keyvalues=exclude_keyvalues, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + order_direction="ASC", + ) + + for row in rows: + users_to_state[row["user_id"]] = UserPresenceState(**row) + + # We've ran out of updates to query + if len(rows) < limit: + break + + offset += limit + + return users_to_state + def get_current_presence_token(self): return self._presence_id_gen.get_current_token() From 6b6d42c0f9d06cbe297428090ae552ff6a3e5658 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 19 Mar 2021 12:06:58 +0000 Subject: [PATCH 3/4] Changelog --- changelog.d/9650.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9650.misc diff --git a/changelog.d/9650.misc b/changelog.d/9650.misc new file mode 100644 index 000000000000..d830ead70ecc --- /dev/null +++ b/changelog.d/9650.misc @@ -0,0 +1 @@ +Add a storage method for pulling all current user presence state from the database. \ No newline at end of file From 61fcfa00ae02d7da1fd9e0792f21d98d3af641f8 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 24 Mar 2021 13:44:26 +0000 Subject: [PATCH 4/4] Address review feedback --- synapse/storage/databases/main/presence.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index eae7065fdc14..0ff693a3109f 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -174,7 +174,7 @@ async def get_presence_for_all_users( """ users_to_state = {} - exclude_keyvalues = {} + exclude_keyvalues = None if not include_offline: # Exclude offline presence state exclude_keyvalues = {"state": "offline"} @@ -191,7 +191,6 @@ async def get_presence_for_all_users( orderby="stream_id", start=offset, limit=limit, - keyvalues={}, exclude_keyvalues=exclude_keyvalues, retcols=( "user_id", @@ -208,7 +207,7 @@ async def get_presence_for_all_users( for row in rows: users_to_state[row["user_id"]] = UserPresenceState(**row) - # We've ran out of updates to query + # We've run out of updates to query if len(rows) < limit: break