Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor recent events func to use pagination func #3195

Merged
merged 2 commits into from
May 9, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 27 additions & 51 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore

from synapse.util.caches.descriptors import cached
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
Expand Down Expand Up @@ -347,9 +346,9 @@ def f(txn):
defer.returnValue(ret)

@defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
def get_recent_events_for_room(self, room_id, limit, end_token):
rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token
room_id, limit, end_token,
)

logger.debug("stream before")
Expand All @@ -363,60 +362,37 @@ def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None)

defer.returnValue((events, token))

@cached(num_args=4)
def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
end_token = RoomStreamToken.parse_stream_token(end_token)

if from_token is None:
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)
else:
from_token = RoomStreamToken.parse_stream_token(from_token)
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering > ?"
" AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)

def get_recent_events_for_room_txn(txn):
if from_token is None:
txn.execute(sql, (room_id, end_token.stream, False, limit,))
else:
txn.execute(sql, (
room_id, from_token.stream, end_token.stream, False, limit
))
@defer.inlineCallbacks
def get_recent_event_ids_for_room(self, room_id, limit, end_token):
"""Get the most recent events in the room in topological ordering.

rows = self.cursor_to_dict(txn)
Args:
room_id (str)
limit (int)
end_token (str): The stream token representing now.

rows.reverse() # As we selected with reverse ordering
Returns:
Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of
dicts (which include event_ids, etc), and a tuple for
`(start_token, end_token)` representing the range of rows
returned.
The returned events are in ascending order.
"""
# Allow a zero limit here, and no-op.
if limit == 0:
defer.returnValue(([], (end_token, end_token)))

if rows:
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# since we are going backwards so we subtract one from the
# stream part.
topo = rows[0]["topological_ordering"]
toke = rows[0]["stream_ordering"] - 1
start_token = str(RoomStreamToken(topo, toke))
end_token = RoomStreamToken.parse_stream_token(end_token)

token = (start_token, str(end_token))
else:
token = (str(end_token), str(end_token))
rows, token = yield self.runInteraction(
"get_recent_event_ids_for_room", self._paginate_room_events_txn,
room_id, from_token=end_token, limit=limit,
)

return rows, token
# We want to return the results in ascending order.
rows.reverse()

return self.runInteraction(
"get_recent_events_for_room", get_recent_events_for_room_txn
)
defer.returnValue((rows, (token, str(end_token))))

def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering
Expand Down