Skip to content

Commit

Permalink
Server can subscribe to awareness changes (only those from the frontend)
Browse files Browse the repository at this point in the history
  • Loading branch information
brichet committed Oct 1, 2024
1 parent 667da1c commit ee37fb1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
63 changes: 49 additions & 14 deletions pycrdt_websocket/awareness.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import time
from logging import Logger, getLogger
from typing import Any, Callable, Optional
from typing import Any, Callable
from uuid import uuid4

# Some magic to avoid 'event loop already running' error.
Expand All @@ -19,25 +19,34 @@


class Awareness:
client_id: int
log: Logger
meta: dict[str, Any]
_states: dict[str, Any]
_subscriptions: list[Callable[[dict[str, Any]], None]]
_user: dict[str, str] | None

def __init__(
self,
ydoc: Doc,
log: Logger | None = None,
on_change: Optional[Callable[[dict[str, Any]], None]] = None,
user: Optional[dict[str, str]] = None,
on_change: Callable[[bytes], None] | None = None,
user: dict[str, str] | None = None,
):
self.client_id = ydoc.client_id
self.log = log or getLogger(__name__)
self.meta = {}
self.states = {}
self._states = {}

if user is not None:
self.user = user
else:
self._user = DEFAULT_USER
self.states[self.client_id] = {"user": DEFAULT_USER}
self._states[self.client_id] = {"user": DEFAULT_USER}
self.on_change = on_change

self._subscriptions = []

@property
def user(self) -> dict[str, str]:
return self._user
Expand Down Expand Up @@ -70,19 +79,19 @@ def get_changes(self, message: bytes) -> dict[str, Any]:
if state is not None:
states.append(state)
client_meta = self.meta.get(client_id)
prev_state = self.states.get(client_id)
prev_state = self._states.get(client_id)
curr_clock = 0 if client_meta is None else client_meta["clock"]
if curr_clock < clock or (
curr_clock == clock and state is None and client_id in self.states
curr_clock == clock and state is None and client_id in self._states
):
if state is None:
if client_id == self.client_id and self.states.get(client_id) is not None:
if client_id == self.client_id and self._states.get(client_id) is not None:
clock += 1
else:
if client_id in self.states:
del self.states[client_id]
if client_id in self._states:
del self._states[client_id]
else:
self.states[client_id] = state
self._states[client_id] = state
self.meta[client_id] = {
"clock": clock,
"last_updated": timestamp,
Expand All @@ -95,21 +104,30 @@ def get_changes(self, message: bytes) -> dict[str, Any]:
if state != prev_state:
filtered_updated.append(client_id)
updated.append(client_id)
return {

changes = {
"added": added,
"updated": updated,
"filtered_updated": filtered_updated,
"removed": removed,
"states": states,
}

# Do not trigger the callbacks if it is only a keep alive update
if len(added) or len(filtered_updated) or len(removed):
for callback in self._subscriptions:
callback(changes)

return changes

def get_local_state(self) -> dict[str, Any]:
return self.states.get(self.client_id, {})
return self._states.get(self.client_id, {})

async def set_local_state(self, state: dict[str, Any]):
# Update the state and the meta.
timestamp = int(time.time() * 1000)
clock = self.meta.get(self.client_id, {}).get("clock", -1) + 1
self.states[self.client_id] = state
self._states[self.client_id] = state
self.meta[self.client_id] = {"clock": clock, "last_updated": timestamp}
# Build the message to broadcast, with the following information:
# - message type
Expand All @@ -134,3 +152,20 @@ async def set_local_state_field(self, field: str, value: Any):
current_state = self.get_local_state()
current_state.update({field: value})
await self.set_local_state(current_state)

def observe(self, callback: Callable[[dict[str, Any]], None]) -> None:
"""
Subscribes to awareness changes.
:param callback: Callback that will be called when the document changes.
:type callback: Callable[[str, Any], None]
"""
self._subscriptions.append(callback)

def unobserve(self) -> None:
"""
Unsubscribes to awareness changes.
This method removes all the callbacks.
"""
self._subscriptions = []
2 changes: 1 addition & 1 deletion pycrdt_websocket/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,4 +315,4 @@ async def local_update_awareness(self, state):
)
tg.start_soon(client.send, state)
except Exception as e:
self.log.error(f"Error while broadcasting awareness changes: {e}")
self.log.error("Error while broadcasting awareness changes: %s", e)

0 comments on commit ee37fb1

Please sign in to comment.