Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3196 from matrix-org/erikj/pagination_return
Browse files Browse the repository at this point in the history
 Refactor pagination DB API to return concrete type
  • Loading branch information
erikjohnston authored May 9, 2018
2 parents 0461ef0 + 75552d2 commit 1e5280b
Showing 1 changed file with 49 additions and 28 deletions.
77 changes: 49 additions & 28 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import logging

from six.moves import range
from collections import namedtuple


logger = logging.getLogger(__name__)
Expand All @@ -59,6 +60,12 @@
_TOPOLOGICAL_TOKEN = "topological"


# Used as return values for pagination APIs
_EventDictReturn = namedtuple("_EventDictReturn", (
"event_id", "topological_ordering", "stream_ordering",
))


def lower_bound(token, engine, inclusive=False):
inclusive = "=" if inclusive else ""
if token.topological is None:
Expand Down Expand Up @@ -256,24 +263,28 @@ def f(txn):
" ORDER BY stream_ordering %s LIMIT ?"
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))

rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
else:
sql = (
"SELECT event_id, stream_ordering FROM events WHERE"
"SELECT event_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering <= ?"
" ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
) % (order, order,)
txn.execute(sql, (room_id, to_id, limit))

rows = self.cursor_to_dict(txn)
rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]

return rows

rows = yield self.runInteraction("get_room_events_stream_for_room", f)

ret = yield self._get_events(
[r["event_id"] for r in rows],
[r.event_id for r in rows],
get_prev_content=True
)

Expand All @@ -283,7 +294,7 @@ def f(txn):
ret.reverse()

if rows:
key = "s%d" % min(r["stream_ordering"] for r in rows)
key = "s%d" % min(r.stream_ordering for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
Expand Down Expand Up @@ -330,14 +341,15 @@ def f(txn):
" ORDER BY stream_ordering ASC"
)
txn.execute(sql, (user_id, to_id,))
rows = self.cursor_to_dict(txn)

rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]

return rows

rows = yield self.runInteraction("get_membership_changes_for_user", f)

ret = yield self._get_events(
[r["event_id"] for r in rows],
[r.event_id for r in rows],
get_prev_content=True
)

Expand All @@ -353,14 +365,14 @@ def get_recent_events_for_room(self, room_id, limit, end_token):

logger.debug("stream before")
events = yield self._get_events(
[r["event_id"] for r in rows],
[r.event_id for r in rows],
get_prev_content=True
)
logger.debug("stream after")

self._set_before_and_after(events, rows)

defer.returnValue((events, token))
defer.returnValue((events, (token, end_token)))

@defer.inlineCallbacks
def get_recent_event_ids_for_room(self, room_id, limit, end_token):
Expand All @@ -372,15 +384,14 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token):
end_token (str): The stream token representing now.
Returns:
Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of
dicts (which include event_ids, etc), and a tuple for
`(start_token, end_token)` representing the range of rows
returned.
The returned events are in ascending order.
Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of
_EventDictReturn and a token pointing to the start of the returned
events.
The events returned are in ascending order.
"""
# Allow a zero limit here, and no-op.
if limit == 0:
defer.returnValue(([], (end_token, end_token)))
defer.returnValue(([], end_token))

end_token = RoomStreamToken.parse_stream_token(end_token)

Expand All @@ -392,7 +403,7 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token):
# We want to return the results in ascending order.
rows.reverse()

defer.returnValue((rows, (token, str(end_token))))
defer.returnValue((rows, token))

def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering
Expand Down Expand Up @@ -496,10 +507,20 @@ def _get_max_topological_txn(self, txn, room_id):

@staticmethod
def _set_before_and_after(events, rows, topo_order=True):
"""Inserts ordering information to events' internal metadata from
the DB rows.
Args:
events (list[FrozenEvent])
rows (list[_EventDictReturn])
topo_order (bool): Whether the events were ordered topologically
or by stream ordering. If true then all rows should have a non
null topological_ordering.
"""
for event, row in zip(events, rows):
stream = row["stream_ordering"]
if topo_order:
topo = event.depth
stream = row.stream_ordering
if topo_order and row.topological_ordering:
topo = row.topological_ordering
else:
topo = None
internal = event.internal_metadata
Expand Down Expand Up @@ -586,12 +607,12 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim
rows, start_token = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
)
events_before = [r["event_id"] for r in rows]
events_before = [r.event_id for r in rows]

rows, end_token = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
)
events_after = [r["event_id"] for r in rows]
events_after = [r.event_id for r in rows]

return {
"before": {
Expand Down Expand Up @@ -672,9 +693,9 @@ def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
those that match the filter.
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "toplogical_ordering" and "stream_ordering".
Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
as a list of _EventDictReturn and a token that points to the end
of the result set.
"""
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
Expand Down Expand Up @@ -725,11 +746,11 @@ def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,

txn.execute(sql, args)

rows = self.cursor_to_dict(txn)
rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]

if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
topo = rows[-1].topological_ordering
toke = rows[-1].stream_ordering
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
Expand Down Expand Up @@ -764,7 +785,7 @@ def paginate_room_events(self, room_id, from_key, to_key=None,
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "toplogical_ordering" and "stream_orderign".
the keys "event_id", "topological_ordering" and "stream_orderign".
"""

from_key = RoomStreamToken.parse(from_key)
Expand All @@ -777,7 +798,7 @@ def paginate_room_events(self, room_id, from_key, to_key=None,
)

events = yield self._get_events(
[r["event_id"] for r in rows],
[r.event_id for r in rows],
get_prev_content=True
)

Expand Down

0 comments on commit 1e5280b

Please sign in to comment.