Optimise notifier mk2 #17766

Merged
merged 8 commits on Sep 30, 2024
1 change: 1 addition & 0 deletions changelog.d/17766.misc
@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.
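The sketch below illustrates the pattern the notifier.py diff applies, in a simplified, self-contained form (the UserStream class, integer tokens, and the top-level usage are illustrative stand-ins, not Synapse's actual types; it only assumes Twisted is installed): each per-user stream keeps a plain set of Deferreds instead of a single ObservableDeferred, and the notifier collects the woken deferreds from every stream and fires them in one batch.

from typing import Set

from twisted.internet.defer import Deferred, succeed


class UserStream:
    def __init__(self, current_token: int) -> None:
        self.current_token = current_token
        # Listeners waiting for a token newer than the one they already have.
        self.listeners: Set["Deferred[int]"] = set()

    def new_listener(self, token: int) -> "Deferred[int]":
        # Wake the caller immediately if the stream has already advanced
        # past the token it last saw.
        if token != self.current_token:
            return succeed(self.current_token)
        # Otherwise register a deferred; cancelling it (e.g. on timeout)
        # discards it from the set so it cannot leak.
        d: "Deferred[int]" = Deferred(canceller=lambda d: self.listeners.discard(d))
        self.listeners.add(d)
        return d

    def update_and_fetch_deferreds(self, current_token: int) -> Set["Deferred[int]"]:
        # Advance the stream and hand the pending listeners back to the
        # caller, which fires them all in one batch.
        self.current_token = current_token
        listeners, self.listeners = self.listeners, set()
        return listeners


# Usage: collect deferreds from every affected stream, then resolve them
# together so any per-wakeup bookkeeping is paid once rather than per user.
stream = UserStream(current_token=1)
waiter = stream.new_listener(token=1)
for d in stream.update_and_fetch_deferreds(current_token=2):
    d.callback(2)
waiter.addCallback(lambda token: print("woken with token", token))

The canceller keeps a timed-out listener from lingering in the set, which is what lets the real notifier drop ObservableDeferred and its per-observer bookkeeping while still resolving every waiter under a single PreserveLoggingContext.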
96 changes: 61 additions & 35 deletions synapse/notifier.py
@@ -41,6 +41,7 @@
from prometheus_client import Counter

from twisted.internet import defer
from twisted.internet.defer import Deferred

from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
@@ -52,6 +53,7 @@
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
ISynapseReactor,
JsonDict,
MultiWriterStreamToken,
PersistedEventPosition,
@@ -61,7 +63,9 @@
StreamToken,
UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.async_helpers import (
timeout_deferred,
)
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client
@@ -90,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
return n


class _NotificationListener:
"""This represents a single client connection to the events stream.
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
"""

__slots__ = ["deferred"]

def __init__(self, deferred: "defer.Deferred"):
self.deferred = deferred


class _NotifierUserStream:
"""This represents a user connected to the event stream.
It tracks the most recent stream token for that user.
@@ -114,11 +106,13 @@ class _NotifierUserStream:

def __init__(
self,
reactor: ISynapseReactor,
user_id: str,
rooms: StrCollection,
current_token: StreamToken,
time_now_ms: int,
):
self.reactor = reactor
self.user_id = user_id
self.rooms = set(rooms)

@@ -130,28 +124,31 @@ def __init__(
self.current_token = current_token
self.last_notified_ms = time_now_ms

self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
defer.Deferred()
)
# Set of listeners that we need to wake up when there has been a change.
self.listeners: Set[Deferred[StreamToken]] = set()

def notify(
def update_and_fetch_deferreds(
self,
current_token: StreamToken,
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
event source.
) -> Collection["Deferred[StreamToken]"]:
"""Update the stream for this user because of a new event from an
event source, and return the set of deferreds to wake up.

Args:
current_token: The new current token.
time_now_ms: The current time in milliseconds.

Returns:
The set of deferreds that need to be called.
"""
self.current_token = current_token
self.last_notified_ms = time_now_ms
notify_deferred = self.notify_deferred

with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
notify_deferred.callback(self.current_token)
listeners = self.listeners
self.listeners = set()

return listeners

def remove(self, notifier: "Notifier") -> None:
"""Remove this listener from all the indexes in the Notifier
@@ -165,9 +162,9 @@ def remove(self, notifier: "Notifier") -> None:
notifier.user_to_user_stream.pop(self.user_id)

def count_listeners(self) -> int:
return len(self.notify_deferred.observers())
return len(self.listeners)

def new_listener(self, token: StreamToken) -> _NotificationListener:
def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
"""Returns a deferred that is resolved when there is a new token
greater than the given token.

@@ -177,10 +174,17 @@ def new_listener(self, token: StreamToken) -> _NotificationListener:
"""
# Immediately wake up stream if something has already since happened
# since their last token.
if self.current_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
if token != self.current_token:
return defer.succeed(self.current_token)

# Create a new deferred and add it to the set of listeners. We add a
# cancel handler to remove it from the set again, to handle timeouts.
deferred: "Deferred[StreamToken]" = Deferred(
canceller=lambda d: self.listeners.discard(d)
)
self.listeners.add(deferred)

return deferred


@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -233,6 +237,7 @@ def __init__(self, hs: "HomeServer"):
# List of callbacks to be notified when a lock is released
self._lock_released_callback: List[Callable[[str, str, str], None]] = []

self.reactor = hs.get_reactor()
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
@@ -329,12 +334,20 @@ async def on_un_partial_stated_room(
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()

listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(current_token, time_now_ms)
listeners.extend(
user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
)
except Exception:
logger.exception("Failed to notify listener")

with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)

users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)
@@ -538,12 +551,24 @@ def on_new_event(

time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(current_token, time_now_ms)
listeners.extend(
user_stream.update_and_fetch_deferreds(
current_token, time_now_ms
)
)
except Exception:
logger.exception("Failed to notify listener")

# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)

users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))

self.notify_replication()
@@ -582,6 +607,7 @@ async def wait_for_events(
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
reactor=self.reactor,
user_id=user_id,
rooms=room_ids,
current_token=current_token,
@@ -604,8 +630,8 @@
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
listener.deferred = timeout_deferred(
listener.deferred,
listener = timeout_deferred(
listener,
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
@@ -618,7 +644,7 @@
)

with PreserveLoggingContext():
await listener.deferred
await listener

log_kv(
{