Skip to content

Commit

Permalink
Support document fork
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Aug 27, 2024
1 parent 179a146 commit 732f1b9
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions pycrdt_websocket/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@


class YRoom:
clients: list
clients: set[Websocket]
fork_ydocs: set[Doc]
ydoc: Doc
ystore: BaseYStore | None
ready_event: Event
Expand All @@ -51,6 +52,7 @@ def __init__(
ystore: BaseYStore | None = None,
exception_handler: Callable[[Exception, Logger], bool] | None = None,
log: Logger | None = None,
ydoc: Doc | None = None,
):
"""Initialize the object.
Expand All @@ -74,13 +76,14 @@ def __init__(
returns True if the exception was handled.
log: An optional logger.
"""
self.ydoc = Doc()
self.ydoc = Doc() if ydoc is None else ydoc
self.awareness = Awareness(self.ydoc)
self.ready_event = Event()
self.ready = ready
self.ystore = ystore
self.log = log or getLogger(__name__)
self.clients = []
self.clients = set()
self.fork_ydocs = set()
self._on_message = None
self.exception_handler = exception_handler
self._stopped = Event()
Expand Down Expand Up @@ -147,13 +150,19 @@ async def _broadcast_updates(self):
return
# broadcast internal ydoc's update to all clients, that includes changes from the
# clients and changes from the backend (out-of-band changes)
for client in self.clients:
try:
self.log.debug("Sending Y update to client with endpoint: %s", client.path)
message = create_update_message(update)
self._task_group.start_soon(client.send, message)
except Exception as exception:
self._handle_exception(exception)
for ydoc in self.fork_ydocs:
ydoc.apply_update(update)

if self.clients:
message = create_update_message(update)
for client in self.clients:
try:
self.log.debug(
"Sending Y update to remote client with endpoint: %s", client.path
)
self._task_group.start_soon(client.send, message)
except Exception as exception:
self._handle_exception(exception)
if self.ystore:
try:
self._task_group.start_soon(self.ystore.write, update)
Expand Down Expand Up @@ -245,7 +254,7 @@ async def serve(self, websocket: Websocket):
"""
try:
async with create_task_group() as tg:
self.clients.append(websocket)
self.clients.add(websocket)
sync_message = create_sync_message(self.ydoc)
self.log.debug(
"Sending %s message to endpoint: %s",
Expand Down Expand Up @@ -297,5 +306,6 @@ async def serve(self, websocket: Websocket):
tg.start_soon(client.send, message)
# remove this client
self.clients = [c for c in self.clients if c != websocket]
self.clients.remove(websocket)
except Exception as exception:
self._handle_exception(exception)

0 comments on commit 732f1b9

Please sign in to comment.