From 42118893c741d6b5e2254c5ff7825770e6362471 Mon Sep 17 00:00:00 2001 From: Jialin Zhang Date: Tue, 23 Apr 2024 13:48:59 -0700 Subject: [PATCH] adding exception handling for room start tasks --- pycrdt_websocket/yroom.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pycrdt_websocket/yroom.py b/pycrdt_websocket/yroom.py index f0c12c8..f173c81 100644 --- a/pycrdt_websocket/yroom.py +++ b/pycrdt_websocket/yroom.py @@ -5,6 +5,7 @@ from inspect import isawaitable from logging import Logger, getLogger from typing import Awaitable, Callable +from tornado.websocket import WebSocketClosedError from anyio import ( TASK_STATUS_IGNORED, @@ -138,12 +139,20 @@ async def _broadcast_updates(self): # broadcast internal ydoc's update to all clients, that includes changes from the # clients and changes from the backend (out-of-band changes) for client in self.clients: - self.log.debug("Sending Y update to client with endpoint: %s", client.path) - message = create_update_message(update) - self._task_group.start_soon(client.send, message) + try: + self.log.debug("Sending Y update to client with endpoint: %s", client.path) + message = create_update_message(update) + self._task_group.start_soon(client.send, message) + except Exception as e: + self.log.error("Error sending Y update to client with endpoint: %s", client.path, exc_info=e) + if isinstance(e, WebSocketClosedError): + self.client.remove(client) if self.ystore: - self.log.debug("Writing Y update to YStore") - self._task_group.start_soon(self.ystore.write, update) + try: + self._task_group.start_soon(self.ystore.write, update) + self.log.debug("Writing Y update to YStore") + except Exception as e: + self.log.error("Error writing Y update to YStore", exc_info=e) async def __aenter__(self) -> YRoom: async with self._start_lock: @@ -210,9 +219,9 @@ async def serve(self, websocket: Websocket): websocket: The WebSocket through which to serve the client. """ async with create_task_group() as tg: - self.clients.append(websocket) - await sync(self.ydoc, websocket, self.log) try: + self.clients.append(websocket) + await sync(self.ydoc, websocket, self.log) async for message in websocket: # filter messages (e.g. awareness) skip = False @@ -246,7 +255,9 @@ async def serve(self, websocket: Websocket): ) tg.start_soon(client.send, message) except Exception as e: - self.log.debug("Error serving endpoint: %s", websocket.path, exc_info=e) + self.log.error("Error serving endpoint: %s", websocket.path, exc_info=e) + if isinstance(e, WebSocketClosedError): + raise e # remove this client self.clients = [c for c in self.clients if c != websocket]