From 6960518894105d6b9e23d8ef58599b56bf38022c Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 6 Dec 2022 13:03:54 +0100 Subject: [PATCH] Allow to set SQLiteYStore's database path and document time-to-live (#66) * Allow to set SQLiteYStore's database path and document time-to-live * No need for sqlite_ystore_factory() * make configurable traits local to JupyterSQLiteYStore (#1) * make configurable traits local to JupyterSQLiteYStore * Lint * Set YStore config on class before instantiation Co-authored-by: David Brochart Co-authored-by: david qiu --- jupyter_server_ydoc/app.py | 8 ++-- jupyter_server_ydoc/handlers.py | 85 ++++++++++++++++++++++----------- pyproject.toml | 2 +- 3 files changed, 62 insertions(+), 33 deletions(-) diff --git a/jupyter_server_ydoc/app.py b/jupyter_server_ydoc/app.py index 13b3c991..ea43084e 100644 --- a/jupyter_server_ydoc/app.py +++ b/jupyter_server_ydoc/app.py @@ -9,7 +9,7 @@ from traitlets import Float, Int, Type from ypy_websocket.ystore import BaseYStore # type: ignore -from .handlers import JupyterSQLiteYStore, YDocRoomIdHandler, YDocWebSocketHandler +from .handlers import SQLiteYStore, YDocRoomIdHandler, YDocWebSocketHandler class YDocExtension(ExtensionApp): @@ -41,12 +41,12 @@ class YDocExtension(ExtensionApp): ) ystore_class = Type( - default_value=JupyterSQLiteYStore, + default_value=SQLiteYStore, klass=BaseYStore, config=True, - help="""The YStore class to use for storing Y updates. Defaults to JupyterSQLiteYStore, + help="""The YStore class to use for storing Y updates. Defaults to an SQLiteYStore, which stores Y updates in a '.jupyter_ystore.db' SQLite database in the current - directory, and clears history every 24 hours.""", + directory.""", ) def initialize_settings(self): diff --git a/jupyter_server_ydoc/handlers.py b/jupyter_server_ydoc/handlers.py index c1f3c971..517b983a 100644 --- a/jupyter_server_ydoc/handlers.py +++ b/jupyter_server_ydoc/handlers.py @@ -13,31 +13,46 @@ from jupyter_ydoc import ydocs as YDOCS # type: ignore from tornado import web from tornado.websocket import WebSocketHandler -from ypy_websocket import WebsocketServer, YMessageType, YRoom # type: ignore -from ypy_websocket.ystore import ( # type: ignore - BaseYStore, - SQLiteYStore, - TempFileYStore, - YDocNotFound, -) +from traitlets import Int, Unicode +from traitlets.config import LoggingConfigurable +from ypy_websocket.websocket_server import WebsocketServer, YRoom # type: ignore +from ypy_websocket.ystore import BaseYStore # type: ignore +from ypy_websocket.ystore import SQLiteYStore as _SQLiteYStore +from ypy_websocket.ystore import TempFileYStore as _TempFileYStore +from ypy_websocket.ystore import YDocNotFound +from ypy_websocket.yutils import YMessageType # type: ignore YFILE = YDOCS["file"] -class JupyterTempFileYStore(TempFileYStore): +class TempFileYStore(_TempFileYStore): prefix_dir = "jupyter_ystore_" -class JupyterSQLiteYStore(SQLiteYStore): - db_path = ".jupyter_ystore.db" - document_ttl = 24 * 60 * 60 +class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): # type: ignore + pass + + +class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMetaclass): + db_path = Unicode( + ".jupyter_ystore.db", + config=True, + help="""The path to the YStore database. Defaults to '.jupyter_ystore.db' in the current + directory.""", + ) + + document_ttl = Int( + None, + allow_none=True, + config=True, + help="""The document time-to-live in seconds. Defaults to None (document history is never + cleared).""", + ) class DocumentRoom(YRoom): """A Y room for a possibly stored document (e.g. a notebook).""" - is_transient = False - def __init__(self, type: str, ystore: BaseYStore, log: Optional[Logger]): super().__init__(ready=False, ystore=ystore, log=log) self.type = type @@ -49,8 +64,6 @@ def __init__(self, type: str, ystore: BaseYStore, log: Optional[Logger]): class TransientRoom(YRoom): """A Y room for sharing state (e.g. awareness).""" - is_transient = True - def __init__(self, log: Optional[Logger]): super().__init__(log=log) @@ -132,6 +145,7 @@ async def __anext__(self): def get_file_info(self) -> Tuple[str, str, str]: assert self.websocket_server is not None + assert isinstance(self.room, DocumentRoom) room_name = self.websocket_server.get_room_name(self.room) file_format: str file_type: str @@ -144,7 +158,9 @@ def get_file_info(self) -> Tuple[str, str, str]: assert file_path is not None if file_path != self.room.document.path: self.log.debug( - "File with ID %s was moved from %s to %s", self.room.document.path, file_path + "File with ID %s was moved from %s to %s", + self.room.document.path, + file_path, ) self.room.document.path = file_path return file_format, file_type, file_path @@ -163,8 +179,13 @@ async def get(self, *args, **kwargs): async def open(self, path): ystore_class = self.settings["collaborative_ystore_class"] if self.websocket_server is None: + for k, v in self.config.get(ystore_class.__name__, {}).items(): + setattr(ystore_class, k, v) YDocWebSocketHandler.websocket_server = JupyterWebsocketServer( - rooms_ready=False, auto_clean_rooms=False, ystore_class=ystore_class, log=self.log + rooms_ready=False, + auto_clean_rooms=False, + ystore_class=ystore_class, + log=self.log, ) self._message_queue = asyncio.Queue() self.lock = asyncio.Lock() @@ -175,10 +196,10 @@ async def open(self, path): asyncio.create_task(self.websocket_server.serve(self)) # cancel the deletion of the room if it was scheduled - if not self.room.is_transient and self.room.cleaner is not None: + if isinstance(self.room, DocumentRoom) and self.room.cleaner is not None: self.room.cleaner.cancel() - if not self.room.is_transient and not self.room.ready: + if isinstance(self.room, DocumentRoom) and not self.room.ready: file_format, file_type, file_path = self.get_file_info() self.log.debug("Opening Y document from disk: %s", file_path) model = await ensure_async( @@ -188,19 +209,22 @@ async def open(self, path): # check again if ready, because loading the file can be async if not self.room.ready: # try to apply Y updates from the YStore for this document - try: - await self.room.ystore.apply_updates(self.room.ydoc) - read_from_source = False - except YDocNotFound: - # YDoc not found in the YStore, create the document from the source file (no change history) - read_from_source = True + read_from_source = True + if self.room.ystore is not None: + try: + await self.room.ystore.apply_updates(self.room.ydoc) + read_from_source = False + except YDocNotFound: + # YDoc not found in the YStore, create the document from the source file (no change history) + pass if not read_from_source: # if YStore updates and source file are out-of-sync, resync updates with source if self.room.document.source != model["content"]: read_from_source = True if read_from_source: self.room.document.source = model["content"] - await self.room.ystore.encode_state_as_update(self.room.ydoc) + if self.room.ystore: + await self.room.ystore.encode_state_as_update(self.room.ydoc) self.room.document.dirty = False self.room.ready = True self.room.watcher = asyncio.create_task(self.watch_file()) @@ -208,6 +232,7 @@ async def open(self, path): self.room.document.observe(self.on_document_change) async def watch_file(self): + assert isinstance(self.room, DocumentRoom) poll_interval = self.settings["collaborative_file_poll_interval"] if not poll_interval: self.room.watcher = None @@ -217,6 +242,7 @@ async def watch_file(self): await self.maybe_load_document() async def maybe_load_document(self): + assert isinstance(self.room, DocumentRoom) file_format, file_type, file_path = self.get_file_info() async with self.lock: model = await ensure_async( @@ -267,7 +293,8 @@ def on_message(self, message): # filter out message depending on changes if skip: self.log.debug( - "Filtered out Y message of type: %s", YMessageType(message_type).raw_str() + "Filtered out Y message of type: %s", + YMessageType(message_type).name, ) return skip self._message_queue.put_nowait(message) @@ -276,12 +303,13 @@ def on_message(self, message): def on_close(self) -> None: # stop serving this client self._message_queue.put_nowait(b"") - if not self.room.is_transient and self.room.clients == [self]: + if isinstance(self.room, DocumentRoom) and self.room.clients == [self]: # no client in this room after we disconnect # keep the document for a while in case someone reconnects self.room.cleaner = asyncio.create_task(self.clean_room()) async def clean_room(self) -> None: + assert isinstance(self.room, DocumentRoom) seconds = self.settings["collaborative_document_cleanup_delay"] if seconds is None: return @@ -309,6 +337,7 @@ def on_document_change(self, event): self.saving_document = asyncio.create_task(self.maybe_save_document()) async def maybe_save_document(self): + assert isinstance(self.room, DocumentRoom) seconds = self.settings["collaborative_document_save_delay"] if seconds is None: return diff --git a/pyproject.toml b/pyproject.toml index f75a38e0..3e1e9e51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ requires-python = ">=3.7" dependencies = [ "jupyter_ydoc>=0.2.0,<0.4.0", - "ypy-websocket>=0.8.0,<0.9.0", + "ypy-websocket>=0.8.2,<0.9.0", "jupyter_server_fileid >=0.6.0,<1" ]