Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix paginating /relations with a live token (#14866)
Browse files Browse the repository at this point in the history
The `/relations` endpoint was not properly handle "live tokens"
(i.e sync tokens), to do this properly we abstract the code that
`/messages` has and re-use it.
  • Loading branch information
clokep authored Jan 26, 2023
1 parent ba79fb4 commit 345576b
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 70 deletions.
1 change: 1 addition & 0 deletions changelog.d/14866.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.53.0 where `next_batch` tokens from `/sync` could not be used with the `/relations` endpoint.
38 changes: 17 additions & 21 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.databases.main.stream import generate_pagination_where_clause
from synapse.storage.databases.main.stream import (
generate_next_token,
generate_pagination_bounds,
generate_pagination_where_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken
from synapse.types import JsonDict, StreamKeyType, StreamToken
from synapse.util.caches.descriptors import cached, cachedList

if TYPE_CHECKING:
Expand Down Expand Up @@ -207,24 +211,23 @@ async def get_relations_for_event(
where_clause.append("type = ?")
where_args.append(event_type)

order, from_bound, to_bound = generate_pagination_bounds(
direction,
from_token.room_key if from_token else None,
to_token.room_key if to_token else None,
)

pagination_clause = generate_pagination_where_clause(
direction=direction,
column_names=("topological_ordering", "stream_ordering"),
from_token=from_token.room_key.as_historical_tuple()
if from_token
else None,
to_token=to_token.room_key.as_historical_tuple() if to_token else None,
from_token=from_bound,
to_token=to_bound,
engine=self.database_engine,
)

if pagination_clause:
where_clause.append(pagination_clause)

if direction == "b":
order = "DESC"
else:
order = "ASC"

sql = """
SELECT event_id, relation_type, sender, topological_ordering, stream_ordering
FROM event_relations
Expand Down Expand Up @@ -266,16 +269,9 @@ def _get_recent_references_for_event_txn(
topo_orderings = topo_orderings[:limit]
stream_orderings = stream_orderings[:limit]

topo = topo_orderings[-1]
token = stream_orderings[-1]
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
token -= 1
next_key = RoomStreamToken(topo, token)
next_key = generate_next_token(
direction, topo_orderings[-1], stream_orderings[-1]
)

if from_token:
next_token = from_token.copy_and_replace(
Expand Down
154 changes: 105 additions & 49 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,104 @@ def generate_pagination_where_clause(
return " AND ".join(where_clause)


def generate_pagination_bounds(
direction: str,
from_token: Optional[RoomStreamToken],
to_token: Optional[RoomStreamToken],
) -> Tuple[
str, Optional[Tuple[Optional[int], int]], Optional[Tuple[Optional[int], int]]
]:
"""
Generate a start and end point for this page of events.
Args:
direction: Whether pagination is going forwards or backwards. One of "f" or "b".
from_token: The token to start pagination at, or None to start at the first value.
to_token: The token to end pagination at, or None to not limit the end point.
Returns:
A three tuple of:
ASC or DESC for sorting of the query.
The starting position as a tuple of ints representing
(topological position, stream position) or None if no from_token was
provided. The topological position may be None for live tokens.
The end position in the same format as the starting position, or None
if no to_token was provided.
"""

# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
if direction == "b":
order = "DESC"
else:
order = "ASC"

# The bounds for the stream tokens are complicated by the fact
# that we need to handle the instance_map part of the tokens. We do this
# by fetching all events between the min stream token and the maximum
# stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
# then filtering the results.
from_bound: Optional[Tuple[Optional[int], int]] = None
if from_token:
if from_token.topological is not None:
from_bound = from_token.as_historical_tuple()
elif direction == "b":
from_bound = (
None,
from_token.get_max_stream_pos(),
)
else:
from_bound = (
None,
from_token.stream,
)

to_bound: Optional[Tuple[Optional[int], int]] = None
if to_token:
if to_token.topological is not None:
to_bound = to_token.as_historical_tuple()
elif direction == "b":
to_bound = (
None,
to_token.stream,
)
else:
to_bound = (
None,
to_token.get_max_stream_pos(),
)

return order, from_bound, to_bound


def generate_next_token(
direction: str, last_topo_ordering: int, last_stream_ordering: int
) -> RoomStreamToken:
"""
Generate the next room stream token based on the currently returned data.
Args:
direction: Whether pagination is going forwards or backwards. One of "f" or "b".
last_topo_ordering: The last topological ordering being returned.
last_stream_ordering: The last stream ordering being returned.
Returns:
A new RoomStreamToken to return to the client.
"""
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
last_stream_ordering -= 1
return RoomStreamToken(last_topo_ordering, last_stream_ordering)


def _make_generic_sql_bound(
bound: str,
column_names: Tuple[str, str],
Expand Down Expand Up @@ -1300,47 +1398,11 @@ def _paginate_room_events_txn(
`to_token`), or `limit` is zero.
"""

# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
if direction == "b":
order = "DESC"
else:
order = "ASC"

# The bounds for the stream tokens are complicated by the fact
# that we need to handle the instance_map part of the tokens. We do this
# by fetching all events between the min stream token and the maximum
# stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
# then filtering the results.
if from_token.topological is not None:
from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple()
elif direction == "b":
from_bound = (
None,
from_token.get_max_stream_pos(),
)
else:
from_bound = (
None,
from_token.stream,
)

to_bound: Optional[Tuple[Optional[int], int]] = None
if to_token:
if to_token.topological is not None:
to_bound = to_token.as_historical_tuple()
elif direction == "b":
to_bound = (
None,
to_token.stream,
)
else:
to_bound = (
None,
to_token.get_max_stream_pos(),
)
order, from_bound, to_bound = generate_pagination_bounds(
direction, from_token, to_token
)

bounds = generate_pagination_where_clause(
direction=direction,
Expand Down Expand Up @@ -1436,16 +1498,10 @@ def _paginate_room_events_txn(
][:limit]

if rows:
topo = rows[-1].topological_ordering
token = rows[-1].stream_ordering
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
token -= 1
next_token = RoomStreamToken(topo, token)
assert rows[-1].topological_ordering is not None
next_token = generate_next_token(
direction, rows[-1].topological_ordering, rows[-1].stream_ordering
)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
Expand Down

0 comments on commit 345576b

Please sign in to comment.