diff --git a/changelog.d/16311.misc b/changelog.d/16311.misc new file mode 100644 index 000000000000..4f266c1fb029 --- /dev/null +++ b/changelog.d/16311.misc @@ -0,0 +1 @@ +Delete device messages asynchronously and in staged batches using the task scheduler. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9d240ad4ee30..e2ae3da67ebc 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -91,9 +91,14 @@ def __init__(self, hs: "HomeServer"): self._query_appservices_for_keys = ( hs.config.experimental.msc3984_appservice_key_query ) + self._task_scheduler = hs.get_task_scheduler() self.device_list_updater = DeviceListWorkerUpdater(hs) + self._task_scheduler.register_action( + self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME + ) + @trace async def get_devices_by_user(self, user_id: str) -> List[JsonDict]: """ @@ -383,6 +388,32 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None: "Trying handling device list state for partial join: not supported on workers." ) + DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + + async def _delete_device_messages( + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`.""" + assert task.params is not None + user_id = task.params["user_id"] + device_id = task.params["device_id"] + up_to_stream_id = task.params["up_to_stream_id"] + + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) + + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + else: + # There is probably still device messages to be deleted, let's keep the task active and it will be run + # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). + return TaskStatus.ACTIVE, None, None + class DeviceHandler(DeviceWorkerHandler): device_list_updater: "DeviceListUpdater" @@ -394,7 +425,6 @@ def __init__(self, hs: "HomeServer"): self._account_data_handler = hs.get_account_data_handler() self._storage_controllers = hs.get_storage_controllers() self.db_pool = hs.get_datastores().main.db_pool - self._task_scheduler = hs.get_task_scheduler() self.device_list_updater = DeviceListUpdater(hs, self) @@ -428,10 +458,6 @@ def __init__(self, hs: "HomeServer"): self._delete_stale_devices, ) - self._task_scheduler.register_action( - self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME - ) - def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. @@ -590,32 +616,6 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: await self.notify_device_update(user_id, device_ids) - DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 - - async def _delete_device_messages( - self, - task: ScheduledTask, - ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: - """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`.""" - assert task.params is not None - user_id = task.params["user_id"] - device_id = task.params["device_id"] - up_to_stream_id = task.params["up_to_stream_id"] - - res = await self.store.delete_messages_for_device( - user_id=user_id, - device_id=device_id, - up_to_stream_id=up_to_stream_id, - limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, - ) - - if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: - return TaskStatus.COMPLETE, None, None - else: - # There is probably still device messages to be deleted, let's keep the task active and it will be run - # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). - return TaskStatus.ACTIVE, None, None - async def update_device(self, user_id: str, device_id: str, content: dict) -> None: """Update the given device