Skip to content

Commit

Permalink
adding exception handling for room start tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Zhang committed Apr 26, 2024
1 parent 29bd17b commit 4211889
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions pycrdt_websocket/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from inspect import isawaitable
from logging import Logger, getLogger
from typing import Awaitable, Callable
from tornado.websocket import WebSocketClosedError

from anyio import (
TASK_STATUS_IGNORED,
Expand Down Expand Up @@ -138,12 +139,20 @@ async def _broadcast_updates(self):
# broadcast internal ydoc's update to all clients, that includes changes from the
# clients and changes from the backend (out-of-band changes)
for client in self.clients:
self.log.debug("Sending Y update to client with endpoint: %s", client.path)
message = create_update_message(update)
self._task_group.start_soon(client.send, message)
try:
self.log.debug("Sending Y update to client with endpoint: %s", client.path)
message = create_update_message(update)
self._task_group.start_soon(client.send, message)
except Exception as e:
self.log.error("Error sending Y update to client with endpoint: %s", client.path, exc_info=e)
if isinstance(e, WebSocketClosedError):
self.client.remove(client)
if self.ystore:
self.log.debug("Writing Y update to YStore")
self._task_group.start_soon(self.ystore.write, update)
try:
self._task_group.start_soon(self.ystore.write, update)
self.log.debug("Writing Y update to YStore")
except Exception as e:
self.log.error("Error writing Y update to YStore", exc_info=e)

async def __aenter__(self) -> YRoom:
async with self._start_lock:
Expand Down Expand Up @@ -210,9 +219,9 @@ async def serve(self, websocket: Websocket):
websocket: The WebSocket through which to serve the client.
"""
async with create_task_group() as tg:
self.clients.append(websocket)
await sync(self.ydoc, websocket, self.log)
try:
self.clients.append(websocket)
await sync(self.ydoc, websocket, self.log)
async for message in websocket:
# filter messages (e.g. awareness)
skip = False
Expand Down Expand Up @@ -246,7 +255,9 @@ async def serve(self, websocket: Websocket):
)
tg.start_soon(client.send, message)
except Exception as e:
self.log.debug("Error serving endpoint: %s", websocket.path, exc_info=e)
self.log.error("Error serving endpoint: %s", websocket.path, exc_info=e)
if isinstance(e, WebSocketClosedError):
raise e

# remove this client
self.clients = [c for c in self.clients if c != websocket]

0 comments on commit 4211889

Please sign in to comment.