Skip to content

Commit

Permalink
Add squashed update after deleting document history
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Nov 23, 2022
1 parent 87e6186 commit c97b1fd
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 17 deletions.
22 changes: 22 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
import subprocess

import pytest
import y_py as Y
from websockets import serve # type: ignore

from ypy_websocket import WebsocketServer


class TestYDoc:
def __init__(self):
self.ydoc = Y.YDoc()
self.array = self.ydoc.get_array("array")
self.state = None
self.value = 0

def update(self):
with self.ydoc.begin_transaction() as txn:
self.array.append(txn, self.value)
self.value += 1
update = Y.encode_state_as_update(self.ydoc, self.state)
self.state = Y.encode_state_vector(self.ydoc)
return update


@pytest.fixture
async def yws_server(request):
try:
Expand All @@ -23,3 +40,8 @@ def yjs_client(request):
p = subprocess.Popen(["node", f"tests/yjs_client_{client_id}.js"])
yield p
p.kill()


@pytest.fixture
def test_ydoc():
return TestYDoc()
29 changes: 14 additions & 15 deletions tests/test_ystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,29 +59,28 @@ async def test_ystore(YStore):


@pytest.mark.asyncio
async def test_document_ttl_sqlite_ystore():
async def test_document_ttl_sqlite_ystore(test_ydoc):
store_name = "my_store"
ystore = MySQLiteYStore(store_name, metadata_callback=MetadataCallback(), delete_db=True)

await ystore.write(b"a")
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1

now = time.time()

# assert that adding a record before document TTL doesn't delete document history
with patch("time.time") as mock_time:
mock_time.return_value = now
await ystore.write(b"b")
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2
for i in range(3):
# assert that adding a record before document TTL doesn't delete document history
with patch("time.time") as mock_time:
mock_time.return_value = now
await ystore.write(test_ydoc.update())
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[
0
] == i + 1

# assert that adding a record after document TTL deletes previous document history
with patch("time.time") as mock_time:
mock_time.return_value = now + ystore.document_ttl + 1000
await ystore.write(b"c")
mock_time.return_value = now + ystore.document_ttl + 1
await ystore.write(test_ydoc.update())
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1
# two updates in DB: one squashed update and the new update
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2


@pytest.mark.asyncio
Expand Down
18 changes: 16 additions & 2 deletions ypy_websocket/ystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ async def read(self) -> AsyncIterator[Tuple[bytes, bytes]]: # type: ignore
raise YDocNotFound

async def write(self, data: bytes) -> None:
metadata = await self.get_metadata()
await self.db_initialized
async with aiosqlite.connect(self.db_path) as db:
# first, determine time elapsed since last update
Expand All @@ -235,11 +234,26 @@ async def write(self, data: bytes) -> None:
row = await cursor.fetchone()
diff = (time.time() - row[0]) if row else 0

# if diff > document_ttl, delete document history
if diff > self.document_ttl:
# squash updates
ydoc = Y.YDoc()
async with db.execute(
"SELECT yupdate FROM yupdates WHERE path = ?", (self.path,)
) as cursor:
async for update, in cursor:
Y.apply_update(ydoc, update)
# delete history
await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,))
# insert squashed updates
squashed_update = Y.encode_state_as_update(ydoc)
metadata = await self.get_metadata()
await db.execute(
"INSERT INTO yupdates VALUES (?, ?, ?, ?)",
(self.path, squashed_update, metadata, time.time()),
)

# finally, write this update to the DB
metadata = await self.get_metadata()
await db.execute(
"INSERT INTO yupdates VALUES (?, ?, ?, ?)",
(self.path, data, metadata, time.time()),
Expand Down

0 comments on commit c97b1fd

Please sign in to comment.