Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make token serializing/deserializing async #8427

Merged
merged 5 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8427.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make stream token serializing/deserializing async.
4 changes: 2 additions & 2 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ async def get_stream(

chunk = {
"chunk": chunks,
"start": tokens[0].to_string(),
"end": tokens[1].to_string(),
"start": await tokens[0].to_string(self.store),
"end": await tokens[1].to_string(self.store),
}

return chunk
Expand Down
14 changes: 7 additions & 7 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ async def handle_room(event: RoomsForUser):
messages, time_now=time_now, as_client_event=as_client_event
)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
}

d["state"] = await self._event_serializer.serialize_events(
Expand Down Expand Up @@ -249,7 +249,7 @@ async def handle_room(event: RoomsForUser):
],
"account_data": account_data_events,
"receipts": receipt,
"end": now_token.to_string(),
"end": await now_token.to_string(self.store),
}

return ret
Expand Down Expand Up @@ -348,8 +348,8 @@ async def _room_initial_sync_parted(
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": (
await self._event_serializer.serialize_events(
Expand Down Expand Up @@ -447,8 +447,8 @@ async def get_receipts():
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": state,
"presence": presence,
Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ async def get_messages(
if not events:
return {
"chunk": [],
"start": from_token.to_string(),
"end": next_token.to_string(),
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}

state = None
Expand Down Expand Up @@ -442,8 +442,8 @@ async def get_messages(
events, time_now, as_client_event=as_client_event
)
),
"start": from_token.to_string(),
"end": next_token.to_string(),
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}

if state:
Expand Down
8 changes: 5 additions & 3 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,11 +1077,13 @@ def filter_evts(events):
# the token, which we replace.
token = StreamToken.START

results["start"] = token.copy_and_replace(
results["start"] = await token.copy_and_replace(
"room_key", results["start"]
).to_string()
).to_string(self.store)

results["end"] = token.copy_and_replace("room_key", results["end"]).to_string()
results["end"] = await token.copy_and_replace(
"room_key", results["end"]
).to_string(self.store)

return results

Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,13 @@ async def search(self, user, content, batch=None):
self.storage, user.to_string(), res["events_after"]
)

res["start"] = now_token.copy_and_replace(
res["start"] = await now_token.copy_and_replace(
"room_key", res["start"]
).to_string()
).to_string(self.store)

res["end"] = now_token.copy_and_replace(
res["end"] = await now_token.copy_and_replace(
"room_key", res["end"]
).to_string()
).to_string(self.store)

if include_profile:
senders = {
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def on_POST(self, request, room_id, event_id):
raise SynapseError(400, "Event is for wrong room.")

room_token = await self.store.get_topological_token_for_event(event_id)
token = str(room_token)
token = await room_token.to_string(self.store)

logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
elif "purge_up_to_ts" in body:
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/client/v1/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, hs):
super().__init__()
self.event_stream_handler = hs.get_event_stream_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
Expand All @@ -44,7 +45,7 @@ async def on_GET(self, request):
if b"room_id" in request.args:
room_id = request.args[b"room_id"][0].decode("ascii")

pagin_config = PaginationConfig.from_request(request)
pagin_config = await PaginationConfig.from_request(self.store, request)
timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
if b"timeout" in request.args:
try:
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/client/v1/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ def __init__(self, hs):
super().__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request)
as_client_event = b"raw" not in request.args
pagination_config = PaginationConfig.from_request(request)
pagination_config = await PaginationConfig.from_request(self.store, request)
include_archived = parse_boolean(request, "archived", default=False)
content = await self.initial_sync_handler.snapshot_all_rooms(
user_id=requester.user.to_string(),
Expand Down
11 changes: 8 additions & 3 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ def __init__(self, hs):
super().__init__()
self.message_handler = hs.get_message_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
Expand All @@ -465,7 +466,7 @@ async def on_GET(self, request, room_id):
if at_token_string is None:
at_token = None
else:
at_token = StreamToken.from_string(at_token_string)
at_token = await StreamToken.from_string(self.store, at_token_string)

# let you filter down on particular memberships.
# XXX: this may not be the best shape for this API - we could pass in a filter
Expand Down Expand Up @@ -521,10 +522,13 @@ def __init__(self, hs):
super().__init__()
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request, default_limit=10)
pagination_config = await PaginationConfig.from_request(
self.store, request, default_limit=10
)
as_client_event = b"raw" not in request.args
filter_str = parse_string(request, b"filter", encoding="utf-8")
if filter_str:
Expand Down Expand Up @@ -580,10 +584,11 @@ def __init__(self, hs):
super().__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request)
pagination_config = await PaginationConfig.from_request(self.store, request)
content = await self.initial_sync_handler.room_initial_sync(
room_id=room_id, requester=requester, pagin_config=pagination_config
)
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/client/v2_alpha/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def __init__(self, hs):
super().__init__()
self.auth = hs.get_auth()
self.device_handler = hs.get_device_handler()
self.store = hs.get_datastore()

async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
Expand All @@ -191,7 +192,7 @@ async def on_GET(self, request):
# changes after the "to" as well as before.
set_tag("to", parse_string(request, "to"))

from_token = StreamToken.from_string(from_token_string)
from_token = await StreamToken.from_string(self.store, from_token_string)

user_id = requester.user.to_string()

Expand Down
10 changes: 5 additions & 5 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(self, hs):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.sync_handler = hs.get_sync_handler()
self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
Expand Down Expand Up @@ -151,10 +152,9 @@ async def on_GET(self, request):
device_id=device_id,
)

since_token = None
if since is not None:
since_token = StreamToken.from_string(since)
else:
since_token = None
since_token = await StreamToken.from_string(self.store, since)

# send any outstanding server notices to the user.
await self._server_notices_sender.on_user_syncing(user.to_string())
Expand Down Expand Up @@ -236,7 +236,7 @@ async def encode_response(self, time_now, sync_result, access_token_id, filter):
"leave": sync_result.groups.leave,
},
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
"next_batch": sync_result.next_batch.to_string(),
"next_batch": await sync_result.next_batch.to_string(self.store),
}

@staticmethod
Expand Down Expand Up @@ -413,7 +413,7 @@ def serialize(events):
result = {
"timeline": {
"events": serialized_timeline,
"prev_batch": room.timeline.prev_batch.to_string(),
"prev_batch": await room.timeline.prev_batch.to_string(self.store),
"limited": room.timeline.limited,
},
"state": {"events": serialized_state},
Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ async def purge_history(
The set of state groups that are referenced by deleted events.
"""

parsed_token = await RoomStreamToken.parse(self, token)

return await self.db_pool.runInteraction(
"purge_history",
self._purge_history_txn,
room_id,
token,
parsed_token,
delete_local_events,
)

def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
token = RoomStreamToken.parse(token_str)

def _purge_history_txn(self, txn, room_id, token, delete_local_events):
# Tables that should be pruned:
# event_auth
# event_backward_extremities
Expand Down
9 changes: 5 additions & 4 deletions synapse/streams/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Optional

Expand All @@ -21,6 +20,7 @@
from synapse.api.errors import SynapseError
from synapse.http.servlet import parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.storage.databases.main import DataStore
from synapse.types import StreamToken

logger = logging.getLogger(__name__)
Expand All @@ -39,8 +39,9 @@ class PaginationConfig:
limit = attr.ib(type=Optional[int])

@classmethod
def from_request(
async def from_request(
cls,
store: "DataStore",
request: SynapseRequest,
raise_invalid_params: bool = True,
default_limit: Optional[int] = None,
Expand All @@ -54,13 +55,13 @@ def from_request(
if from_tok == "END":
from_tok = None # For backwards compat.
elif from_tok:
from_tok = StreamToken.from_string(from_tok)
from_tok = await StreamToken.from_string(store, from_tok)
except Exception:
raise SynapseError(400, "'from' parameter is invalid")

try:
if to_tok:
to_tok = StreamToken.from_string(to_tok)
to_tok = await StreamToken.from_string(store, to_tok)
except Exception:
raise SynapseError(400, "'to' parameter is invalid")

Expand Down
Loading