Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Refactor get_events_from_store_or_dest to return a dict (#6501)
Browse files Browse the repository at this point in the history
* commit 'c3dda2874':
  Refactor get_events_from_store_or_dest to return a dict (#6501)
  • Loading branch information
anoadragon453 committed Mar 19, 2020
2 parents 5d68a2f + c3dda28 commit 0d60ea6
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 29 deletions.
1 change: 1 addition & 0 deletions changelog.d/6501.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor get_events_from_store_or_dest to return a dict.
44 changes: 15 additions & 29 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import itertools
import logging

from six.moves import range

from prometheus_client import Counter

from twisted.internet import defer
Expand All @@ -41,7 +39,7 @@
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.utils import log_function
from synapse.util import unwrapFirstError
from synapse.util import batch_iter, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination

Expand Down Expand Up @@ -331,19 +329,19 @@ def get_state_for_room(self, destination, room_id, event_id):
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])

fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest(
destination, room_id, set(state_event_ids + auth_event_ids)
desired_events = set(state_event_ids + auth_event_ids)
event_map = yield self.get_events_from_store_or_dest(
destination, room_id, desired_events
)

failed_to_fetch = desired_events - event_map.keys()
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state/auth events for %s: %s",
room_id,
failed_to_fetch,
)

event_map = {ev.event_id: ev for ev in fetched_events}

pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]

Expand All @@ -358,23 +356,18 @@ def get_events_from_store_or_dest(self, destination, room_id, event_ids):
Args:
destination (str)
room_id (str)
event_ids (list)
event_ids (Iterable[str])
Returns:
Deferred: A deferred resolving to a 2-tuple where the first is a list of
events and the second is a list of event ids that we failed to fetch.
Deferred[dict[str, EventBase]]: A deferred resolving to a map
from event_id to event
"""
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = list(seen_events.values())

failed_to_fetch = set()
fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)

missing_events = set(event_ids)
for k in seen_events:
missing_events.discard(k)
missing_events = set(event_ids) - fetched_events.keys()

if not missing_events:
return signed_events, failed_to_fetch
return fetched_events

logger.debug(
"Fetching unknown state/auth events %s for room %s",
Expand All @@ -384,11 +377,8 @@ def get_events_from_store_or_dest(self, destination, room_id, event_ids):

room_version = yield self.store.get_room_version(room_id)

batch_size = 20
missing_events = list(missing_events)
for i in range(0, len(missing_events), batch_size):
batch = set(missing_events[i : i + batch_size])

# XXX 20 requests at once? really?
for batch in batch_iter(missing_events, 20):
deferreds = [
run_in_background(
self.get_pdu,
Expand All @@ -404,13 +394,9 @@ def get_events_from_store_or_dest(self, destination, room_id, event_ids):
)
for success, result in res:
if success and result:
signed_events.append(result)
batch.discard(result.event_id)

# We removed all events we successfully fetched from `batch`
failed_to_fetch.update(batch)
fetched_events[result.event_id] = result

return signed_events, failed_to_fetch
return fetched_events

@defer.inlineCallbacks
@log_function
Expand Down

0 comments on commit 0d60ea6

Please sign in to comment.