Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add room storage for Django Channels Consumers #25

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/reference/Django_Channels.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## Consumer

::: pycrdt_websocket.django_channels.yjs_consumer.YjsConsumer

## Storage

### BaseYRoomStorage
::: pycrdt_websocket.django_channels.yroom_storage.BaseYRoomStorage

### RedisYRoomStorage
::: pycrdt_websocket.django_channels.yroom_storage.RedisYRoomStorage
1 change: 0 additions & 1 deletion docs/reference/Django_Channels_consumer.md

This file was deleted.

2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ nav:
- reference/WebSocket_provider.md
- reference/WebSocket_server.md
- reference/ASGI_server.md
- reference/Django_Channels_consumer.md
- reference/Django_Channels.md
- reference/WebSocket.md
- reference/Room.md
- reference/Store.md
Expand Down
2 changes: 2 additions & 0 deletions pycrdt_websocket/django_channels/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .storage.base_yroom_storage import BaseYRoomStorage as BaseYRoomStorage
from .yjs_consumer import YjsConsumer as YjsConsumer
101 changes: 101 additions & 0 deletions pycrdt_websocket/django_channels/storage/base_yroom_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from abc import ABC, abstractmethod
from typing import Optional

from pycrdt import Doc


class BaseYRoomStorage(ABC):
"""Base class for YRoom storage.
This class is responsible for storing, retrieving, updating and persisting the Ypy document.
Each Django Channels Consumer should have its own YRoomStorage instance, although all consumers
and rooms with the same room name will be connected to the same document in the end.
Updates to the document should be sent to the shared storage, instead of each
consumer having its own version of the YDoc.

A full example of a Redis as temporary storage and Postgres as persistent storage is:
```py
from typing import Optional
from django.db import models
from ypy_websocket.django_channels.yroom_storage import RedisYRoomStorage

class YDocSnapshotManager(models.Manager):
async def aget_snapshot(self, name) -> Optional[bytes]:
try:
instance: YDocSnapshot = await self.aget(name=name)
result = instance.data
if not isinstance(result, bytes):
# Postgres on psycopg2 returns memoryview
return bytes(result)
except YDocSnapshot.DoesNotExist:
return None
else:
return result

async def asave_snapshot(self, name, data):
return await self.aupdate_or_create(name=name, defaults={"data": data})

class YDocSnapshot(models.Model):
name = models.CharField(max_length=255, primary_key=True)
data = models.BinaryField()
objects = YDocSnapshotManager()

class CustomRoomStorage(RedisYRoomStorage):
async def load_snapshot(self) -> Optional[bytes]:
return await YDocSnapshot.objects.aget_snapshot(self.room_name)

async def save_snapshot(self):
current_snapshot = await self.redis.get(self.redis_key)
if not current_snapshot:
return
await YDocSnapshot.objects.asave_snapshot(
self.room_name,
current_snapshot,
)
```
"""

def __init__(self, room_name: str) -> None:
self.room_name = room_name

@abstractmethod
async def get_document(self) -> Doc:
"""Gets the document from the storage.
Ideally it should be retrieved first from temporary storage (e.g. Redis) and then from
persistent storage (e.g. a database).
Returns:
The document with the latest changes.
"""
...

@abstractmethod
async def update_document(self, update: bytes) -> None:
"""Updates the document in the storage.
Updates could be received by Yjs client (e.g. from a WebSocket) or from the server
(e.g. from a Django Celery job).
Args:
update: The update to apply to the document.
"""
...

@abstractmethod
async def load_snapshot(self) -> Optional[bytes]:
"""Gets the document encoded as update from the database. Override this method to
implement a persistent storage.
Defaults to None.
Returns:
The latest document snapshot.
"""
...

@abstractmethod
async def save_snapshot(self) -> None:
"""Saves the document encoded as update to the database."""
...

async def close(self) -> None:
"""Closes the storage connection.

Useful for cleaning up resources like closing a database
connection or saving the document before exiting.
"""
pass
110 changes: 110 additions & 0 deletions pycrdt_websocket/django_channels/storage/redis_yroom_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import time
from typing import Optional

import redis.asyncio as redis
from pycrdt import Doc

from .base_yroom_storage import BaseYRoomStorage


class RedisYRoomStorage(BaseYRoomStorage):
"""A YRoom storage that uses Redis as main storage, without
persistent storage.
Args:
room_name: The name of the room.
"""

def __init__(
self,
room_name: str,
save_throttle_interval: int | None = None,
redis_expiration_seconds: int | None = 60 * 10, # 10 minutes,
):
super().__init__(room_name)

self.save_throttle_interval = save_throttle_interval
self.last_saved_at = time.time()

self.redis_key = f"document:{self.room_name}"
self.redis = self.make_redis()
self.redis_expiration_seconds = redis_expiration_seconds

async def get_document(self) -> Doc:
snapshot = await self.redis.get(self.redis_key)

if not snapshot:
snapshot = await self.load_snapshot()

document = Doc()

if snapshot:
document.apply_update(snapshot)

return document

async def update_document(self, update: bytes):
await self.redis.watch(self.redis_key)

try:
current_document = await self.get_document()
updated_snapshot = self._apply_update_to_document(current_document, update)

async with self.redis.pipeline() as pipe:
while True:
try:
pipe.multi()
pipe.set(
name=self.redis_key,
value=updated_snapshot,
ex=self.redis_expiration_seconds,
)

await pipe.execute()

break
except redis.WatchError:
current_document = await self.get_document()
updated_snapshot = self._apply_update_to_document(
current_document,
update,
)

continue
finally:
await self.redis.unwatch()

await self.throttled_save_snapshot()

async def load_snapshot(self) -> Optional[bytes]:
return None

async def save_snapshot(self) -> None:
return None

async def throttled_save_snapshot(self) -> None:
"""Saves the document encoded as update to the database, throttled."""

if (
not self.save_throttle_interval
or time.time() - self.last_saved_at <= self.save_throttle_interval
):
return

await self.save_snapshot()

self.last_saved_at = time.time()

def make_redis(self):
"""Makes a Redis client.
Defaults to a local client"""

return redis.Redis(host="localhost", port=6379, db=0)

async def close(self):
await self.save_snapshot()
await self.redis.close()

def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes:
document.apply_update(update)

return document.get_update()
Loading
Loading