diff --git a/tests/conftest.py b/tests/conftest.py index 452c42a..d4eaadb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,28 @@ import subprocess import pytest +import y_py as Y from websockets import serve # type: ignore from ypy_websocket import WebsocketServer +class TestYDoc: + def __init__(self): + self.ydoc = Y.YDoc() + self.array = self.ydoc.get_array("array") + self.state = None + self.value = 0 + + def update(self): + with self.ydoc.begin_transaction() as txn: + self.array.append(txn, self.value) + self.value += 1 + update = Y.encode_state_as_update(self.ydoc, self.state) + self.state = Y.encode_state_vector(self.ydoc) + return update + + @pytest.fixture async def yws_server(request): try: @@ -23,3 +40,8 @@ def yjs_client(request): p = subprocess.Popen(["node", f"tests/yjs_client_{client_id}.js"]) yield p p.kill() + + +@pytest.fixture +def test_ydoc(): + return TestYDoc() diff --git a/tests/test_ystore.py b/tests/test_ystore.py index da1d54c..bdd208f 100644 --- a/tests/test_ystore.py +++ b/tests/test_ystore.py @@ -59,29 +59,28 @@ async def test_ystore(YStore): @pytest.mark.asyncio -async def test_document_ttl_sqlite_ystore(): +async def test_document_ttl_sqlite_ystore(test_ydoc): store_name = "my_store" ystore = MySQLiteYStore(store_name, metadata_callback=MetadataCallback(), delete_db=True) - - await ystore.write(b"a") - async with aiosqlite.connect(ystore.db_path) as db: - assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1 - now = time.time() - # assert that adding a record before document TTL doesn't delete document history - with patch("time.time") as mock_time: - mock_time.return_value = now - await ystore.write(b"b") - async with aiosqlite.connect(ystore.db_path) as db: - assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2 + for i in range(3): + # assert that adding a record before document TTL doesn't delete document history + with patch("time.time") as mock_time: + mock_time.return_value = now + await ystore.write(test_ydoc.update()) + async with aiosqlite.connect(ystore.db_path) as db: + assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[ + 0 + ] == i + 1 # assert that adding a record after document TTL deletes previous document history with patch("time.time") as mock_time: - mock_time.return_value = now + ystore.document_ttl + 1000 - await ystore.write(b"c") + mock_time.return_value = now + ystore.document_ttl + 1 + await ystore.write(test_ydoc.update()) async with aiosqlite.connect(ystore.db_path) as db: - assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1 + # two updates in DB: one squashed update and the new update + assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2 @pytest.mark.asyncio diff --git a/ypy_websocket/ystore.py b/ypy_websocket/ystore.py index e9fa5b2..793bd8e 100644 --- a/ypy_websocket/ystore.py +++ b/ypy_websocket/ystore.py @@ -224,7 +224,6 @@ async def read(self) -> AsyncIterator[Tuple[bytes, bytes]]: # type: ignore raise YDocNotFound async def write(self, data: bytes) -> None: - metadata = await self.get_metadata() await self.db_initialized async with aiosqlite.connect(self.db_path) as db: # first, determine time elapsed since last update @@ -235,11 +234,26 @@ async def write(self, data: bytes) -> None: row = await cursor.fetchone() diff = (time.time() - row[0]) if row else 0 - # if diff > document_ttl, delete document history if diff > self.document_ttl: + # squash updates + ydoc = Y.YDoc() + async with db.execute( + "SELECT yupdate FROM yupdates WHERE path = ?", (self.path,) + ) as cursor: + async for update, in cursor: + Y.apply_update(ydoc, update) + # delete history await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,)) + # insert squashed updates + squashed_update = Y.encode_state_as_update(ydoc) + metadata = await self.get_metadata() + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (self.path, squashed_update, metadata, time.time()), + ) # finally, write this update to the DB + metadata = await self.get_metadata() await db.execute( "INSERT INTO yupdates VALUES (?, ?, ?, ?)", (self.path, data, metadata, time.time()),