diff --git a/.gitignore b/.gitignore index b6e4761..051b90c 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,7 @@ dmypy.json # Pyre type checker .pyre/ + +# test JS dependencies +tests/node_modules +tests/package-lock.json diff --git a/tests/test_ystore.py b/tests/test_ystore.py index 858a570..ecbd1a4 100644 --- a/tests/test_ystore.py +++ b/tests/test_ystore.py @@ -1,7 +1,11 @@ import asyncio +import os import tempfile +import time from pathlib import Path +from unittest.mock import patch +import aiosqlite import pytest from ypy_websocket.ystore import SQLiteYStore, TempFileYStore @@ -25,6 +29,9 @@ class MyTempFileYStore(TempFileYStore): class MySQLiteYStore(SQLiteYStore): db_path = str(Path(tempfile.mkdtemp(prefix="test_sql_")) / "ystore.db") + def __del__(self): + os.remove(self.db_path) + @pytest.mark.asyncio @pytest.mark.parametrize("YStore", (MyTempFileYStore, MySQLiteYStore)) @@ -44,3 +51,29 @@ async def test_file_ystore(YStore): assert d == data[i] # data assert m == bytes(i) # metadata i += 1 + + +@pytest.mark.asyncio +async def test_document_ttl_sqlite_ystore(): + store_name = "my_store" + ystore = MySQLiteYStore(store_name, metadata_callback=MetadataCallback()) + + await ystore.write(b"a") + async with aiosqlite.connect(ystore.db_path) as db: + assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1 + + now = time.time() + + # assert that adding a record before document TTL doesn't delete document history + with patch("time.time") as mock_time: + mock_time.return_value = now + await ystore.write(b"b") + async with aiosqlite.connect(ystore.db_path) as db: + assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2 + + # assert that adding a record after document TTL deletes previous document history + with patch("time.time") as mock_time: + mock_time.return_value = now + ystore.document_ttl + 1000 + await ystore.write(b"c") + async with aiosqlite.connect(ystore.db_path) as db: + assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1 diff --git a/ypy_websocket/ystore.py b/ypy_websocket/ystore.py index 0c6d3e8..910ad22 100644 --- a/ypy_websocket/ystore.py +++ b/ypy_websocket/ystore.py @@ -1,7 +1,7 @@ import asyncio import tempfile +import time from abc import ABC, abstractmethod -from datetime import datetime from pathlib import Path from typing import AsyncIterator, Callable, Optional, Tuple @@ -126,6 +126,10 @@ class MySQLiteYStore(SQLiteYStore): """ db_path: str = "ystore.db" + # Determines the "time to live" for all documents, i.e. how recent the + # latest update of a document must be before purging document history. + # Defaults to 1 day. + document_ttl: int = 24 * 60 * 60 path: str db_created: asyncio.Event @@ -138,7 +142,10 @@ def __init__(self, path: str, metadata_callback: Optional[Callable] = None): async def create_db(self): async with aiosqlite.connect(self.db_path) as db: await db.execute( - "CREATE TABLE IF NOT EXISTS yupdates (path TEXT, yupdate BLOB, metadata BLOB, timestamp TEXT)" + "CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)" ) await db.commit() self.db_created.set() @@ -163,8 +170,21 @@ async def write(self, data: bytes) -> None: await self.db_created.wait() metadata = await self.get_metadata() async with aiosqlite.connect(self.db_path) as db: + # first, determine time elapsed since last update + cursor = await db.execute( + "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", + (self.path,), + ) + row = await cursor.fetchone() + diff = (time.time() - row[0]) if row else 0 + + # if diff > document_ttl, delete document history + if diff > self.document_ttl: + await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,)) + + # finally, write this update to the DB await db.execute( "INSERT INTO yupdates VALUES (?, ?, ?, ?)", - (self.path, data, metadata, datetime.utcnow()), + (self.path, data, metadata, time.time()), ) await db.commit()