diff --git a/pycrdt_websocket/yroom.py b/pycrdt_websocket/yroom.py index 646642a..3a8279f 100644 --- a/pycrdt_websocket/yroom.py +++ b/pycrdt_websocket/yroom.py @@ -4,7 +4,7 @@ from functools import partial from inspect import isawaitable from logging import Logger, getLogger -from typing import Awaitable, Callable +from typing import Any, Awaitable, Callable from anyio import ( TASK_STATUS_IGNORED, @@ -16,16 +16,17 @@ from anyio.abc import TaskGroup, TaskStatus from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from pycrdt import ( + Awareness, Doc, Subscription, YMessageType, YSyncMessageType, + create_awareness_message, create_sync_message, create_update_message, handle_sync_message, ) -from .awareness import Awareness from .websocket import Websocket from .ystore import BaseYStore from .yutils import put_updates @@ -77,11 +78,12 @@ def __init__( ydoc: An optional document for the room (a new one is created otherwise). """ self.ydoc = Doc() if ydoc is None else ydoc - self.awareness = Awareness(self.ydoc) self.ready_event = Event() self.ready = ready self.ystore = ystore self.log = log or getLogger(__name__) + self.awareness = Awareness(self.ydoc) + self.awareness.observe(self.send_server_awareness) self.clients = set() self._on_message = None self.exception_handler = exception_handler @@ -304,3 +306,34 @@ async def serve(self, websocket: Websocket): self.clients.remove(websocket) except Exception as exception: self._handle_exception(exception) + + def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any]) -> None: + """ + Callback to broadcast the server awareness to clients. + + Arguments: + type: The change type. + changes: The awareness changes. + """ + if type != "update" or changes[1] != "local": + return + + if self._task_group is not None: + updated_clients = [v for value in changes[0].values() for v in value] + state = self.awareness.encode_awareness_update(updated_clients) + message = create_awareness_message(state) + self._task_group.start_soon(self._send_server_awareness, message) + else: + self.log.error("Cannot broadcast server awareness: YRoom not started") + + async def _send_server_awareness(self, state: bytes) -> None: + try: + async with create_task_group() as tg: + for client in self.clients: + self.log.debug( + "Sending awareness from server to client with endpoint: %s", + client.path, + ) + tg.start_soon(client.send, state) + except Exception as e: + self.log.error("Error while broadcasting awareness changes: %s", e) diff --git a/pyproject.toml b/pyproject.toml index 5eeb416..b9685a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ classifiers = [ dependencies = [ "anyio >=3.6.2,<5", "sqlite-anyio >=0.2.3,<0.3.0", - "pycrdt >=0.9.16,<0.10.0", + "pycrdt >=0.10.1,<0.11.0", ] [project.optional-dependencies]