Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Remove 'StreamToken.is_after'
Browse files Browse the repository at this point in the history
It involves comparing two RoomStreamTokens, which is an undefined
operation.
  • Loading branch information
erikjohnston committed Sep 29, 2020
1 parent ef6cb36 commit d68a91d
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 34 deletions.
3 changes: 2 additions & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ async def _room_initial_sync_parted(
if limit is None:
limit = 10

stream_token = await self.store.get_stream_token_for_event(member_event_id)
leave_position = await self.store.get_position_for_event(member_event_id)
stream_token = leave_position.to_room_stream_token()

messages, token = await self.store.get_recent_events_for_room(
room_id, limit=limit, end_token=stream_token
Expand Down
18 changes: 13 additions & 5 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1595,16 +1595,24 @@ async def _get_rooms_changed(

if leave_events:
leave_event = leave_events[-1]
leave_stream_token = await self.store.get_stream_token_for_event(
leave_position = await self.store.get_position_for_event(
leave_event.event_id
)
leave_token = since_token.copy_and_replace(
"room_key", leave_stream_token
)

if since_token and since_token.is_after(leave_token):
# If the leave event happened before the since token then we
# bail.
if since_token and not leave_position.persisted_after(
since_token.room_key
):
continue

# We can safely convert the position of the leave event into a
# stream token as it'll only be used in the context of this
# room. (c.f. the docstring of `to_room_stream_token`).
leave_token = since_token.copy_and_replace(
"room_key", leave_position.to_room_stream_token()
)

# If this is an out of band message, like a remote invite
# rejection, we include it in the recents batch. Otherwise, we
# let _load_filtered_recents handle fetching the correct
Expand Down
4 changes: 2 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def new_listener(self, token: StreamToken) -> _NotificationListener:
"""
# Immediately wake up stream if something has already since happened
# since their last token.
if self.last_notified_token.is_after(token):
if self.last_notified_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
Expand Down Expand Up @@ -470,7 +470,7 @@ async def get_events_for(
async def check_for_updates(
before_token: StreamToken, after_token: StreamToken
) -> EventStreamResult:
if not after_token.is_after(before_token):
if after_token == before_token:
return EventStreamResult([], (from_token, from_token))

events = [] # type: List[EventBase]
Expand Down
25 changes: 13 additions & 12 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
- topological tokems: "t%d-%d", where the integers map to the topological
and stream ordering columns respectively.
"""

import abc
import logging
from collections import namedtuple
Expand All @@ -54,7 +53,7 @@
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.types import Collection, RoomStreamToken
from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache

if TYPE_CHECKING:
Expand Down Expand Up @@ -614,17 +613,19 @@ def get_stream_id_for_event_txn(
allow_none=allow_none,
)

async def get_stream_token_for_event(self, event_id: str) -> RoomStreamToken:
"""The stream token for an event
Args:
event_id: The id of the event to look up a stream token for.
Raises:
StoreError if the event wasn't in the database.
Returns:
A stream token.
async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
"""Get the persisted position for an event
"""
stream_id = await self.get_stream_id_for_event(event_id)
return RoomStreamToken(None, stream_id)
row = await self.db_pool.simple_select_one(
table="events",
keyvalues={"event_id": event_id},
retcols=("stream_ordering", "instance_name"),
desc="get_position_for_event",
)

return PersistedEventPosition(
row["instance_name"] or "master", row["stream_ordering"]
)

async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken:
"""The stream token for an event
Expand Down
26 changes: 12 additions & 14 deletions synapse/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,20 +458,6 @@ def to_string(self):
def room_stream_id(self):
return self.room_key.stream

def is_after(self, other):
"""Does this token contain events that the other doesn't?"""
return (
(other.room_stream_id < self.room_stream_id)
or (int(other.presence_key) < int(self.presence_key))
or (int(other.typing_key) < int(self.typing_key))
or (int(other.receipt_key) < int(self.receipt_key))
or (int(other.account_data_key) < int(self.account_data_key))
or (int(other.push_rules_key) < int(self.push_rules_key))
or (int(other.to_device_key) < int(self.to_device_key))
or (int(other.device_list_key) < int(self.device_list_key))
or (int(other.groups_key) < int(self.groups_key))
)

def copy_and_advance(self, key, new_value) -> "StreamToken":
"""Advance the given key in the token to a new value if and only if the
new value is after the old value.
Expand Down Expand Up @@ -509,6 +495,18 @@ class PersistedEventPosition:
def persisted_after(self, token: RoomStreamToken) -> bool:
return token.stream < self.stream

def to_room_stream_token(self) -> RoomStreamToken:
"""Converts the position to a room stream token such that events
persisted in the same room after this position will be after the
returned `RoomStreamToken`.
Note: no guarentees are made about ordering w.r.t. events in other
rooms.
"""
# Doing the naive thing satisfies the desired properties described in
# the docstring.
return RoomStreamToken(None, self.stream)


class ThirdPartyInstanceID(
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
Expand Down

0 comments on commit d68a91d

Please sign in to comment.