Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast server awareness to all clients #73

Merged
merged 22 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions pycrdt_websocket/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from functools import partial
from inspect import isawaitable
from logging import Logger, getLogger
from typing import Awaitable, Callable
from typing import Any, Awaitable, Callable

from anyio import (
TASK_STATUS_IGNORED,
Expand All @@ -16,16 +16,17 @@
from anyio.abc import TaskGroup, TaskStatus
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pycrdt import (
Awareness,
Doc,
Subscription,
YMessageType,
YSyncMessageType,
create_awareness_message,
create_sync_message,
create_update_message,
handle_sync_message,
)

from .awareness import Awareness
from .websocket import Websocket
from .ystore import BaseYStore
from .yutils import put_updates
Expand Down Expand Up @@ -77,11 +78,12 @@ def __init__(
ydoc: An optional document for the room (a new one is created otherwise).
"""
self.ydoc = Doc() if ydoc is None else ydoc
self.awareness = Awareness(self.ydoc)
self.ready_event = Event()
self.ready = ready
self.ystore = ystore
self.log = log or getLogger(__name__)
self.awareness = Awareness(self.ydoc)
self.awareness.observe(self.send_server_awareness)
self.clients = set()
self._on_message = None
self.exception_handler = exception_handler
Expand Down Expand Up @@ -304,3 +306,34 @@ async def serve(self, websocket: Websocket):
self.clients.remove(websocket)
except Exception as exception:
self._handle_exception(exception)

def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any]) -> None:
"""
Callback to broadcast the server awareness to clients.

Arguments:
type: The change type.
changes: The awareness changes.
"""
brichet marked this conversation as resolved.
Show resolved Hide resolved
if type != "update" or changes[1] != "local":
return

if self._task_group is not None:
brichet marked this conversation as resolved.
Show resolved Hide resolved
updated_clients = [v for value in changes[0].values() for v in value]
state = self.awareness.encode_awareness_update(updated_clients)
message = create_awareness_message(state)
self._task_group.start_soon(self._send_server_awareness, message)
else:
self.log.error("Cannot broadcast server awareness: YRoom not started")

async def _send_server_awareness(self, state: bytes) -> None:
try:
async with create_task_group() as tg:
for client in self.clients:
self.log.debug(
"Sending awareness from server to client with endpoint: %s",
client.path,
)
tg.start_soon(client.send, state)
except Exception as e:
self.log.error("Error while broadcasting awareness changes: %s", e)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ classifiers = [
dependencies = [
"anyio >=3.6.2,<5",
"sqlite-anyio >=0.2.3,<0.3.0",
"pycrdt >=0.9.16,<0.10.0",
"pycrdt >=0.10.1,<0.11.0",
]

[project.optional-dependencies]
Expand Down
Loading