Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Update the device list cache when keys/query is called
Browse files Browse the repository at this point in the history
We only cache it when it makes sense to do so.
  • Loading branch information
JorikSchellekens committed Jul 16, 2019
1 parent d863213 commit 700f1ca
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 77 deletions.
160 changes: 88 additions & 72 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from twisted.internet import defer

import synapse.logging.opentracing as opentracing
from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.api.errors import (
Expand Down Expand Up @@ -211,12 +212,12 @@ def __init__(self, hs):

self.federation_sender = hs.get_federation_sender()

self._edu_updater = DeviceListEduUpdater(hs, self)
self.device_list_updater = DeviceListUpdater(hs, self)

federation_registry = hs.get_federation_registry()

federation_registry.register_edu_handler(
"m.device_list_update", self._edu_updater.incoming_device_list_update
"m.device_list_update", self.device_list_updater.incoming_device_list_update
)
federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices
Expand Down Expand Up @@ -430,7 +431,7 @@ def _update_device_from_client_ips(device, client_ips):
device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})


class DeviceListEduUpdater(object):
class DeviceListUpdater(object):
"Handles incoming device list updates from federation and updates the DB"

def __init__(self, hs, device_handler):
Expand Down Expand Up @@ -523,75 +524,7 @@ def _handle_device_updates(self, user_id):
logger.debug("Need to re-sync devices for %r? %r", user_id, resync)

if resync:
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
result = yield self.federation.query_user_devices(origin, user_id)
except (
NotRetryingDestination,
RequestSendFailed,
HttpResponseException,
):
# TODO: Remember that we are now out of sync and try again
# later
logger.warn("Failed to handle device list update for %s", user_id)
# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync
# next time we get a device list update for this user_id.
# This makes it more likely that the device lists will
# eventually become consistent.
return
except FederationDeniedError as e:
logger.info(e)
return
except Exception:
# TODO: Remember that we are now out of sync and try again
# later
logger.exception(
"Failed to handle device list update for %s", user_id
)
return

stream_id = result["stream_id"]
devices = result["devices"]

# If the remote server has more than ~1000 devices for this user
# we assume that something is going horribly wrong (e.g. a bot
# that logs in and creates a new device every time it tries to
# send a message). Maintaining lots of devices per user in the
# cache can cause serious performance issues as if this request
# takes more than 60s to complete, internal replication from the
# inbound federation worker to the synapse master may time out
# causing the inbound federation to fail and causing the remote
# server to retry, causing a DoS. So in this scenario we give
# up on storing the total list of devices and only handle the
# delta instead.
if len(devices) > 1000:
logger.warn(
"Ignoring device list snapshot for %s as it has >1K devs (%d)",
user_id,
len(devices),
)
devices = []

for device in devices:
logger.debug(
"Handling resync update %r/%r, ID: %r",
user_id,
device["device_id"],
stream_id,
)

yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id
)
device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids)

# We clobber the seen updates since we've re-synced from a given
# point.
self._seen_updates[user_id] = set([stream_id])
yield self.user_device_resync(user_id)
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
Expand Down Expand Up @@ -638,3 +571,86 @@ def _need_to_do_resync(self, user_id, updates):
stream_id_in_updates.add(stream_id)

defer.returnValue(False)

@opentracing.trace_deferred
@defer.inlineCallbacks
def user_device_resync(self, user_id):
"""Fetches all devices for a user and updates the device cache with them.
Args:
user_id (String): The user's id whose device_list will be updated.
Returns:
a dict with device info as under the "devices" in the result of this
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
opentracing.log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
result = yield self.federation.query_user_devices(origin, user_id)
except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
# TODO: Remember that we are now out of sync and try again
# later
logger.warn("Failed to handle device list update for %s", user_id)
# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync
# next time we get a device list update for this user_id.
# This makes it more likely that the device lists will
# eventually become consistent.
return
except FederationDeniedError as e:
opentracing.set_tag("error", True)
opentracing.log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return
except Exception as e:
# TODO: Remember that we are now out of sync and try again
# later
opentracing.set_tag("error", True)
opentracing.log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id)
return
opentracing.log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]

# If the remote server has more than ~1000 devices for this user
# we assume that something is going horribly wrong (e.g. a bot
# that logs in and creates a new device every time it tries to
# send a message). Maintaining lots of devices per user in the
# cache can cause serious performance issues as if this request
# takes more than 60s to complete, internal replication from the
# inbound federation worker to the synapse master may time out
# causing the inbound federation to fail and causing the remote
# server to retry, causing a DoS. So in this scenario we give
# up on storing the total list of devices and only handle the
# delta instead.
if len(devices) > 1000:
logger.warn(
"Ignoring device list snapshot for %s as it has >1K devs (%d)",
user_id,
len(devices),
)
devices = []

for device in devices:
logger.debug(
"Handling resync update %r/%r, ID: %r",
user_id,
device["device_id"],
stream_id,
)

yield self.store.update_remote_device_list_cache(user_id, devices, stream_id)
device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids)

# We clobber the seen updates since we've re-synced from a given
# point.
self._seen_updates[user_id] = set([stream_id])

defer.returnValue(result)
Loading

0 comments on commit 700f1ca

Please sign in to comment.