Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Implement account status endpoints (MSC3720) (#12001)
Browse files Browse the repository at this point in the history
  • Loading branch information
babolivier and squahtx authored Feb 22, 2022
1 parent 94a396e commit 250104d
Show file tree
Hide file tree
Showing 11 changed files with 511 additions and 6 deletions.
1 change: 1 addition & 0 deletions changelog.d/12001.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement experimental support for [MSC3720](https://github.com/matrix-org/matrix-doc/pull/3720) (account status endpoints).
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ def read_config(self, config: JsonDict, **kwargs):
# experimental support for faster joins over federation (msc2775, msc3706)
# requires a target server with msc3706_enabled enabled.
self.faster_joins_enabled: bool = experimental.get("faster_joins", False)

# MSC3720 (Account status endpoint)
self.msc3720_enabled: bool = experimental.get("msc3720_enabled", False)
60 changes: 59 additions & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.transport.client import SendJoinResponse
from synapse.types import JsonDict, get_domain_from_id
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
Expand Down Expand Up @@ -1610,6 +1610,64 @@ async def timestamp_to_event(
except ValueError as e:
raise InvalidResponseError(str(e))

async def get_account_status(
self, destination: str, user_ids: List[str]
) -> Tuple[JsonDict, List[str]]:
"""Retrieves account statuses for a given list of users on a given remote
homeserver.
If the request fails for any reason, all user IDs for this destination are marked
as failed.
Args:
destination: the destination to contact
user_ids: the user ID(s) for which to request account status(es)
Returns:
The account statuses, as well as the list of user IDs for which it was not
possible to retrieve a status.
"""
try:
res = await self.transport_layer.get_account_status(destination, user_ids)
except Exception:
# If the query failed for any reason, mark all the users as failed.
return {}, user_ids

statuses = res.get("account_statuses", {})
failures = res.get("failures", [])

if not isinstance(statuses, dict) or not isinstance(failures, list):
# Make sure we're not feeding back malformed data back to the caller.
logger.warning(
"Destination %s responded with malformed data to account_status query",
destination,
)
return {}, user_ids

for user_id in user_ids:
# Any account whose status is missing is a user we failed to receive the
# status of.
if user_id not in statuses and user_id not in failures:
failures.append(user_id)

# Filter out any user ID that doesn't belong to the remote server that sent its
# status (or failure).
def filter_user_id(user_id: str) -> bool:
try:
return UserID.from_string(user_id).domain == destination
except SynapseError:
# If the user ID doesn't parse, ignore it.
return False

filtered_statuses = dict(
# item is a (key, value) tuple, so item[0] is the user ID.
filter(lambda item: filter_user_id(item[0]), statuses.items())
)

filtered_failures = list(filter(filter_user_id, failures))

return filtered_statuses, filtered_failures


@attr.s(frozen=True, slots=True, auto_attribs=True)
class TimestampToEventResponse:
Expand Down
19 changes: 18 additions & 1 deletion synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,9 @@ async def make_query(
args: dict,
retry_on_dns_fail: bool,
ignore_backoff: bool = False,
prefix: str = FEDERATION_V1_PREFIX,
) -> JsonDict:
path = _create_v1_path("/query/%s", query_type)
path = _create_path(prefix, "/query/%s", query_type)

return await self.client.get_json(
destination=destination,
Expand Down Expand Up @@ -1247,6 +1248,22 @@ async def get_room_hierarchy_unstable(
args={"suggested_only": "true" if suggested_only else "false"},
)

async def get_account_status(
self, destination: str, user_ids: List[str]
) -> JsonDict:
"""
Args:
destination: The remote server.
user_ids: The user ID(s) for which to request account status(es).
"""
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc3720/account_status"
)

return await self.client.post_json(
destination=destination, path=path, data={"user_ids": user_ids}
)


def _create_path(federation_prefix: str, path: str, *args: str) -> str:
"""
Expand Down
8 changes: 8 additions & 0 deletions synapse/federation/transport/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from synapse.federation.transport.server.federation import (
FEDERATION_SERVLET_CLASSES,
FederationAccountStatusServlet,
FederationTimestampLookupServlet,
)
from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES
Expand Down Expand Up @@ -336,6 +337,13 @@ def register_servlets(
):
continue

# Only allow the `/account_status` servlet if msc3720 is enabled
if (
servletclass == FederationAccountStatusServlet
and not hs.config.experimental.msc3720_enabled
):
continue

servletclass(
hs=hs,
authenticator=authenticator,
Expand Down
35 changes: 35 additions & 0 deletions synapse/federation/transport/server/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,40 @@ async def on_GET(
return 200, complexity


class FederationAccountStatusServlet(BaseFederationServerServlet):
PATH = "/query/account_status"
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3720"

def __init__(
self,
hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
):
super().__init__(hs, authenticator, ratelimiter, server_name)
self._account_handler = hs.get_account_handler()

async def on_POST(
self,
origin: str,
content: JsonDict,
query: Mapping[bytes, Sequence[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
if "user_ids" not in content:
raise SynapseError(
400, "Required parameter 'user_ids' is missing", Codes.MISSING_PARAM
)

statuses, failures = await self._account_handler.get_account_statuses(
content["user_ids"],
allow_remote=False,
)

return 200, {"account_statuses": statuses, "failures": failures}


FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationSendServlet,
FederationEventServlet,
Expand Down Expand Up @@ -797,4 +831,5 @@ async def on_GET(
FederationRoomHierarchyUnstableServlet,
FederationV1SendKnockServlet,
FederationMakeKnockServlet,
FederationAccountStatusServlet,
)
144 changes: 144 additions & 0 deletions synapse/handlers/account.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Dict, List, Tuple

from synapse.api.errors import Codes, SynapseError
from synapse.types import JsonDict, UserID

if TYPE_CHECKING:
from synapse.server import HomeServer


class AccountHandler:
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
self._is_mine = hs.is_mine
self._federation_client = hs.get_federation_client()

async def get_account_statuses(
self,
user_ids: List[str],
allow_remote: bool,
) -> Tuple[JsonDict, List[str]]:
"""Get account statuses for a list of user IDs.
If one or more account(s) belong to remote homeservers, retrieve their status(es)
over federation if allowed.
Args:
user_ids: The list of accounts to retrieve the status of.
allow_remote: Whether to try to retrieve the status of remote accounts, if
any.
Returns:
The account statuses as well as the list of users whose statuses could not be
retrieved.
Raises:
SynapseError if a required parameter is missing or malformed, or if one of
the accounts isn't local to this homeserver and allow_remote is False.
"""
statuses = {}
failures = []
remote_users: List[UserID] = []

for raw_user_id in user_ids:
try:
user_id = UserID.from_string(raw_user_id)
except SynapseError:
raise SynapseError(
400,
f"Not a valid Matrix user ID: {raw_user_id}",
Codes.INVALID_PARAM,
)

if self._is_mine(user_id):
status = await self._get_local_account_status(user_id)
statuses[user_id.to_string()] = status
else:
if not allow_remote:
raise SynapseError(
400,
f"Not a local user: {raw_user_id}",
Codes.INVALID_PARAM,
)

remote_users.append(user_id)

if allow_remote and len(remote_users) > 0:
remote_statuses, remote_failures = await self._get_remote_account_statuses(
remote_users,
)

statuses.update(remote_statuses)
failures += remote_failures

return statuses, failures

async def _get_local_account_status(self, user_id: UserID) -> JsonDict:
"""Retrieve the status of a local account.
Args:
user_id: The account to retrieve the status of.
Returns:
The account's status.
"""
status = {"exists": False}

userinfo = await self._store.get_userinfo_by_id(user_id.to_string())

if userinfo is not None:
status = {
"exists": True,
"deactivated": userinfo.is_deactivated,
}

return status

async def _get_remote_account_statuses(
self, remote_users: List[UserID]
) -> Tuple[JsonDict, List[str]]:
"""Send out federation requests to retrieve the statuses of remote accounts.
Args:
remote_users: The accounts to retrieve the statuses of.
Returns:
The statuses of the accounts, and a list of accounts for which no status
could be retrieved.
"""
# Group remote users by destination, so we only send one request per remote
# homeserver.
by_destination: Dict[str, List[str]] = {}
for user in remote_users:
if user.domain not in by_destination:
by_destination[user.domain] = []

by_destination[user.domain].append(user.to_string())

# Retrieve the statuses and failures for remote accounts.
final_statuses: JsonDict = {}
final_failures: List[str] = []
for destination, users in by_destination.items():
statuses, failures = await self._federation_client.get_account_status(
destination,
users,
)

final_statuses.update(statuses)
final_failures += failures

return final_statuses, final_failures
33 changes: 33 additions & 0 deletions synapse/rest/client/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,36 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
return 200, response


class AccountStatusRestServlet(RestServlet):
PATTERNS = client_patterns(
"/org.matrix.msc3720/account_status$", unstable=True, releases=()
)

def __init__(self, hs: "HomeServer"):
super().__init__()
self._auth = hs.get_auth()
self._store = hs.get_datastore()
self._is_mine = hs.is_mine
self._federation_client = hs.get_federation_client()
self._account_handler = hs.get_account_handler()

async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
await self._auth.get_user_by_req(request)

body = parse_json_object_from_request(request)
if "user_ids" not in body:
raise SynapseError(
400, "Required parameter 'user_ids' is missing", Codes.MISSING_PARAM
)

statuses, failures = await self._account_handler.get_account_statuses(
body["user_ids"],
allow_remote=True,
)

return 200, {"account_statuses": statuses, "failures": failures}


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
EmailPasswordRequestTokenRestServlet(hs).register(http_server)
PasswordRestServlet(hs).register(http_server)
Expand All @@ -910,3 +940,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ThreepidUnbindRestServlet(hs).register(http_server)
ThreepidDeleteRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)

if hs.config.experimental.msc3720_enabled:
AccountStatusRestServlet(hs).register(http_server)
5 changes: 5 additions & 0 deletions synapse/rest/client/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if self.config.experimental.msc3440_enabled:
response["capabilities"]["io.element.thread"] = {"enabled": True}

if self.config.experimental.msc3720_enabled:
response["capabilities"]["org.matrix.msc3720.account_status"] = {
"enabled": True,
}

return HTTPStatus.OK, response


Expand Down
Loading

0 comments on commit 250104d

Please sign in to comment.