
Fix perf of fetching the same events many times. (#10703)
The code to deduplicate repeated fetches of the same set of events was
N^2 (over the number of events requested), which could lead to a process
being completely wedged.
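
As a rough standalone illustration of that quadratic merge cost (this is not Synapse code; merge_per_id and merge_once are made-up names), merging the same batch into the accumulator once per requested ID does N * len(batch) dict insertions where a single filtered merge would do:

def merge_per_id(requested_ids, batch):
    """Old shape: one merge per requested ID (N * len(batch) insertions)."""
    results = {}
    for _ in requested_ids:
        results.update(batch)      # same entries re-inserted every time
    return results

def merge_once(requested_ids, batch):
    """Fixed shape: one merge, filtered to the entries actually asked for."""
    return {k: v for k, v in batch.items() if k in requested_ids}

ids = {f"$event{i}" for i in range(1000)}
batch = {f"$event{i}": object() for i in range(5000)}
assert merge_per_id(ids, batch).keys() >= ids   # ~5,000,000 dict insertions
assert merge_once(ids, batch).keys() == ids     # 1,000 dict insertions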

The main fix is to deduplicate the returned deferreds so we only await
on a deferred once rather than many times. Separately, when handling the
returned events from the deferred we only add the events we care about
to the event map to be returned (so that we don't pay the price of
inserting extraneous events into the dict).
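
A minimal sketch of that pattern follows, with assumed names (fetch_missing, in_flight) and asyncio futures standing in for Twisted's ObservableDeferred: each distinct in-flight fetch is awaited exactly once, and only the requested entries are merged into the result.

import asyncio
from typing import Any, Dict, Set

async def fetch_missing(
    requested_ids: Set[str],
    in_flight: Dict[str, "asyncio.Future[Dict[str, Any]]"],
) -> Dict[str, Any]:
    # Which of the requested IDs are covered by an existing fetch?
    already_fetching_ids = {eid for eid in requested_ids if eid in in_flight}
    # Deduplicate the futures themselves: many IDs can share one fetch,
    # so each distinct future is awaited exactly once.
    unique_futures = {in_flight[eid] for eid in already_fetching_ids}

    results: Dict[str, Any] = {}
    for fut in unique_futures:
        batch = await fut
        # Filter before merging: the shared fetch may return many events
        # requested by other callers, and inserting them all is wasted work.
        results.update(
            (eid, entry)
            for eid, entry in batch.items()
            if eid in already_fetching_ids
        )
    return results

async def main() -> None:
    # Two requested IDs share a single in-flight fetch.
    shared: "asyncio.Future[Dict[str, Any]]" = asyncio.get_running_loop().create_future()
    shared.set_result({"$a": "event-a", "$b": "event-b", "$other": "noise"})
    print(await fetch_missing({"$a", "$b"}, {"$a": shared, "$b": shared}))

asyncio.run(main())  # prints {'$a': 'event-a', '$b': 'event-b'}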
erikjohnston authored Aug 27, 2021
1 parent 1800aab commit c4fa4f3
Showing 2 changed files with 24 additions and 6 deletions.
1 change: 1 addition & 0 deletions changelog.d/10703.bugfix
@@ -0,0 +1 @@
Fix a regression introduced in v1.41.0 which affected the performance of concurrent fetches of large sets of events, in extreme cases causing the process to hang.
29 changes: 23 additions & 6 deletions synapse/storage/databases/main/events_worker.py
@@ -520,16 +520,26 @@ async def _get_events_from_cache_or_db(
# We now look up if we're already fetching some of the events in the DB,
# if so we wait for those lookups to finish instead of pulling the same
# events out of the DB multiple times.
already_fetching: Dict[str, defer.Deferred] = {}
#
# Note: we might get the same `ObservableDeferred` back for multiple
# events we're already fetching, so we deduplicate the deferreds to
# avoid extraneous work (if we don't do this we can end up in an n^2 mode
# when we wait on the same Deferred N times, then try and merge the
# same dict into itself N times).
already_fetching_ids: Set[str] = set()
already_fetching_deferreds: Set[
ObservableDeferred[Dict[str, _EventCacheEntry]]
] = set()

for event_id in missing_events_ids:
deferred = self._current_event_fetches.get(event_id)
if deferred is not None:
# We're already pulling the event out of the DB. Add the deferred
# to the collection of deferreds to wait on.
already_fetching[event_id] = deferred.observe()
already_fetching_ids.add(event_id)
already_fetching_deferreds.add(deferred)

missing_events_ids.difference_update(already_fetching)
missing_events_ids.difference_update(already_fetching_ids)

if missing_events_ids:
log_ctx = current_context()
@@ -569,18 +579,25 @@ async def _get_events_from_cache_or_db(
with PreserveLoggingContext():
fetching_deferred.callback(missing_events)

if already_fetching:
if already_fetching_deferreds:
# Wait for the other event requests to finish and add their results
# to ours.
results = await make_deferred_yieldable(
defer.gatherResults(
already_fetching.values(),
(d.observe() for d in already_fetching_deferreds),
consumeErrors=True,
)
).addErrback(unwrapFirstError)

for result in results:
event_entry_map.update(result)
# We filter out events that we haven't asked for as we might get
# a *lot* of superfluous events back, and there is no point
# going through and inserting them all (which can take time).
event_entry_map.update(
(event_id, entry)
for event_id, entry in result.items()
if event_id in already_fetching_ids
)

if not allow_rejected:
event_entry_map = {
