Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MSC4140: Delayed events (Futures) #17326

Merged
merged 86 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
86 commits
Select commit Hold shift + click to select a range
044f57a
Add DB schema for delayed events
AndrewFerr Aug 6, 2024
82c5437
Support scheduling delayed events
AndrewFerr Aug 7, 2024
645e225
Support config for maximum allowed event delay
AndrewFerr Aug 6, 2024
5472588
Support updating delayed events
AndrewFerr Aug 7, 2024
14cf8ec
Support listing delayed events
AndrewFerr Aug 7, 2024
d8e3135
Restore pending delayed events on startup
AndrewFerr Aug 7, 2024
f9261b9
Cancel delayed state events on state change
AndrewFerr Aug 7, 2024
d3ea968
Prevent race conditions in delayed event updates
AndrewFerr Aug 6, 2024
c34221f
Check startup delayed state events for same state
AndrewFerr Aug 2, 2024
f2d8144
Advertise as unstable feature
AndrewFerr Aug 6, 2024
d2c9ca7
Update copyright years
AndrewFerr Aug 6, 2024
56c6d87
Add changelog
AndrewFerr Aug 6, 2024
c24c41b
Don't throw when event callback finds no event
AndrewFerr Aug 7, 2024
6e382df
Increase expected db_txn_counts
AndrewFerr Aug 7, 2024
7b79db0
Merge branch 'develop' into af/msc4140
AndrewFerr Aug 7, 2024
8b3fb49
Merge branch 'develop' into af/msc4140
AndrewFerr Aug 8, 2024
32cbacf
Validate a delayed event before scheduling it
AndrewFerr Aug 9, 2024
71e8997
Start adding unit tests for delayed events
AndrewFerr Aug 9, 2024
fafaa03
Add comments to explain rowid / identity columns
AndrewFerr Aug 15, 2024
3afa3cf
Prefix internally-used attributes with underscore
AndrewFerr Aug 15, 2024
b3d4d6c
Use "Params" instead of "Args" in docstrings
AndrewFerr Aug 15, 2024
48d8126
Add comment to explain parent delayed events
AndrewFerr Aug 15, 2024
5ce3787
Merge branch 'develop' into af/msc4140
AndrewFerr Aug 15, 2024
bee52bd
Use utility function for generating a fake ID
AndrewFerr Aug 15, 2024
6252708
Move DB error messages to debug log
AndrewFerr Aug 15, 2024
b1f74a8
Remove TODO to verify delayed event contents
AndrewFerr Aug 15, 2024
221e0af
Don't bother using a CRC for delay_ids
AndrewFerr Aug 15, 2024
08f54ca
Assert non-negative delay; allow missing delay ID
AndrewFerr Aug 15, 2024
56abbb9
Check for membership in delayed member events
AndrewFerr Aug 15, 2024
c4e80ad
Put colons after exception types in docstrings
AndrewFerr Aug 15, 2024
99e421c
Use built-in method to check for RETURNING support
AndrewFerr Aug 15, 2024
335eeb7
Properly indent comment block
AndrewFerr Aug 15, 2024
21311fb
Remove support for delayed event parents
AndrewFerr Aug 19, 2024
3478118
Fix comments
AndrewFerr Aug 20, 2024
00217f3
Reraise the error for an invalid max delay config
AndrewFerr Aug 20, 2024
90cc8b5
Don't bother handling DB key collisions
AndrewFerr Aug 20, 2024
8a65c77
Move update action value check to REST layer
AndrewFerr Aug 20, 2024
47b6e69
Move delay value check to REST layer
AndrewFerr Aug 20, 2024
e0e6802
Replace assert with a single-row update
AndrewFerr Aug 20, 2024
5672d0d
Replace rowid primary key with delay ID + user ID
AndrewFerr Aug 20, 2024
85cb72f
Rename delayed events store methods & txn descs
AndrewFerr Aug 20, 2024
974463f
Remove delayed event from DB on cancel
AndrewFerr Aug 20, 2024
05accda
Make user_localpart first column of DB key
AndrewFerr Aug 21, 2024
2a9069c
Don't handle missing delays in DB lookup
AndrewFerr Aug 21, 2024
57b7229
Replace running_since with send_ts and index it
AndrewFerr Aug 21, 2024
4dc41dc
Remove redundant delay value check
AndrewFerr Aug 21, 2024
3ce7305
Refactor max delay config
AndrewFerr Aug 28, 2024
be094e6
Refactor delayed event processing
AndrewFerr Aug 28, 2024
2aed40b
Set delayed event origin time to its send time
AndrewFerr Aug 28, 2024
235c432
Save delayed event requester's device ID
AndrewFerr Aug 28, 2024
b03312b
Merge branch 'develop' into af/msc4140
AndrewFerr Aug 28, 2024
afff231
Merge 'develop' & bump schema version
AndrewFerr Aug 29, 2024
86c0e97
Merge branch 'develop' into af/msc4140
AndrewFerr Sep 4, 2024
798c79e
Remove license headers on new files
AndrewFerr Sep 4, 2024
186e55d
Lint
AndrewFerr Sep 4, 2024
a3fbdd3
Update documentation
AndrewFerr Sep 9, 2024
ef7284f
Fix top-level comment
AndrewFerr Sep 9, 2024
92d352c
Add docstring to helper function
AndrewFerr Sep 9, 2024
0ab82f5
Fix unit tests
AndrewFerr Sep 9, 2024
9025922
Comment early match return on no delayed events
AndrewFerr Sep 9, 2024
c3ad95d
Pick a nit
AndrewFerr Sep 9, 2024
1dbbb74
Remove TODO for something that's no longer needed
AndrewFerr Sep 9, 2024
3e9f76f
Refactor DB logic for delayed event resetting
AndrewFerr Sep 9, 2024
d36c89f
Use streams to watch for state deltas
AndrewFerr Sep 9, 2024
e0225eb
Nitpick: rename inner function
AndrewFerr Sep 10, 2024
e741c56
Add/improve comments & logs
AndrewFerr Sep 10, 2024
2d79506
Lint
AndrewFerr Sep 10, 2024
e41b5a1
Put retrieved delayed events in field for GET
AndrewFerr Sep 11, 2024
94048f7
Lint imports
AndrewFerr Sep 11, 2024
8e3df61
Don't use data-modifying CTE in WITH for sqlite
AndrewFerr Sep 11, 2024
dd3c746
Remove TODO for returning transaction IDs
AndrewFerr Sep 11, 2024
a60fa7f
Use attrs classes for delayed event properties
AndrewFerr Sep 11, 2024
092793a
Restrict delayed events to a single worker
AndrewFerr Sep 13, 2024
1d75060
Reword docstring
AndrewFerr Sep 13, 2024
34ed582
On startup, wait to catch up on state changes
AndrewFerr Sep 13, 2024
a6cf11c
Don't expect to remember next_send_ts
AndrewFerr Sep 13, 2024
fb04833
Add more unit tests
AndrewFerr Sep 13, 2024
0bf03ad
Merge branch 'develop' into af/msc4140
AndrewFerr Sep 13, 2024
3860f75
Mention that GET /delayed_events supports workers
AndrewFerr Sep 13, 2024
8ee1558
Use default ts for delayed events sent on request
AndrewFerr Sep 16, 2024
cbedade
Don't use attr.asdict
AndrewFerr Sep 16, 2024
7ee57d8
Fix path regex for delayed_events updating
AndrewFerr Sep 16, 2024
10b9dee
Fix SQL query
AndrewFerr Sep 16, 2024
a723f6b
Add more unit tests
AndrewFerr Sep 16, 2024
f32cf9c
Run Complement tests
AndrewFerr Sep 16, 2024
dfde3c2
Order returned delayed events by send_ts
AndrewFerr Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions synapse/handlers/delayed_events.py
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
#

import logging
from typing import TYPE_CHECKING, Dict, Optional

import attr

from twisted.internet.interfaces import IDelayedCall

from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
from synapse.logging.opentracing import set_tag
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.delayed_events import (
Delay,
DelayID,
EventType,
StateKey,
Timestamp,
UserLocalpart,
)
from synapse.types import JsonDict, Requester, RoomID, UserID, create_requester
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _DelayedCallKey:
delay_id: DelayID
user_localpart: UserLocalpart

def __str__(self) -> str:
return f"{self.user_localpart}:{self.delay_id}"


class DelayedEventsHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.config = hs.config
self.clock = hs.get_clock()
self.request_ratelimiter = hs.get_request_ratelimiter()
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved

self._delayed_calls: Dict[_DelayedCallKey, IDelayedCall] = {}

async def add(
self,
requester: Requester,
*,
room_id: str,
event_type: str,
state_key: Optional[str],
origin_server_ts: Optional[int],
content: JsonDict,
delay: Optional[int],
parent_id: Optional[str],
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved
) -> str:
"""
Creates a new delayed event.

Params:
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved
requester: The requester of the delayed event, who will be its owner.
room_id: The room where the event should be sent.
event_type: The type of event to be sent.
state_key: The state key of the event to be sent, or None if it is not a state event.
origin_server_ts: The custom timestamp to send the event with.
If None, the timestamp will be the actual time when the event is sent.
content: The content of the event to be sent.
delay: How long (in milliseconds) to wait before automatically sending the event.
If None, the event won't be automatically sent (allowed only when parent_id is set).
parent_id: The ID of the delayed event this one is grouped with.
May only refer to a delayed event that has no parent itself.
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved

Returns:
The ID of the added delayed event.
"""
# Callers should ensure that at least one of these are set
assert delay or parent_id

await self.request_ratelimiter.ratelimit(requester)

# TODO: Validate that the event is valid before scheduling it
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved

user_localpart = UserLocalpart(requester.user.localpart)
delay_id = await self.store.add(
user_localpart=user_localpart,
current_ts=self._get_current_ts(),
room_id=RoomID.from_string(room_id),
event_type=event_type,
state_key=state_key,
origin_server_ts=origin_server_ts,
content=content,
delay=delay,
parent_id=parent_id,
)

if delay is not None:
self._schedule(delay_id, user_localpart, Delay(delay))

return delay_id

async def _send_on_timeout(
self, delay_id: DelayID, user_localpart: UserLocalpart
) -> None:
del self._delayed_calls[_DelayedCallKey(delay_id, user_localpart)]

args, removed_timeout_delay_ids = await self.store.pop_event(
delay_id, user_localpart
)

removed_timeout_delay_ids.remove(delay_id)
for timeout_delay_id in removed_timeout_delay_ids:
self._unschedule(timeout_delay_id, user_localpart)
await self._send_event(user_localpart, *args)

def _schedule(
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved
self,
delay_id: DelayID,
user_localpart: UserLocalpart,
delay: Delay,
) -> None:
"""NOTE: Should not be called with a delay_id that isn't in the DB, or with a negative delay."""
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved
delay_sec = delay / 1000

logger.info(
"Scheduling delayed event %s for local user %s to be sent in %.3fs",
delay_id,
user_localpart,
delay_sec,
)

self._delayed_calls[_DelayedCallKey(delay_id, user_localpart)] = (
self.clock.call_later(
delay_sec,
run_as_background_process,
"_send_on_timeout",
self._send_on_timeout,
delay_id,
user_localpart,
)
)

def _unschedule(self, delay_id: DelayID, user_localpart: UserLocalpart) -> None:
delayed_call = self._delayed_calls.pop(
_DelayedCallKey(delay_id, user_localpart)
)
self.clock.cancel_call_later(delayed_call)

async def _send_event(
self,
user_localpart: UserLocalpart,
room_id: RoomID,
event_type: EventType,
state_key: Optional[StateKey],
origin_server_ts: Optional[Timestamp],
content: JsonDict,
txn_id: Optional[str] = None,
) -> None:
user_id = UserID(user_localpart, self.config.server.server_name)
user_id_str = user_id.to_string()
requester = create_requester(
user_id,
is_guest=await self.store.is_guest(user_id_str),
)

try:
if state_key is not None and event_type == EventTypes.Member:
membership = content.get("membership", None)
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved
event_id, _ = await self.room_member_handler.update_membership(
requester,
target=UserID.from_string(state_key),
room_id=room_id.to_string(),
action=membership,
content=content,
origin_server_ts=origin_server_ts,
)
else:
event_dict: JsonDict = {
"type": event_type,
"content": content,
"room_id": room_id.to_string(),
"sender": user_id_str,
}

if state_key is not None:
event_dict["state_key"] = state_key

if origin_server_ts is not None:
event_dict["origin_server_ts"] = origin_server_ts

(
event,
_,
) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
event_dict,
txn_id=txn_id,
)
event_id = event.event_id
except ShadowBanError:
event_id = "$" + random_string(43)
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved

set_tag("event_id", event_id)

def _get_current_ts(self) -> Timestamp:
return Timestamp(self.clock.time_msec())
48 changes: 44 additions & 4 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def __init__(self, hs: "HomeServer"):
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
self.message_handler = hs.get_message_handler()
self.delayed_events_handler = hs.get_delayed_events_handler()
self.auth = hs.get_auth()

def register(self, http_server: HttpServer) -> None:
Expand Down Expand Up @@ -289,6 +290,24 @@ async def on_PUT(
if requester.app_service:
origin_server_ts = parse_integer(request, "ts")

delay = parse_integer(request, "org.matrix.msc4140.delay")
parent_id = parse_string(request, "org.matrix.msc4140.parent_delay_id")
AndrewFerr marked this conversation as resolved.
Show resolved Hide resolved
if delay is not None or parent_id is not None:
delay_id = await self.delayed_events_handler.add(
requester,
room_id=room_id,
event_type=event_type,
state_key=state_key,
origin_server_ts=origin_server_ts,
content=content,
delay=delay,
parent_id=parent_id,
)

set_tag("delay_id", delay_id)
ret = {"delay_id": delay_id}
return 200, ret

try:
if event_type == EventTypes.Member:
membership = content.get("membership", None)
Expand Down Expand Up @@ -339,6 +358,7 @@ class RoomSendEventRestServlet(TransactionRestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler()
self.delayed_events_handler = hs.get_delayed_events_handler()
self.auth = hs.get_auth()

def register(self, http_server: HttpServer) -> None:
Expand All @@ -356,17 +376,37 @@ async def _do(
) -> Tuple[int, JsonDict]:
content = parse_json_object_from_request(request)

origin_server_ts = None
if requester.app_service:
origin_server_ts = parse_integer(request, "ts")

delay = parse_integer(request, "org.matrix.msc4140.delay")
parent_id = parse_string(request, "org.matrix.msc4140.parent_delay_id")
if delay is not None or parent_id is not None:
delay_id = await self.delayed_events_handler.add(
requester,
room_id=room_id,
event_type=event_type,
state_key=None,
origin_server_ts=origin_server_ts,
content=content,
delay=delay,
parent_id=parent_id,
)

set_tag("delay_id", delay_id)
ret = {"delay_id": delay_id}
return 200, ret

event_dict: JsonDict = {
"type": event_type,
"content": content,
"room_id": room_id,
"sender": requester.user.to_string(),
}

if requester.app_service:
origin_server_ts = parse_integer(request, "ts")
if origin_server_ts is not None:
event_dict["origin_server_ts"] = origin_server_ts
if origin_server_ts is not None:
event_dict["origin_server_ts"] = origin_server_ts

try:
(
Expand Down
6 changes: 6 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from synapse.handlers.auth import AuthHandler, PasswordAuthProvider
from synapse.handlers.cas import CasHandler
from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.delayed_events import DelayedEventsHandler
from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.directory import DirectoryHandler
Expand Down Expand Up @@ -249,6 +250,7 @@ class HomeServer(metaclass=abc.ABCMeta):
"account_validity",
"auth",
"deactivate_account",
"delayed_events",
"message",
"pagination",
"profile",
Expand Down Expand Up @@ -941,3 +943,7 @@ def get_worker_locks_handler(self) -> WorkerLocksHandler:
@cache_in_self
def get_task_scheduler(self) -> TaskScheduler:
return TaskScheduler(self)

@cache_in_self
def get_delayed_events_handler(self) -> DelayedEventsHandler:
return DelayedEventsHandler(self)
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from .cache import CacheInvalidationWorkerStore
from .censor_events import CensorEventsStore
from .client_ips import ClientIpWorkerStore
from .delayed_events import DelayedEventsStore
from .deviceinbox import DeviceInboxStore
from .devices import DeviceStore
from .directory import DirectoryStore
Expand Down Expand Up @@ -156,6 +157,7 @@ class DataStore(
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
DelayedEventsStore,
):
def __init__(
self,
Expand Down
Loading