Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add squashed update after deleting document history #53

Merged
merged 1 commit into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
import subprocess

import pytest
import y_py as Y
from websockets import serve # type: ignore

from ypy_websocket import WebsocketServer


class TestYDoc:
def __init__(self):
self.ydoc = Y.YDoc()
self.array = self.ydoc.get_array("array")
self.state = None
self.value = 0

def update(self):
with self.ydoc.begin_transaction() as txn:
self.array.append(txn, self.value)
self.value += 1
update = Y.encode_state_as_update(self.ydoc, self.state)
self.state = Y.encode_state_vector(self.ydoc)
return update


@pytest.fixture
async def yws_server(request):
try:
Expand All @@ -23,3 +40,8 @@ def yjs_client(request):
p = subprocess.Popen(["node", f"tests/yjs_client_{client_id}.js"])
yield p
p.kill()


@pytest.fixture
def test_ydoc():
return TestYDoc()
29 changes: 14 additions & 15 deletions tests/test_ystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,29 +59,28 @@ async def test_ystore(YStore):


@pytest.mark.asyncio
async def test_document_ttl_sqlite_ystore():
async def test_document_ttl_sqlite_ystore(test_ydoc):
store_name = "my_store"
ystore = MySQLiteYStore(store_name, metadata_callback=MetadataCallback(), delete_db=True)

await ystore.write(b"a")
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1

now = time.time()

# assert that adding a record before document TTL doesn't delete document history
with patch("time.time") as mock_time:
mock_time.return_value = now
await ystore.write(b"b")
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2
for i in range(3):
# assert that adding a record before document TTL doesn't delete document history
with patch("time.time") as mock_time:
mock_time.return_value = now
await ystore.write(test_ydoc.update())
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[
0
] == i + 1

# assert that adding a record after document TTL deletes previous document history
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# assert that adding a record after document TTL deletes previous document history
# assert that adding a record after document TTL squashes previous document history

with patch("time.time") as mock_time:
mock_time.return_value = now + ystore.document_ttl + 1000
await ystore.write(b"c")
mock_time.return_value = now + ystore.document_ttl + 1
await ystore.write(test_ydoc.update())
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1
# two updates in DB: one squashed update and the new update
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2


@pytest.mark.asyncio
Expand Down
18 changes: 16 additions & 2 deletions ypy_websocket/ystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ async def read(self) -> AsyncIterator[Tuple[bytes, bytes]]: # type: ignore
raise YDocNotFound

async def write(self, data: bytes) -> None:
metadata = await self.get_metadata()
await self.db_initialized
async with aiosqlite.connect(self.db_path) as db:
# first, determine time elapsed since last update
Expand All @@ -235,11 +234,26 @@ async def write(self, data: bytes) -> None:
row = await cursor.fetchone()
diff = (time.time() - row[0]) if row else 0

# if diff > document_ttl, delete document history
if diff > self.document_ttl:
# squash updates
ydoc = Y.YDoc()
async with db.execute(
"SELECT yupdate FROM yupdates WHERE path = ?", (self.path,)
) as cursor:
async for update, in cursor:
Y.apply_update(ydoc, update)
# delete history
await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,))
# insert squashed updates
squashed_update = Y.encode_state_as_update(ydoc)
metadata = await self.get_metadata()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we lift this definition out of the if block so it's not evaluated again on line 256?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, as the metadata can depend on time, for instance.

await db.execute(
"INSERT INTO yupdates VALUES (?, ?, ?, ?)",
(self.path, squashed_update, metadata, time.time()),
)

# finally, write this update to the DB
metadata = await self.get_metadata()
await db.execute(
"INSERT INTO yupdates VALUES (?, ?, ?, ?)",
(self.path, data, metadata, time.time()),
Expand Down