Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add document TTL for SQLiteYStore #50

Merged
merged 6 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ dmypy.json

# Pyre type checker
.pyre/

# test JS dependencies
tests/node_modules
tests/package-lock.json
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import subprocess
import tempfile
from pathlib import Path
dlqqq marked this conversation as resolved.
Show resolved Hide resolved

import pytest
from websockets import serve # type: ignore
Expand Down
33 changes: 33 additions & 0 deletions tests/test_ystore.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import asyncio
import os
import tempfile
import time
from pathlib import Path
from unittest.mock import patch

import aiosqlite
import pytest

from ypy_websocket.ystore import SQLiteYStore, TempFileYStore
Expand All @@ -25,6 +29,9 @@ class MyTempFileYStore(TempFileYStore):
class MySQLiteYStore(SQLiteYStore):
db_path = str(Path(tempfile.mkdtemp(prefix="test_sql_")) / "ystore.db")

def __del__(self):
os.remove(self.db_path)


@pytest.mark.asyncio
@pytest.mark.parametrize("YStore", (MyTempFileYStore, MySQLiteYStore))
Expand All @@ -44,3 +51,29 @@ async def test_file_ystore(YStore):
assert d == data[i] # data
assert m == bytes(i) # metadata
i += 1


@pytest.mark.asyncio
async def test_document_ttl_sqlite_ystore():
store_name = "my_store"
ystore = MySQLiteYStore(store_name, metadata_callback=MetadataCallback())

await ystore.write(b"a")
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1

now = time.time()

# assert that adding a record before document TTL doesn't delete document history
with patch("time.time") as mock_time:
mock_time.return_value = now
await ystore.write(b"b")
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2

# assert that adding a record after document TTL deletes previous document history
with patch("time.time") as mock_time:
mock_time.return_value = now + ystore.document_ttl + 1000
await ystore.write(b"c")
async with aiosqlite.connect(ystore.db_path) as db:
assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 1
26 changes: 23 additions & 3 deletions ypy_websocket/ystore.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import tempfile
import time
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import AsyncIterator, Callable, Optional, Tuple

Expand Down Expand Up @@ -126,6 +126,10 @@ class MySQLiteYStore(SQLiteYStore):
"""

db_path: str = "ystore.db"
# Determines the "time to live" for all documents, i.e. how recent the
# latest update of a document must be before purging document history.
# Defaults to 1 day.
document_ttl: int = 24 * 60 * 60
path: str
db_created: asyncio.Event

Expand All @@ -138,7 +142,10 @@ def __init__(self, path: str, metadata_callback: Optional[Callable] = None):
async def create_db(self):
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"CREATE TABLE IF NOT EXISTS yupdates (path TEXT, yupdate BLOB, metadata BLOB, timestamp TEXT)"
"CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)"
)
await db.execute(
"CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you describe what this does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates a composite index that makes the SELECT ... WHERE path = ? ORDER BY timestamp query more efficient. The mental model is that this constructs a B-tree where records are first sorted by path, and then ties are resolved by the timestamp, which is the best data structure for this query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

)
await db.commit()
self.db_created.set()
Expand All @@ -163,8 +170,21 @@ async def write(self, data: bytes) -> None:
await self.db_created.wait()
metadata = await self.get_metadata()
async with aiosqlite.connect(self.db_path) as db:
# first, determine time elapsed since last update
cursor = await db.execute(
"SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we need to order by timestamp, the updates are supposed to be already ordered, right?

Copy link
Contributor Author

@dlqqq dlqqq Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily, and even if it is, I think it's better to be explicit about the order this query requires, to avoid breaking this query if the table schema were to change in the future.

AFAIK, tables without a primary key are ordered simply by insertion order. Thus, the oldest update would be returned without the ORDER BY clause, which is not what we want. We want the most recent update.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this also probably comes with an extra cost. In our case, insertion order is already ordered by time. Isn't it possible to get the last row in the query?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this also probably comes with an extra cost.

You're right, but this is addressed by the composite index that you commented on earlier. I'm not expert in SQLite performance characteristics, but there are some justifications for this:

  1. Fetching a record by its index is not significantly slower than fetching a record by its primary key. In fact, most primary keys are actually just implemented with an implicit index in SQLite. IOW, this is about as fast as it gets.

  2. Reading an existing record from an index is about an order of magnitude slower than writing a new record. So the performance of this query is negligible relative to the write that's happening in the INSERT statement that follows in this method.

In our case, insertion order is already ordered by time. Isn't it possible to get the last row in the query?

No, ascending order is assumed if ORDER BY is not present. https://www.sqlite.org/lang_select.html#the_order_by_clause

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do keep in mind that yes, I don't have benchmarks, so these rationalizations could be completely false. However, I think there is plenty of good justification for this implementation, and we shouldn't let performance ambiguity steer us away.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the details David 👍

(self.path,),
)
row = await cursor.fetchone()
diff = (time.time() - row[0]) if row else 0

# if diff > document_ttl, delete document history
if diff > self.document_ttl:
await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,))

# finally, write this update to the DB
await db.execute(
"INSERT INTO yupdates VALUES (?, ?, ?, ?)",
(self.path, data, metadata, datetime.utcnow()),
(self.path, data, metadata, time.time()),
dlqqq marked this conversation as resolved.
Show resolved Hide resolved
)
await db.commit()