Skip to content

Commit

Permalink
Allow to set SQLiteYStore's database path and document time-to-live (j…
Browse files Browse the repository at this point in the history
…upyterlab#66)

* Allow to set SQLiteYStore's database path and document time-to-live

* No need for sqlite_ystore_factory()

* make configurable traits local to JupyterSQLiteYStore (#1)

* make configurable traits local to JupyterSQLiteYStore

* Lint

* Set YStore config on class before instantiation

Co-authored-by: David Brochart <david.brochart@gmail.com>

Co-authored-by: david qiu <david@qiu.dev>
  • Loading branch information
2 people authored and hbcarlos committed Jan 29, 2023
1 parent c5d4156 commit 6960518
Showing 3 changed files with 62 additions and 33 deletions.
8 changes: 4 additions & 4 deletions jupyter_server_ydoc/app.py
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@
from traitlets import Float, Int, Type
from ypy_websocket.ystore import BaseYStore # type: ignore

from .handlers import JupyterSQLiteYStore, YDocRoomIdHandler, YDocWebSocketHandler
from .handlers import SQLiteYStore, YDocRoomIdHandler, YDocWebSocketHandler


class YDocExtension(ExtensionApp):
@@ -41,12 +41,12 @@ class YDocExtension(ExtensionApp):
)

ystore_class = Type(
default_value=JupyterSQLiteYStore,
default_value=SQLiteYStore,
klass=BaseYStore,
config=True,
help="""The YStore class to use for storing Y updates. Defaults to JupyterSQLiteYStore,
help="""The YStore class to use for storing Y updates. Defaults to an SQLiteYStore,
which stores Y updates in a '.jupyter_ystore.db' SQLite database in the current
directory, and clears history every 24 hours.""",
directory.""",
)

def initialize_settings(self):
85 changes: 57 additions & 28 deletions jupyter_server_ydoc/handlers.py
Original file line number Diff line number Diff line change
@@ -13,31 +13,46 @@
from jupyter_ydoc import ydocs as YDOCS # type: ignore
from tornado import web
from tornado.websocket import WebSocketHandler
from ypy_websocket import WebsocketServer, YMessageType, YRoom # type: ignore
from ypy_websocket.ystore import ( # type: ignore
BaseYStore,
SQLiteYStore,
TempFileYStore,
YDocNotFound,
)
from traitlets import Int, Unicode
from traitlets.config import LoggingConfigurable
from ypy_websocket.websocket_server import WebsocketServer, YRoom # type: ignore
from ypy_websocket.ystore import BaseYStore # type: ignore
from ypy_websocket.ystore import SQLiteYStore as _SQLiteYStore
from ypy_websocket.ystore import TempFileYStore as _TempFileYStore
from ypy_websocket.ystore import YDocNotFound
from ypy_websocket.yutils import YMessageType # type: ignore

YFILE = YDOCS["file"]


class JupyterTempFileYStore(TempFileYStore):
class TempFileYStore(_TempFileYStore):
prefix_dir = "jupyter_ystore_"


class JupyterSQLiteYStore(SQLiteYStore):
db_path = ".jupyter_ystore.db"
document_ttl = 24 * 60 * 60
class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): # type: ignore
pass


class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMetaclass):
db_path = Unicode(
".jupyter_ystore.db",
config=True,
help="""The path to the YStore database. Defaults to '.jupyter_ystore.db' in the current
directory.""",
)

document_ttl = Int(
None,
allow_none=True,
config=True,
help="""The document time-to-live in seconds. Defaults to None (document history is never
cleared).""",
)


class DocumentRoom(YRoom):
"""A Y room for a possibly stored document (e.g. a notebook)."""

is_transient = False

def __init__(self, type: str, ystore: BaseYStore, log: Optional[Logger]):
super().__init__(ready=False, ystore=ystore, log=log)
self.type = type
@@ -49,8 +64,6 @@ def __init__(self, type: str, ystore: BaseYStore, log: Optional[Logger]):
class TransientRoom(YRoom):
"""A Y room for sharing state (e.g. awareness)."""

is_transient = True

def __init__(self, log: Optional[Logger]):
super().__init__(log=log)

@@ -132,6 +145,7 @@ async def __anext__(self):

def get_file_info(self) -> Tuple[str, str, str]:
assert self.websocket_server is not None
assert isinstance(self.room, DocumentRoom)
room_name = self.websocket_server.get_room_name(self.room)
file_format: str
file_type: str
@@ -144,7 +158,9 @@ def get_file_info(self) -> Tuple[str, str, str]:
assert file_path is not None
if file_path != self.room.document.path:
self.log.debug(
"File with ID %s was moved from %s to %s", self.room.document.path, file_path
"File with ID %s was moved from %s to %s",
self.room.document.path,
file_path,
)
self.room.document.path = file_path
return file_format, file_type, file_path
@@ -163,8 +179,13 @@ async def get(self, *args, **kwargs):
async def open(self, path):
ystore_class = self.settings["collaborative_ystore_class"]
if self.websocket_server is None:
for k, v in self.config.get(ystore_class.__name__, {}).items():
setattr(ystore_class, k, v)
YDocWebSocketHandler.websocket_server = JupyterWebsocketServer(
rooms_ready=False, auto_clean_rooms=False, ystore_class=ystore_class, log=self.log
rooms_ready=False,
auto_clean_rooms=False,
ystore_class=ystore_class,
log=self.log,
)
self._message_queue = asyncio.Queue()
self.lock = asyncio.Lock()
@@ -175,10 +196,10 @@ async def open(self, path):
asyncio.create_task(self.websocket_server.serve(self))

# cancel the deletion of the room if it was scheduled
if not self.room.is_transient and self.room.cleaner is not None:
if isinstance(self.room, DocumentRoom) and self.room.cleaner is not None:
self.room.cleaner.cancel()

if not self.room.is_transient and not self.room.ready:
if isinstance(self.room, DocumentRoom) and not self.room.ready:
file_format, file_type, file_path = self.get_file_info()
self.log.debug("Opening Y document from disk: %s", file_path)
model = await ensure_async(
@@ -188,26 +209,30 @@ async def open(self, path):
# check again if ready, because loading the file can be async
if not self.room.ready:
# try to apply Y updates from the YStore for this document
try:
await self.room.ystore.apply_updates(self.room.ydoc)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
read_from_source = True
read_from_source = True
if self.room.ystore is not None:
try:
await self.room.ystore.apply_updates(self.room.ydoc)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
pass
if not read_from_source:
# if YStore updates and source file are out-of-sync, resync updates with source
if self.room.document.source != model["content"]:
read_from_source = True
if read_from_source:
self.room.document.source = model["content"]
await self.room.ystore.encode_state_as_update(self.room.ydoc)
if self.room.ystore:
await self.room.ystore.encode_state_as_update(self.room.ydoc)
self.room.document.dirty = False
self.room.ready = True
self.room.watcher = asyncio.create_task(self.watch_file())
# save the document when changed
self.room.document.observe(self.on_document_change)

async def watch_file(self):
assert isinstance(self.room, DocumentRoom)
poll_interval = self.settings["collaborative_file_poll_interval"]
if not poll_interval:
self.room.watcher = None
@@ -217,6 +242,7 @@ async def watch_file(self):
await self.maybe_load_document()

async def maybe_load_document(self):
assert isinstance(self.room, DocumentRoom)
file_format, file_type, file_path = self.get_file_info()
async with self.lock:
model = await ensure_async(
@@ -267,7 +293,8 @@ def on_message(self, message):
# filter out message depending on changes
if skip:
self.log.debug(
"Filtered out Y message of type: %s", YMessageType(message_type).raw_str()
"Filtered out Y message of type: %s",
YMessageType(message_type).name,
)
return skip
self._message_queue.put_nowait(message)
@@ -276,12 +303,13 @@ def on_message(self, message):
def on_close(self) -> None:
# stop serving this client
self._message_queue.put_nowait(b"")
if not self.room.is_transient and self.room.clients == [self]:
if isinstance(self.room, DocumentRoom) and self.room.clients == [self]:
# no client in this room after we disconnect
# keep the document for a while in case someone reconnects
self.room.cleaner = asyncio.create_task(self.clean_room())

async def clean_room(self) -> None:
assert isinstance(self.room, DocumentRoom)
seconds = self.settings["collaborative_document_cleanup_delay"]
if seconds is None:
return
@@ -309,6 +337,7 @@ def on_document_change(self, event):
self.saving_document = asyncio.create_task(self.maybe_save_document())

async def maybe_save_document(self):
assert isinstance(self.room, DocumentRoom)
seconds = self.settings["collaborative_document_save_delay"]
if seconds is None:
return
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ classifiers = [
requires-python = ">=3.7"
dependencies = [
"jupyter_ydoc>=0.2.0,<0.4.0",
"ypy-websocket>=0.8.0,<0.9.0",
"ypy-websocket>=0.8.2,<0.9.0",
"jupyter_server_fileid >=0.6.0,<1"
]

0 comments on commit 6960518

Please sign in to comment.