Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Async/await for background updates #6647

Merged
merged 5 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6647.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Port core background update routines to async/await.
36 changes: 20 additions & 16 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import logging
from typing import Optional

from canonicaljson import json

Expand Down Expand Up @@ -97,15 +98,14 @@ def __init__(self, hs, database):
def start_doing_background_updates(self):
run_as_background_process("background_updates", self.run_background_updates)

@defer.inlineCallbacks
def run_background_updates(self, sleep=True):
async def run_background_updates(self, sleep=True):
logger.info("Starting background schema updates")
while True:
if sleep:
yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)

try:
result = yield self.do_next_background_update(
result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except Exception:
Expand Down Expand Up @@ -170,20 +170,21 @@ async def has_completed_background_update(self, update_name) -> bool:

return not update_exists

@defer.inlineCallbacks
def do_next_background_update(self, desired_duration_ms):
async def do_next_background_update(
self, desired_duration_ms: float
) -> Optional[int]:
"""Does some amount of work on the next queued background update

Returns once some amount of work is done.

Args:
desired_duration_ms(float): How long we want to spend
updating.
Returns:
A deferred that completes once some amount of work is done.
The deferred will have a value of None if there is currently
no more work to do.
None if there is no more work to do, otherwise an int
"""
if not self._background_update_queue:
updates = yield self.db.simple_select_list(
updates = await self.db.simple_select_list(
"background_updates",
keyvalues=None,
retcols=("update_name", "depends_on"),
Expand All @@ -201,11 +202,12 @@ def do_next_background_update(self, desired_duration_ms):
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)

res = yield self._do_background_update(update_name, desired_duration_ms)
res = await self._do_background_update(update_name, desired_duration_ms)
return res

@defer.inlineCallbacks
def _do_background_update(self, update_name, desired_duration_ms):
async def _do_background_update(
self, update_name: str, desired_duration_ms: float
) -> int:
logger.info("Starting update batch on background update '%s'", update_name)

update_handler = self._background_update_handlers[update_name]
Expand All @@ -225,7 +227,7 @@ def _do_background_update(self, update_name, desired_duration_ms):
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE

progress_json = yield self.db.simple_select_one_onecol(
progress_json = await self.db.simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="progress_json",
Expand All @@ -234,7 +236,7 @@ def _do_background_update(self, update_name, desired_duration_ms):
progress = json.loads(progress_json)

time_start = self._clock.time_msec()
items_updated = yield update_handler(progress, batch_size)
items_updated = await update_handler(progress, batch_size)
time_stop = self._clock.time_msec()

duration_ms = time_stop - time_start
Expand Down Expand Up @@ -263,7 +265,9 @@ def register_background_update_handler(self, update_name, update_handler):
* A dict of the current progress
* An integer count of the number of items to update in this batch.

The handler should return a deferred integer count of items updated.
The handler should return a deferred or coroutine which returns an integer count
of items updated.

The handler is responsible for updating the progress of the update.

Args:
Expand Down