Attempt to fix for #13 #14 #15

Merged 3 commits on Mar 19, 2024
6 changes: 3 additions & 3 deletions .github/workflows/ci-workflow.yml
@@ -13,13 +13,13 @@ jobs:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        python-version: ["3.10", "3.11"]
+        python-version: ["3.10", "3.11", "3.12"]
         os: [ubuntu-latest, macos-latest, windows-latest]

     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v3
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       - name: Install dependencies
1 change: 1 addition & 0 deletions .gitignore
@@ -7,6 +7,7 @@ htmlcov
 .DS_Store
 .vscode
 geckodriver.log
+noupload

 # Byte-compiled / optimized / DLL files
 __pycache__/
4 changes: 2 additions & 2 deletions README.md
@@ -24,13 +24,13 @@ Unlike `TinyDB` which has a minimal core, `Async-TinyDB` is designed to have max

 * **Event Hooks**: You can now use event hooks to hook into an operation. See [Event Hooks](./docs/EventHooks.md) for more details.

-* **Redesigned ID & Doc Class**: You can [replace](#replacing-id-&-document-class) and [customise them](#customise-id-class) more pleasingly.
+* **Redesigned ID & Doc Class**: You can [replace](#replacing-id-&-document-class) and [customise them](#customise-id-class) easily.

 * **DB-level Caching**: This significantly improves the performance of all operations. However, it may cause dirty reads with some types of storage [^disable-db-level].

 * **Built-in `Modifier`**: Use `Modifier` to easily [compress](./docs/Modifier.md#Compression), [encrypt](#encryption) and [extend types](./docs/Modifier.md#Conversion) of your database. Sure you can do much more than these. _(See [Modifier](./docs/Modifier.md))_

-* **Isolation Level**: Performance or thread-safe or even ACID? It's up to you[^isolevel].
+* **Isolation Level**: Performance or ACID? It's up to you[^isolevel].

 * **Atomic Write**: Shipped with `JSONStorage`

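The isolation-level wording above maps to a copy-on-read trade-off that also appears later in this diff (`return deepcopy(docs) if self._isolevel >= 2 else docs.copy()`). Below is a minimal standalone sketch of that trade-off; the names `TableSketch`, `get_docs` and the `isolevel` attribute are illustrative only, not Async-TinyDB's public API.

```python
from copy import deepcopy


class TableSketch:
    """Minimal sketch: higher isolation costs more copying."""

    def __init__(self, isolevel: int = 0):
        self.isolevel = isolevel          # 0/1: cheap reads, 2+: fully isolated reads
        self._docs = {1: {"answer": 42}}  # stand-in for the cached table data

    def get_docs(self) -> dict:
        # isolevel >= 2: deep-copy so callers can never mutate the cached state;
        # otherwise a shallow copy keeps reads fast but shares nested objects.
        return deepcopy(self._docs) if self.isolevel >= 2 else self._docs.copy()


fast = TableSketch(isolevel=0).get_docs()      # shares nested dicts with the cache
isolated = TableSketch(isolevel=2).get_docs()  # fully independent snapshot
```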
12 changes: 8 additions & 4 deletions asynctinydb/storages.py
@@ -8,6 +8,7 @@
 from typing import Any, Callable, Awaitable, Mapping, MutableMapping, TypeVar, cast
 from typing import TypeAlias
 import os
+import shutil
 from tempfile import NamedTemporaryFile
 import ujson as json
 from vermils.react import EventHook, ActionChain, EventHint, ActionCentipede
@@ -218,11 +219,10 @@ async def write(self, data: Mapping):
             await self._event_hook.aemit("write.pre", self, data))
         data = pre if pre is not None else data
         # Convert keys to strings
-        data = await self._sink.run(stringify_keys, data)
+        data = stringify_keys(data)

         # Serialize the database state using the user-provided arguments
-        task = self._sink.run(json.dumps, data or {}, **self.kwargs)
-        serialized: bytes | str = await task
+        serialized: bytes | str = json.dumps(data or {}, **self.kwargs)

         # Post-process the serialized data
         if 'b' in self._mode and isinstance(serialized, str):
@@ -267,7 +267,11 @@ def _atomic_write(self, data):
         f.close()

         # Use os.replace to ensure atomicity
-        os.replace(f.name, self._path)
+        try:
+            os.replace(f.name, self._path)
+        except OSError:
+            shutil.copy(f.name, self._path)
+            os.remove(f.name)

     def __del__(self):
         try:
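For reference, here is a self-contained sketch of the write pattern this hunk ends up with: dump to a temporary file, then swap it into place with `os.replace`, falling back to copy-and-remove when the rename fails (e.g. across filesystems). The `atomic_write` helper and its arguments are illustrative, not the storage class's actual interface.

```python
import os
import shutil
from tempfile import NamedTemporaryFile


def atomic_write(path: str, payload: str) -> None:
    """Write payload to path so readers never observe a half-written file."""
    # Create the temp file next to the target so os.replace stays on one filesystem.
    with NamedTemporaryFile("w", dir=os.path.dirname(path) or ".",
                            delete=False, encoding="utf-8") as f:
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())
    try:
        os.replace(f.name, path)     # atomic rename on POSIX and Windows
    except OSError:
        shutil.copy(f.name, path)    # fallback, e.g. cross-device moves
        os.remove(f.name)


atomic_write("db.json", '{"_default": {}}')
```

Creating the temporary file in the target's directory is what keeps `os.replace` a same-filesystem rename in the common case; the `shutil.copy` branch is only a last resort and is not atomic.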
110 changes: 59 additions & 51 deletions asynctinydb/table.py
@@ -6,17 +6,19 @@
 from __future__ import annotations
 from abc import ABC, abstractmethod
 from contextlib import suppress
+import inspect
 import uuid
 import json  # For pretty printing
+import asyncio
 from copy import deepcopy
-from typing import AsyncGenerator, Collection, MutableMapping
+from typing import AsyncGenerator, Collection, Coroutine, MutableMapping
 from typing import overload, Callable, Iterable
 from typing import Mapping, Generic, cast, TypeVar, Type, Any, ParamSpec
 from .queries import QueryLike, is_cacheable
 from vermils.react import EventHook, EventHint, ActionChain
 from .storages import Storage
 from .utils import LRUCache
-from vermils.asynctools import sync_await, AsinkRunner, async_run
+from vermils.asynctools import sync_await

 __all__ = ("Document", "Table", "IncreID")
 IDVar = TypeVar("IDVar", bound="BaseID")
@@ -130,6 +132,7 @@ def mark_existed(cls, table: Table, new_id: StrID):
     def clear_cache(cls, table: Table):
         ...

+
 class UUID(uuid.UUID, BaseID):
     """ID class using uuid4 UUIDs."""

@@ -261,7 +264,7 @@ def __init__(
         """Whether to disable the DB-level cache for this table."""
         self._storage = storage
         self._name = name
-        self._cache: MutableMapping[IDVar, DocVar] = None  # type: ignore[assignment]
+        self._cache: MutableMapping[IDVar, DocVar] | None = None
         """Cache for documents in this table."""
         self._query_cache: LRUCache[QueryLike, tuple[IDVar, ...]] \
             = self.query_cache_class(capacity=cache_size)
@@ -271,8 +274,9 @@ def __init__(

         self._isolevel = 0
         self._closed = False
-        self._sink = AsinkRunner()
-        """Serialise all operations on this table."""
+        self._lock = asyncio.Lock()
+        self._query_cache_clear_flag = False
+        self._data_cache_clear_flag = False

         self._event_hook = EventHook()
         """Hook for events."""
@@ -441,7 +445,7 @@ async def search(
         """

         table = await self._read_table()
-        ret = await self._run_with_iso(self._search, cond, table, limit, doc_ids)
+        ret = self._search(cond, table, limit, doc_ids)
         return list(ret.values())

     async def get(
@@ -465,7 +469,7 @@ async def get(

         table = await self._read_table()
         doc_ids = None if doc_id is None else (doc_id,)
-        ret = await self._run_with_iso(self._search, cond, table, 1, doc_ids)
+        ret = self._search(cond, table, 1, doc_ids)
         if ret:
             return ret.popitem()[1]
         return None
@@ -698,7 +702,6 @@ async def close(self) -> None:
         if not self._closed:
             self.clear_cache()
             self.clear_data_cache()
-            await self._sink.aclose()
             self._closed = True

     def clear_cache(self) -> None:
@@ -708,9 +711,7 @@ def clear_cache(self) -> None:
        Scheduled to be executed immediately
        """

-        # Put function in execution queue and run with a higher priority
-        # This is to ensure thread safety
-        self._sink.sync_run_as(16, self._query_cache.clear).result()
+        self._query_cache_clear_flag = True

    def clear_data_cache(self):
        """
@@ -719,10 +720,7 @@ def clear_data_cache(self):
        Scheduled to be executed immediately
        """

-        def updater():
-            self._cache = None
-        # Put function in execution queue and run with a higher priority
-        self._sink.sync_run_as(16, updater).result()
+        self._data_cache_clear_flag = True

    def __len__(self):
        """
@@ -759,7 +757,6 @@ def __del__(self):
        """
        Clean up the table.
        """
-        self._sink.close()

    def _search(self, cond: QueryLike | None,
                docs: MutableMapping[IDVar, DocVar],
@@ -772,6 +769,10 @@ def _search(self, cond: QueryLike | None,
        cond = cast(QueryLike, cond)
        # Only cache cacheable queries, this value may alter.

+        if self._query_cache_clear_flag:
+            self._query_cache.clear()
+            self._query_cache_clear_flag = False
+
        # First, we check if the query has a cache
        cached_ids = self._query_cache.get(cond) if cacheable else None
        if cached_ids is not None:
@@ -823,25 +824,34 @@ def _search(self, cond: QueryLike | None,

        return deepcopy(docs) if self._isolevel >= 2 else docs.copy()

-    async def _read_table(self) -> MutableMapping[IDVar, DocVar]:
+    async def _read_table(self, block=True) -> MutableMapping[IDVar, DocVar]:
        """
        Read the table data from the underlying storage
        if cache is not exist.
        """

-        # If cache exists
-        if self._cache is not None:
-            return self._cache
+        try:
+            if block:
+                await self._lock.acquire()
+            # If cache exists
+            if self._cache is not None and not self._data_cache_clear_flag:
+                return self._cache

-        # Read the table data from the underlying storage
-        raw = await self._read_raw_table()
-        cooked: dict[IDVar, DocVar] | None = None
+            self._data_cache_clear_flag = False

-        cooked = await self._run_with_iso(self._cook, raw)
-        if not self.no_dbcache:
-            # Caching if no_dbcache is not set
-            self._cache = cooked
-        return cooked
+            # Read the table data from the underlying storage
+            raw = await self._read_raw_table()
+            cooked = None
+
+            cooked = self._cook(raw)
+            if not self.no_dbcache:
+                # Caching if no_dbcache is not set
+                self._cache = cooked
+            return cooked
+
+        finally:
+            if block:
+                self._lock.release()

    def _cook(self, raw: Mapping[Any, Mapping]
              ) -> MutableMapping[IDVar, DocVar]:
@@ -872,7 +882,9 @@ async def _read_raw_table(self) -> MutableMapping[Any, Mapping]:
        return tables.get(self.name, {})

    async def _update_table(self,
-                            updater: Callable[[MutableMapping[IDVar, DocVar]], None]):
+                            updater: Callable[
+                                [MutableMapping[IDVar, DocVar]],
+                                None | Coroutine[None, None, None]]):
        """
        Perform a table update operation.

@@ -886,31 +898,27 @@ async def _update_table(self,
        document class, as the table data will *not* be returned to the user.
        """

-        tables: MutableMapping[Any, Mapping] = await self._storage.read() or {}
+        async with self._lock:
+            tables: MutableMapping[Any, Mapping] = await self._storage.read() or {}

-        table = await self._read_table()
+            table = await self._read_table(block=False)

-        # Perform the table update operation
-        await self._run_with_iso(updater, table)
-        tables[self.name] = table
+            # Perform the table update operation
+            ret = updater(table)
+            if inspect.isawaitable(ret):
+                await ret
+            tables[self.name] = table

-        try:
-            # Write the newly updated data back to the storage
-            await self._storage.write(tables)
-        except BaseException:
-            # Writing failure, data cache is out of sync
-            self.clear_data_cache()
-            raise
-        finally:
-            # Clear the query cache, as the table contents have changed
-            self.clear_cache()
-
-    async def _run_with_iso(self, func: Callable[ARGS, V],
-                            *args: ARGS.args, **kwargs: ARGS.kwargs) -> V:
-        """Run sync function with isolation level"""
-        if self._isolevel:
-            return await self._sink.run(func, *args, **kwargs)
-        return await async_run(func, *args, **kwargs)
+            try:
+                # Write the newly updated data back to the storage
+                await self._storage.write(tables)
+            except BaseException:
+                # Writing failure, data cache is out of sync
+                self.clear_data_cache()
+                raise
+            finally:
+                # Clear the query cache, as the table contents have changed
+                self._query_cache.clear()


###### Event Hints ######
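Taken together, the table.py changes replace the old `AsinkRunner` sink with a single `asyncio.Lock` plus deferred cache-clear flags, and let `_update_table` accept either a plain or an async updater via `inspect.isawaitable`. Below is a compact standalone sketch of those two patterns under assumed names (`MiniTable`, `read`, `update`); it is not the library's code.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union


class MiniTable:
    """Sketch of the lock + deferred-flag pattern (illustrative names)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._cache: dict | None = None
        self._data_cache_clear_flag = False

    def clear_data_cache(self) -> None:
        # Cheap and non-blocking: mark the cache stale; the next locked read drops it.
        self._data_cache_clear_flag = True

    async def _load(self) -> dict:
        # Stand-in for reading the table from storage.
        return {"1": {"answer": 42}}

    async def read(self, block: bool = True) -> dict:
        try:
            if block:
                await self._lock.acquire()
            if self._cache is not None and not self._data_cache_clear_flag:
                return self._cache
            self._data_cache_clear_flag = False
            self._cache = await self._load()
            return self._cache
        finally:
            if block:
                self._lock.release()

    async def update(self,
                     updater: Callable[[dict], Union[None, Awaitable[None]]]) -> None:
        async with self._lock:
            table = await self.read(block=False)  # lock already held, so block=False
            ret = updater(table)
            if inspect.isawaitable(ret):          # sync and async updaters both work
                await ret


async def main() -> None:
    t = MiniTable()
    await t.update(lambda doc: doc.__setitem__("2", {"answer": 0}))
    print(await t.read())


asyncio.run(main())
```

Setting a flag instead of clearing the cache directly keeps `clear_data_cache()` synchronous and safe to call from anywhere; the stale cache is only discarded by the next read that holds the lock.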