From ad1d2708c129e5efddc0e82c7f2cbac3a3ce15d7 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 22 Nov 2022 01:34:27 +0000 Subject: [PATCH 1/6] add document TTL for sqlite ystore --- tests/conftest.py | 6 ++++++ tests/test_ystore.py | 32 ++++++++++++++++++++++++++++++++ ypy_websocket/ystore.py | 26 +++++++++++++++++++++++--- 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 452c42a..946900e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,6 @@ import subprocess +import tempfile +from pathlib import Path import pytest from websockets import serve # type: ignore @@ -23,3 +25,7 @@ def yjs_client(request): p = subprocess.Popen(["node", f"tests/yjs_client_{client_id}.js"]) yield p p.kill() + +@pytest.fixture +def yjs_sqlite_db_path(): + return str(Path(tempfile.mkdtemp(prefix="test_sql_")) / "ystore.db") diff --git a/tests/test_ystore.py b/tests/test_ystore.py index 858a570..66f9441 100644 --- a/tests/test_ystore.py +++ b/tests/test_ystore.py @@ -1,6 +1,10 @@ import asyncio +import aiosqlite import tempfile from pathlib import Path +from unittest.mock import patch +import time +import os import pytest @@ -25,6 +29,9 @@ class MyTempFileYStore(TempFileYStore): class MySQLiteYStore(SQLiteYStore): db_path = str(Path(tempfile.mkdtemp(prefix="test_sql_")) / "ystore.db") + def __del__(self): + os.remove(self.db_path) + @pytest.mark.asyncio @pytest.mark.parametrize("YStore", (MyTempFileYStore, MySQLiteYStore)) @@ -44,3 +51,28 @@ async def test_file_ystore(YStore): assert d == data[i] # data assert m == bytes(i) # metadata i += 1 + +@pytest.mark.asyncio +async def test_document_ttl_sqlite_ystore(): + store_name = "my_store" + ystore = MySQLiteYStore(store_name, metadata_callback=MetadataCallback()) + + await ystore.write(b"a") + async with aiosqlite.connect(ystore.db_path) as db: + assert (await (await db.execute('SELECT count(*) FROM yupdates')).fetchone())[0] == 1 + + now = time.time() + + # assert that adding a record before document TTL doesn't delete document history + with patch("time.time") as mock_time: + mock_time.return_value = now + await ystore.write(b"b") + async with aiosqlite.connect(ystore.db_path) as db: + assert (await (await db.execute('SELECT count(*) FROM yupdates')).fetchone())[0] == 2 + + # assert that adding a record after document TTL deletes previous document history + with patch("time.time") as mock_time: + mock_time.return_value = now + ystore.document_ttl + 1000 + await ystore.write(b"c") + async with aiosqlite.connect(ystore.db_path) as db: + assert (await (await db.execute('SELECT count(*) FROM yupdates')).fetchone())[0] == 1 \ No newline at end of file diff --git a/ypy_websocket/ystore.py b/ypy_websocket/ystore.py index 0c6d3e8..96a2b8e 100644 --- a/ypy_websocket/ystore.py +++ b/ypy_websocket/ystore.py @@ -1,7 +1,7 @@ import asyncio import tempfile +import time from abc import ABC, abstractmethod -from datetime import datetime from pathlib import Path from typing import AsyncIterator, Callable, Optional, Tuple @@ -126,6 +126,10 @@ class MySQLiteYStore(SQLiteYStore): """ db_path: str = "ystore.db" + # Determines the "time to live" for all documents, i.e. how recent the + # latest update of a document must be before purging document history. + # Defaults to 1 day. + document_ttl: int = 24 * 60 * 60 path: str db_created: asyncio.Event @@ -138,7 +142,10 @@ def __init__(self, path: str, metadata_callback: Optional[Callable] = None): async def create_db(self): async with aiosqlite.connect(self.db_path) as db: await db.execute( - "CREATE TABLE IF NOT EXISTS yupdates (path TEXT, yupdate BLOB, metadata BLOB, timestamp TEXT)" + "CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)" ) await db.commit() self.db_created.set() @@ -163,8 +170,21 @@ async def write(self, data: bytes) -> None: await self.db_created.wait() metadata = await self.get_metadata() async with aiosqlite.connect(self.db_path) as db: + # first, determine time elapsed since last update + cursor = await db.execute( + "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", + (self.path,) + ) + row = await cursor.fetchone() + diff = (time.time() - row[0]) if row else 0 + + # if diff > document_ttl, delete document history + if diff > self.document_ttl: + await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,)) + + # finally, write this update to the DB await db.execute( "INSERT INTO yupdates VALUES (?, ?, ?, ?)", - (self.path, data, metadata, datetime.utcnow()), + (self.path, data, metadata, time.time()), ) await db.commit() From 406023f1a42e608b0b50ca3a5a5f27337865dbb3 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 22 Nov 2022 01:42:38 +0000 Subject: [PATCH 2/6] add test JS dependencies to gitignore --- .gitignore | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.gitignore b/.gitignore index b6e4761..004a068 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,8 @@ dmypy.json # Pyre type checker .pyre/ + +# test JS dependencies +tests/node_modules +tests/package-lock.json +tests/yarn.lock From ee3ba41958758c2ea42f2d184bc8d774463dc601 Mon Sep 17 00:00:00 2001 From: david qiu Date: Tue, 22 Nov 2022 10:24:15 -0800 Subject: [PATCH 3/6] Update .gitignore Co-authored-by: David Brochart --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index 004a068..051b90c 100644 --- a/.gitignore +++ b/.gitignore @@ -131,4 +131,3 @@ dmypy.json # test JS dependencies tests/node_modules tests/package-lock.json -tests/yarn.lock From 5f1268d23ec36ff5a73b76b1342bcb8bc05e3caf Mon Sep 17 00:00:00 2001 From: david qiu Date: Tue, 22 Nov 2022 10:24:23 -0800 Subject: [PATCH 4/6] Update tests/conftest.py Co-authored-by: David Brochart --- tests/conftest.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 946900e..b925fdb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,7 +25,3 @@ def yjs_client(request): p = subprocess.Popen(["node", f"tests/yjs_client_{client_id}.js"]) yield p p.kill() - -@pytest.fixture -def yjs_sqlite_db_path(): - return str(Path(tempfile.mkdtemp(prefix="test_sql_")) / "ystore.db") From 645b46c2e518671a47c73ff28d5911f7889c5f1e Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 22 Nov 2022 19:54:54 +0000 Subject: [PATCH 5/6] fix lint --- tests/test_ystore.py | 13 +++++++------ ypy_websocket/ystore.py | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/test_ystore.py b/tests/test_ystore.py index 66f9441..ecbd1a4 100644 --- a/tests/test_ystore.py +++ b/tests/test_ystore.py @@ -1,11 +1,11 @@ import asyncio -import aiosqlite +import os import tempfile +import time from pathlib import Path from unittest.mock import patch -import time -import os +import aiosqlite import pytest from ypy_websocket.ystore import SQLiteYStore, TempFileYStore @@ -52,6 +52,7 @@ async def test_file_ystore(YStore): assert m == bytes(i) # metadata i += 1 + @pytest.mark.asyncio async def test_document_ttl_sqlite_ystore(): store_name = "my_store" @@ -59,7 +60,7 @@ async def test_document_ttl_sqlite_ystore(): await ystore.write(b"a") async with aiosqlite.connect(ystore.db_path) as db: - assert (await (await db.execute('SELECT count(*) FROM yupdates')).fetchone())[0] == 1 + assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1 now = time.time() @@ -68,11 +69,11 @@ async def test_document_ttl_sqlite_ystore(): mock_time.return_value = now await ystore.write(b"b") async with aiosqlite.connect(ystore.db_path) as db: - assert (await (await db.execute('SELECT count(*) FROM yupdates')).fetchone())[0] == 2 + assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2 # assert that adding a record after document TTL deletes previous document history with patch("time.time") as mock_time: mock_time.return_value = now + ystore.document_ttl + 1000 await ystore.write(b"c") async with aiosqlite.connect(ystore.db_path) as db: - assert (await (await db.execute('SELECT count(*) FROM yupdates')).fetchone())[0] == 1 \ No newline at end of file + assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1 diff --git a/ypy_websocket/ystore.py b/ypy_websocket/ystore.py index 96a2b8e..910ad22 100644 --- a/ypy_websocket/ystore.py +++ b/ypy_websocket/ystore.py @@ -173,7 +173,7 @@ async def write(self, data: bytes) -> None: # first, determine time elapsed since last update cursor = await db.execute( "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", - (self.path,) + (self.path,), ) row = await cursor.fetchone() diff = (time.time() - row[0]) if row else 0 From d3b2cf2b9228ca46e8bac0adf36300fd998083cd Mon Sep 17 00:00:00 2001 From: david qiu Date: Tue, 22 Nov 2022 12:48:27 -0800 Subject: [PATCH 6/6] Update tests/conftest.py Co-authored-by: David Brochart --- tests/conftest.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index b925fdb..452c42a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,4 @@ import subprocess -import tempfile -from pathlib import Path import pytest from websockets import serve # type: ignore