Skip to content

Commit

Permalink
Optimise notifier (#17765)
Browse files Browse the repository at this point in the history
The notifier is quite inefficient when it has to wake up many user
streams all at once

From a simple benchmark, this reduces the time to notify 1M user streams
from ~30s to ~5s.
  • Loading branch information
erikjohnston authored Sep 30, 2024
1 parent ece66ba commit 93889eb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 35 deletions.
1 change: 1 addition & 0 deletions changelog.d/17765.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.
46 changes: 21 additions & 25 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
Expand Down Expand Up @@ -120,14 +121,13 @@ def __init__(
):
self.user_id = user_id
self.rooms = set(rooms)
self.current_token = current_token

# The last token for which we should wake up any streams that have a
# token that comes before it. This gets updated every time we get poked.
# We start it at the current token since if we get any streams
# that have a token from before we have no idea whether they should be
# woken up or not, so let's just wake them up.
self.last_notified_token = current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms

self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
Expand All @@ -136,33 +136,19 @@ def __init__(

def notify(
self,
stream_key: StreamKeyType,
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
current_token: StreamToken,
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
event source.
Args:
stream_key: The stream the event came from.
stream_id: The new id for the stream the event came from.
current_token: The new current token.
time_now_ms: The current time in milliseconds.
"""
self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
self.last_notified_token = self.current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
notify_deferred = self.notify_deferred

log_kv(
{
"notify": self.user_id,
"stream": stream_key,
"stream_id": stream_id,
"listeners": self.count_listeners(),
}
)

users_woken_by_stream_counter.labels(stream_key).inc()

with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
notify_deferred.callback(self.current_token)
Expand Down Expand Up @@ -191,7 +177,7 @@ def new_listener(self, token: StreamToken) -> _NotificationListener:
"""
# Immediately wake up the stream if something has already happened
# since their last token.
if self.last_notified_token != token:
if self.current_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
Expand Down Expand Up @@ -342,14 +328,17 @@ async def on_un_partial_stated_room(
# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
for user_stream in user_streams:
try:
user_stream.notify(
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
)
user_stream.notify(current_token, time_now_ms)
except Exception:
logger.exception("Failed to notify listener")

users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)

# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
self.notify_replication()
Expand Down Expand Up @@ -519,12 +508,16 @@ def on_new_event(
rooms = rooms or []

with Measure(self.clock, "on_new_event"):
user_streams = set()
user_streams: Set[_NotifierUserStream] = set()

log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
}
)

Expand All @@ -544,12 +537,15 @@ def on_new_event(
)

time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
for user_stream in user_streams:
try:
user_stream.notify(stream_key, new_token, time_now_ms)
user_stream.notify(current_token, time_now_ms)
except Exception:
logger.exception("Failed to notify listener")

users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))

self.notify_replication()

# Notify appservices.
Expand Down
31 changes: 21 additions & 10 deletions tests/rest/client/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,22 +282,33 @@ def test_sync_backwards_typing(self) -> None:
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]

# This should time out! But it does not, because our stream token is
# ahead, and therefore it's saying the typing (that we've actually
# already seen) is new, since it's got a token above our new, now-reset
# stream token.
channel = self.make_request("GET", sync_url % (access_token, next_batch))
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]

# Clear the typing information, so that it doesn't think everything is
# in the future.
# in the future. This happens automatically when the typing stream
# resets.
typing._reset()

# Now it SHOULD fail as it never completes!
# Nothing new, so we time out.
with self.assertRaises(TimedOutException):
self.make_request("GET", sync_url % (access_token, next_batch))

# Sync and start typing again.
sync_channel = self.make_request(
"GET", sync_url % (access_token, next_batch), await_result=False
)
self.assertFalse(sync_channel.is_finished())

channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.assertEqual(200, channel.code)

# Sync should now return.
sync_channel.await_result()
self.assertEqual(200, sync_channel.code)
next_batch = sync_channel.json_body["next_batch"]


class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin):
servlets = [
Expand Down

0 comments on commit 93889eb

Please sign in to comment.