Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast server awareness to all clients #73

Merged
merged 22 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions pycrdt_websocket/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from functools import partial
from inspect import isawaitable
from logging import Logger, getLogger
from typing import Awaitable, Callable
from typing import Any, Awaitable, Callable

from anyio import (
TASK_STATUS_IGNORED,
Expand All @@ -16,16 +16,17 @@
from anyio.abc import TaskGroup, TaskStatus
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pycrdt import (
Awareness,
Doc,
Subscription,
YMessageType,
YSyncMessageType,
create_awareness_message,
create_sync_message,
create_update_message,
handle_sync_message,
)

from .awareness import Awareness
from .websocket import Websocket
from .ystore import BaseYStore
from .yutils import put_updates
Expand Down Expand Up @@ -77,11 +78,12 @@ def __init__(
ydoc: An optional document for the room (a new one is created otherwise).
"""
self.ydoc = Doc() if ydoc is None else ydoc
self.awareness = Awareness(self.ydoc)
self.ready_event = Event()
self.ready = ready
self.ystore = ystore
self.log = log or getLogger(__name__)
self.awareness = Awareness(self.ydoc)
self.awareness.observe(self.send_server_awareness)
self.clients = set()
self._on_message = None
self.exception_handler = exception_handler
Expand Down Expand Up @@ -304,3 +306,30 @@ async def serve(self, websocket: Websocket):
self.clients.remove(websocket)
except Exception as exception:
self._handle_exception(exception)

def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any]) -> None:
"""
Callback to broadcast the server awareness to clients.
"""
brichet marked this conversation as resolved.
Show resolved Hide resolved
if type != "change" or changes[1] != "local":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we actually broadcast every update?

Suggested change
if type != "change" or changes[1] != "local":
if type != "update" or changes[1] != "local":

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your right, we should broadcast all events.
Currently the update event is triggered only when the change event is triggered.
Should we trigger an update every X sec ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need this logic in pycrdt. I'll work on it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I meant.

return

if self._task_group is not None:
brichet marked this conversation as resolved.
Show resolved Hide resolved
updated_clients = [v for value in changes[0].values() for v in value]
state = self.awareness.encode_awareness_update(updated_clients)
message = create_awareness_message(state)
self._task_group.start_soon(self._local_update_awareness, message)
brichet marked this conversation as resolved.
Show resolved Hide resolved
else:
self.log.error("Cannot broadcast server awareness: YRoom not started")

async def _local_update_awareness(self, state: bytes) -> None:
brichet marked this conversation as resolved.
Show resolved Hide resolved
try:
async with create_task_group() as tg:
for client in self.clients:
self.log.debug(
"Sending awareness from server to client with endpoint: %s",
client.path,
)
tg.start_soon(client.send, state)
except Exception as e:
self.log.error("Error while broadcasting awareness changes: %s", e)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ classifiers = [
dependencies = [
"anyio >=3.6.2,<5",
"sqlite-anyio >=0.2.3,<0.3.0",
"pycrdt >=0.9.16,<0.10.0",
"pycrdt >=0.10.0,<0.11.0",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's take the latest bugfix.
I'm also wondering if we should wait for the implementation of periodic awareness updates in pycrdt?

Suggested change
"pycrdt >=0.10.0,<0.11.0",
"pycrdt >=0.10.1,<0.11.0",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is required for this change.
But we should probably remove the type != "update" condition to broadcast all the event.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But an "update" change is sent for all events, right? So we should keep it, to deduplicate "change" events. Or am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, sorry again for the confusion. The change event is only for observers, but not for clients...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'll merge then.

]

[project.optional-dependencies]
Expand Down
Loading