Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sliding Sync /sync/e2ee endpoint for To-Device messages #17167

Merged
merged 39 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f9e6e53
Configurable /sync/e2ee endpoint
MadLittleMods May 6, 2024
1e05a05
Add Sliding Sync `/sync/e2ee` endpoint for To-Device messages
MadLittleMods May 7, 2024
5e925f6
Share tests with test_sendtodevice
MadLittleMods May 8, 2024
69f9143
Comment on tests
MadLittleMods May 8, 2024
d4ff933
Prefer Sync v2 vs Sliding Sync distinction
MadLittleMods May 8, 2024
371ec57
Fix wait_for_sync_for_user in tests
MadLittleMods May 8, 2024
06d12e5
Ugly overloads
MadLittleMods May 8, 2024
b8b70ba
Fix lint
MadLittleMods May 8, 2024
c60a4f8
Add changelog
MadLittleMods May 8, 2024
10ffae6
Shared logic for `get_sync_result_builder()`
MadLittleMods May 8, 2024
6bf4896
Try calculate more
MadLittleMods May 9, 2024
8871dac
Share tests using inheritance
MadLittleMods May 9, 2024
0892283
Add comments docs
MadLittleMods May 9, 2024
adb7e20
Consolidate device_lists /sync tests
MadLittleMods May 9, 2024
f098355
Add `device_one_time_keys_count` tests
MadLittleMods May 9, 2024
6b7cfd7
Add tests for `device_unused_fallback_key_types` in `/sync`
MadLittleMods May 9, 2024
b9e5379
Describe test
MadLittleMods May 9, 2024
9bdfa16
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 16, 2024
7331401
Lint
MadLittleMods May 16, 2024
b23abca
Fix test inheritance
MadLittleMods May 16, 2024
821a1b3
Add missing field to docstring
MadLittleMods May 16, 2024
35ca937
Format docstring
MadLittleMods May 16, 2024
4ad7a8b
No need to change this formatting from develop
MadLittleMods May 16, 2024
3092ab5
Calculate room derived membership info for device_lists
MadLittleMods May 20, 2024
3539abe
Membership in timeline for better derived info
MadLittleMods May 20, 2024
5f194f9
Exclude application services
MadLittleMods May 20, 2024
02cecfa
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 20, 2024
f6122ff
Use `client_patterns()` for endpoint URL
MadLittleMods May 21, 2024
2f112e7
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 21, 2024
c2221bb
Lint
MadLittleMods May 21, 2024
717b160
Adjust wording, add todo
MadLittleMods May 21, 2024
514aba5
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 22, 2024
9749795
Update filter to be more precise and avoid more work
MadLittleMods May 22, 2024
06ac1da
Restore copyright header
MadLittleMods May 22, 2024
3da6bc1
Use `@parameterized_class`
MadLittleMods May 22, 2024
d4b41aa
Fix lints
MadLittleMods May 22, 2024
6606ac1
Add docstring for parametized attributes
MadLittleMods May 23, 2024
ab0b844
Add actual typing for params (not just docstrings)
MadLittleMods May 23, 2024
a482545
Fix test after removing type ignore
MadLittleMods May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# MSC3391: Removing account data.
self.msc3391_enabled = experimental.get("msc3391_enabled", False)

# MSC3575 (Sliding Sync API endpoints)
self.msc3575_enabled: bool = experimental.get("msc3575_enabled", False)

# MSC3773: Thread notifications
self.msc3773_enabled: bool = experimental.get("msc3773_enabled", False)

Expand Down
110 changes: 96 additions & 14 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
import itertools
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand Down Expand Up @@ -112,12 +113,21 @@
SyncRequestKey = Tuple[Any, ...]


class SyncType(Enum):
"""Enum for specifying the type of sync request."""

# These string values are semantically significant and are used in the the metrics
INITIAL_SYNC = "initial_sync"
FULL_STATE_SYNC = "full_state_sync"
INCREMENTAL_SYNC = "incremental_sync"
E2EE_SYNC = "e2ee_sync"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
user: UserID
filter_collection: FilterCollection
is_guest: bool
request_key: SyncRequestKey
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
device_id: Optional[str]


Expand Down Expand Up @@ -263,6 +273,15 @@ def __bool__(self) -> bool:
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class E2eeSyncResult:
next_batch: StreamToken
to_device: List[JsonDict]
# device_lists: DeviceListUpdates
# device_one_time_keys_count: JsonMapping
# device_unused_fallback_key_types: List[str]


class SyncHandler:
def __init__(self, hs: "HomeServer"):
self.hs_config = hs.config
Expand Down Expand Up @@ -309,13 +328,18 @@ async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_type: SyncType,
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.

Args:
request_key: The key to use for caching the response.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
Expand All @@ -324,9 +348,10 @@ async def wait_for_sync_for_user(
await self.auth_blocking.check_auth_blocking(requester=requester)

res = await self.response_cache.wrap(
sync_config.request_key,
request_key,
self._wait_for_sync_for_user,
sync_config,
sync_type,
since_token,
timeout,
full_state,
Expand All @@ -338,6 +363,7 @@ async def wait_for_sync_for_user(
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_type: SyncType,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
Expand All @@ -356,13 +382,6 @@ async def _wait_for_sync_for_user(
Computing the body of the response begins in the next method,
`current_sync_for_user`.
"""
if since_token is None:
sync_type = "initial_sync"
elif full_state:
sync_type = "full_state_sync"
else:
sync_type = "incremental_sync"

context = current_context()
if context:
context.tag = sync_type
Expand All @@ -384,14 +403,16 @@ async def _wait_for_sync_for_user(
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: SyncResult = await self.current_sync_for_user(
sync_config, since_token, full_state=full_state
sync_config, sync_type, since_token, full_state=full_state
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SyncResult:
return await self.current_sync_for_user(sync_config, since_token)
return await self.current_sync_for_user(
sync_config, sync_type, since_token
)

result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
Expand Down Expand Up @@ -423,6 +444,7 @@ async def current_sync_callback(
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_type: SyncType,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
Expand All @@ -434,9 +456,25 @@ async def current_sync_for_user(
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)

# Go through the `/sync` v2 path
if sync_type in {
SyncType.INITIAL_SYNC,
SyncType.FULL_STATE_SYNC,
SyncType.INCREMENTAL_SYNC,
}:
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
elif sync_type == SyncType.E2EE_SYNC:
sync_result = await self.generate_e2ee_sync_result(
sync_config, since_token
)
else:
raise Exception(
f"Unknown sync_type (this is a Synapse problem): {sync_type}"
)

set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
Expand Down Expand Up @@ -1751,6 +1789,50 @@ async def generate_sync_result(
next_batch=sync_result_builder.now_token,
)

async def generate_e2ee_sync_result(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
) -> SyncResult:
"""Generates the response body of a MSC3575 Sliding Sync `/sync/e2ee` result."""

user_id = sync_config.user.to_string()
# TODO: Should we exclude app services here? There could be an argument to allow
# them since the appservice doesn't have to make a massive initial sync.
# (related to https://github.com/matrix-org/matrix-doc/issues/1144)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})

joined_room_ids = await self.store.get_rooms_for_user(user_id)

sync_result_builder = SyncResultBuilder(
sync_config,
full_state=False,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
# Dummy values to fill out `SyncResultBuilder`
excluded_room_ids=frozenset({}),
forced_newly_joined_room_ids=frozenset({}),
membership_change_events=frozenset({}),
)

await self._generate_sync_entry_for_to_device(sync_result_builder)

return E2eeSyncResult(
to_device=sync_result_builder.to_device,
# to_device: List[JsonDict]
# device_lists: DeviceListUpdates
# device_one_time_keys_count: JsonMapping
# device_unused_fallback_key_types: List[str]
next_batch=sync_result_builder.now_token,
)

@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
self,
Expand Down
118 changes: 117 additions & 1 deletion synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
import itertools
import logging
import re
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

Expand All @@ -40,6 +41,7 @@
KnockedSyncResult,
SyncConfig,
SyncResult,
SyncType,
)
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
Expand Down Expand Up @@ -197,14 +199,20 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
user=user,
filter_collection=filter_collection,
is_guest=requester.is_guest,
request_key=request_key,
device_id=device_id,
)

since_token = None
if since is not None:
since_token = await StreamToken.from_string(self.store, since)

if since_token is None:
sync_type = SyncType.INITIAL_SYNC
elif full_state:
sync_type = SyncType.FULL_STATE_SYNC
else:
sync_type = SyncType.INCREMENTAL_SYNC

# send any outstanding server notices to the user.
await self._server_notices_sender.on_user_syncing(user.to_string())

Expand All @@ -220,6 +228,8 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
sync_type,
request_key,
since_token=since_token,
timeout=timeout,
full_state=full_state,
Expand Down Expand Up @@ -553,5 +563,111 @@ async def encode_room(
return result


class SlidingSyncE2eeRestServlet(RestServlet):
"""
API endpoint for MSC3575 Sliding Sync `/sync/e2ee`. This is being introduced as part
of Sliding Sync but doesn't have any sliding window component. It's just a way to
get E2EE events without having to sit through a initial sync. And not have
encryption events backed up by the main sync response.

GET parameters::
timeout(int): How long to wait for new events in milliseconds.
since(batch_token): Batch token when asking for incremental deltas.

Response JSON::
{
"next_batch": // batch token for the next /sync
"to_device": {
// list of to-device events
"events": [
{
"content: { "algorithm": "m.olm.v1.curve25519-aes-sha2", "ciphertext": { ... }, "org.matrix.msgid": "abcd", "session_id": "abcd" },
"type": "m.room.encrypted",
"sender": "@alice:example.com",
}
// ...
]
},
"device_one_time_keys_count": {
"signed_curve25519": 50
},
"device_lists": {
"changed": ["@alice:example.com"],
"left": ["@bob:example.com"]
},
"device_unused_fallback_key_types": [
"signed_curve25519"
]
}
"""

PATTERNS = (re.compile("^/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee$"),)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, hs: "HomeServer"):
super().__init__()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.filtering = hs.get_filtering()
self.sync_handler = hs.get_sync_handler()

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
user = requester.user
device_id = requester.device_id

timeout = parse_integer(request, "timeout", default=0)
since = parse_string(request, "since")

sync_config = SyncConfig(
user=user,
# Filtering doesn't apply to this endpoint so just use a default to fill in
# the SyncConfig
filter_collection=self.filtering.DEFAULT_FILTER_COLLECTION,
is_guest=requester.is_guest,
device_id=device_id,
)
sync_type = SyncType.E2EE_SYNC

since_token = None
if since is not None:
since_token = await StreamToken.from_string(self.store, since)

# Request cache key
request_key = (
sync_type,
user,
timeout,
since,
)

# Gather data for the response
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
sync_type,
request_key,
since_token=since_token,
timeout=timeout,
full_state=False,
)

# The client may have disconnected by now; don't bother to serialize the
# response if so.
if request._disconnected:
logger.info("Client has disconnected; not serializing response.")
return 200, {}

response: JsonDict = defaultdict(dict)
response["next_batch"] = await sync_result.next_batch.to_string(self.store)

if sync_result.to_device:
response["to_device"] = {"events": sync_result.to_device}

return 200, response


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)

if hs.config.experimental.msc3575_enabled:
SlidingSyncE2eeRestServlet(hs).register(http_server)
Loading
Loading