Cache event ID to auth event IDs lookups (#8752)
This should hopefully speed up `get_auth_chain_difference` a bit in the case of repeated state res on the same rooms.

`get_auth_chain_difference` does a breadth-first walk of the auth graphs by repeatedly looking up events' auth events. Different state resolutions on the same room end up performing many of the same event-to-auth-events lookups, so caching those lookups should speed things up.
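Roughly, the walk now consults an in-memory cache before the database, and writes DB results back to the cache batched by event ID. A minimal sketch of the idea, using a plain dict in place of Synapse's `LruCache` and a hypothetical `fetch_auth_events_from_db` helper standing in for the SQL query:

from typing import Dict, List, Set, Tuple

# Illustrative stand-in for the new cache: event ID -> [(auth_event_id, depth)].
auth_cache: Dict[str, List[Tuple[str, int]]] = {}

def fetch_auth_events_from_db(event_ids: List[str]) -> List[Tuple[str, str, int]]:
    """Hypothetical DB helper returning (event_id, auth_event_id, depth) rows."""
    raise NotImplementedError

def walk_auth_chain(event_ids: Set[str]) -> Set[str]:
    """Breadth-first walk of the auth graph, consulting the cache first."""
    results: Set[str] = set()
    front = set(event_ids)
    while front:
        new_front: Set[str] = set()
        to_fetch: List[str] = []  # cache misses to pull from the DB
        for event_id in front:
            cached = auth_cache.get(event_id)
            if cached is None:
                to_fetch.append(event_id)
            else:
                new_front.update(auth_id for auth_id, _ in cached)

        if to_fetch:
            # Batch rows by event ID so each cache entry holds that
            # event's complete list of auth events.
            to_cache: Dict[str, List[Tuple[str, int]]] = {}
            for event_id, auth_id, depth in fetch_auth_events_from_db(to_fetch):
                to_cache.setdefault(event_id, []).append((auth_id, depth))
                new_front.add(auth_id)
            auth_cache.update(to_cache)

        new_front -= results
        results |= new_front
        front = new_front
    return results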
erikjohnston authored Nov 13, 2020
1 parent c2d4467 commit 4cb00d2
Showing 2 changed files with 71 additions and 12 deletions.
1 change: 1 addition & 0 deletions changelog.d/8752.misc
@@ -0,0 +1 @@
+Speed up repeated state resolutions on the same room by caching event ID to auth event ID lookups.
82 changes: 70 additions & 12 deletions synapse/storage/databases/main/event_federation.py
@@ -26,6 +26,7 @@
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
 from synapse.types import Collection
 from synapse.util.caches.descriptors import cached
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 
 logger = logging.getLogger(__name__)
@@ -40,6 +41,11 @@ def __init__(self, database: DatabasePool, db_conn, hs):
             self._delete_old_forward_extrem_cache, 60 * 60 * 1000
         )
 
+        # Cache of event ID to list of auth event IDs and their depths.
+        self._event_auth_cache = LruCache(
+            500000, "_event_auth_cache", size_callback=len
+        )  # type: LruCache[str, List[Tuple[str, int]]]
+
     async def get_auth_chain(
         self, event_ids: Collection[str], include_given: bool = False
     ) -> List[EventBase]:
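A note on the cache constructed above: entries are variable-length lists, and `size_callback=len` makes each entry count its list length toward the 500,000 budget, so the cap bounds the total number of cached (auth_event_id, depth) pairs rather than the number of keys. A simplified sketch of that size-aware LRU accounting (an illustrative stand-in, not Synapse's `LruCache`):

from collections import OrderedDict
from typing import Callable, Generic, TypeVar

K = TypeVar("K")
V = TypeVar("V")

class SizedLruCache(Generic[K, V]):
    """Simplified LRU cache whose capacity is measured by a per-entry size callback."""

    def __init__(self, max_size: int, size_callback: Callable[[V], int]) -> None:
        self._max_size = max_size
        self._size_callback = size_callback
        self._size = 0
        self._data: "OrderedDict[K, V]" = OrderedDict()

    def get(self, key: K, default=None):
        if key in self._data:
            self._data.move_to_end(key)  # mark as most recently used
            return self._data[key]
        return default

    def set(self, key: K, value: V) -> None:
        if key in self._data:
            self._size -= self._size_callback(self._data.pop(key))
        self._data[key] = value
        self._size += self._size_callback(value)
        # Evict least-recently-used entries until back under budget.
        while self._size > self._max_size and self._data:
            _, evicted = self._data.popitem(last=False)
            self._size -= self._size_callback(evicted)

# e.g. cache = SizedLruCache[str, list](500000, size_callback=len)

Eviction is by least-recent use, so the auth chains of rooms undergoing repeated state resolution tend to stay resident.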
@@ -84,17 +90,45 @@ def _get_auth_chain_ids_txn(
         else:
             results = set()
 
-        base_sql = "SELECT DISTINCT auth_id FROM event_auth WHERE "
+        # We pull out the depth simply so that we can populate the
+        # `_event_auth_cache` cache.
+        base_sql = """
+            SELECT a.event_id, auth_id, depth
+            FROM event_auth AS a
+            INNER JOIN events AS e ON (e.event_id = a.auth_id)
+            WHERE
+        """
 
         front = set(event_ids)
         while front:
             new_front = set()
             for chunk in batch_iter(front, 100):
-                clause, args = make_in_list_sql_clause(
-                    txn.database_engine, "event_id", chunk
-                )
-                txn.execute(base_sql + clause, args)
-                new_front.update(r[0] for r in txn)
+                # Pull the auth events either from the cache or DB.
+                to_fetch = []  # Event IDs to fetch from DB  # type: List[str]
+                for event_id in chunk:
+                    res = self._event_auth_cache.get(event_id)
+                    if res is None:
+                        to_fetch.append(event_id)
+                    else:
+                        new_front.update(auth_id for auth_id, depth in res)
+
+                if to_fetch:
+                    clause, args = make_in_list_sql_clause(
+                        txn.database_engine, "a.event_id", to_fetch
+                    )
+                    txn.execute(base_sql + clause, args)
+
+                    # Note we need to batch up the results by event ID before
+                    # adding to the cache.
+                    to_cache = {}
+                    for event_id, auth_event_id, auth_event_depth in txn:
+                        to_cache.setdefault(event_id, []).append(
+                            (auth_event_id, auth_event_depth)
+                        )
+                        new_front.add(auth_event_id)
+
+                    for event_id, auth_events in to_cache.items():
+                        self._event_auth_cache.set(event_id, auth_events)
 
             new_front -= results
 
@@ -213,14 +247,38 @@ def _get_auth_chain_difference_txn(
                 break
 
             # Fetch the auth events and their depths of the N last events we're
-            # currently walking
+            # currently walking, either from cache or DB.
            search, chunk = search[:-100], search[-100:]
-            clause, args = make_in_list_sql_clause(
-                txn.database_engine, "a.event_id", [e_id for _, e_id in chunk]
-            )
-            txn.execute(base_sql + clause, args)
 
-            for event_id, auth_event_id, auth_event_depth in txn:
+            found = []  # Results found  # type: List[Tuple[str, str, int]]
+            to_fetch = []  # Event IDs to fetch from DB  # type: List[str]
+            for _, event_id in chunk:
+                res = self._event_auth_cache.get(event_id)
+                if res is None:
+                    to_fetch.append(event_id)
+                else:
+                    found.extend((event_id, auth_id, depth) for auth_id, depth in res)
+
+            if to_fetch:
+                clause, args = make_in_list_sql_clause(
+                    txn.database_engine, "a.event_id", to_fetch
+                )
+                txn.execute(base_sql + clause, args)
+
+                # We parse the results and add them to the `found` set and the
+                # cache (note we need to batch up the results by event ID before
+                # adding to the cache).
+                to_cache = {}
+                for event_id, auth_event_id, auth_event_depth in txn:
+                    to_cache.setdefault(event_id, []).append(
+                        (auth_event_id, auth_event_depth)
+                    )
+                    found.append((event_id, auth_event_id, auth_event_depth))
+
+                for event_id, auth_events in to_cache.items():
+                    self._event_auth_cache.set(event_id, auth_events)
+
+            for event_id, auth_event_id, auth_event_depth in found:
                 event_to_auth_events.setdefault(event_id, set()).add(auth_event_id)
 
                 sets = event_to_missing_sets.get(auth_event_id)
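A side note on `make_in_list_sql_clause`, used in both hunks: it builds a parameterised membership clause for a batch of IDs, so each round of cache misses costs a single query. A hand-rolled approximation of the plain `IN (...)` case (illustrative only; Synapse's real helper can also emit an array-comparison form on database engines that support it):

from typing import Iterable, List, Tuple

def make_in_list_clause(column: str, values: Iterable[str]) -> Tuple[str, List[str]]:
    """Build 'column IN (?, ?, ...)' plus its argument list (sketch)."""
    args = list(values)
    placeholders = ", ".join("?" for _ in args)
    return "%s IN (%s)" % (column, placeholders), args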
