Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix SQL delta file taking a long time to run #9516

Merged
merged 4 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9516.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug where users' pushers were not all deleted when they deactivated their account.
2 changes: 2 additions & 0 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_bg_updates import (
from synapse.storage.databases.main.media_repository import (
MediaRepositoryBackgroundUpdateStore,
)
from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
find_max_generated_user_id_localpart,
Expand Down Expand Up @@ -177,6 +178,7 @@ class Store(
UserDirectoryBackgroundUpdateStore,
EndToEndKeyBackgroundStore,
StatsStore,
PusherWorkerStore,
):
def execute(self, f, *args, **kwargs):
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
Expand Down
53 changes: 53 additions & 0 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
)

self.db_pool.updates.register_background_update_handler(
"remove_deactivated_pushers",
self._remove_deactivated_pushers,
)

def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
"""JSON-decode the data in the rows returned from the `pushers` table

Expand Down Expand Up @@ -284,6 +289,54 @@ async def set_throttle_params(
lock=False,
)

async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
"""A background update that deletes all pushers for deactivated users.

Note that we don't proacively tell the pusherpool that we've deleted
these (just because its a bit off a faff to do from here), but they will
get cleaned up at the next restart
"""

last_user = progress.get("last_user", "")

def _delete_pushers(txn) -> int:

sql = """
SELECT name FROM users
WHERE deactivated = ? and name > ?
ORDER BY name ASC
LIMIT ?
"""

txn.execute(sql, (1, last_user, batch_size))
users = [row[0] for row in txn]

self.db_pool.simple_delete_many_txn(
txn,
table="pushers",
column="user_name",
iterable=users,
keyvalues={},
)

if users:
self.db_pool.updates._background_update_progress_txn(
txn, "remove_deactivated_pushers", {"last_user": users[-1]}
)

return len(users)

number_deleted = await self.db_pool.runInteraction(
"_remove_deactivated_pushers", _delete_pushers
)

if number_deleted < batch_size:
await self.db_pool.updates._end_background_update(
"remove_deactivated_pushers"
)

return number_deleted


class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self) -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
*/


-- We may not have deleted all pushers for deactivated accounts. Do so now.
--
-- Note: We don't bother updating the `deleted_pushers` table as it's just use
-- to stop pushers on workers, and that will happen when they get next restarted.
DELETE FROM pushers WHERE user_name IN (SELECT name FROM users WHERE deactivated = 1);
-- We may not have deleted all pushers for deactivated accounts, so we set up a
-- background job to delete them.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(5908, 'remove_deactivated_pushers', '{}');