From 49f577b9a04e5169e95332f9a0f87865fa7c393e Mon Sep 17 00:00:00 2001 From: vdergachyov Date: Sun, 1 Jan 2023 18:07:21 +0600 Subject: [PATCH 1/4] Reduce CPU usage in the connection thread loop --- aiosqlite/core.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/aiosqlite/core.py b/aiosqlite/core.py index 4acc895..273b8aa 100644 --- a/aiosqlite/core.py +++ b/aiosqlite/core.py @@ -69,6 +69,10 @@ def __init__( DeprecationWarning, ) + def _stop_running(self): + self._running = False + self._tx.put_nowait(None) + @property def _conn(self) -> sqlite3.Connection: if self._connection is None: @@ -99,12 +103,13 @@ def run(self) -> None: # Continues running until all queue items are processed, # even after connection is closed (so we can finalize all # futures) - try: - future, function = self._tx.get(timeout=0.1) - except Empty: - if self._running: - continue + + tx_item = self._tx.get() + if tx_item == None: break + + future, function = tx_item + try: LOG.debug("executing %s", function) result = function() @@ -144,7 +149,7 @@ async def _connect(self) -> "Connection": self._tx.put_nowait((future, self._connector)) self._connection = await future except Exception: - self._running = False + self._stop_running() self._connection = None raise @@ -181,7 +186,7 @@ async def close(self) -> None: LOG.info("exception occurred while closing connection") raise finally: - self._running = False + self._stop_running() self._connection = None @contextmanager From 95ec896f1925dcaa891446615948107c270d4fab Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 5 Jan 2024 21:02:35 -1000 Subject: [PATCH 2/4] Additional speed ups and typing --- aiosqlite/core.py | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/aiosqlite/core.py b/aiosqlite/core.py index 273b8aa..aba45f0 100644 --- a/aiosqlite/core.py +++ b/aiosqlite/core.py @@ -12,7 +12,7 @@ import warnings from functools import partial from pathlib import Path -from queue import Empty, Queue +from queue import Empty, Queue, SimpleQueue from threading import Thread from typing import ( Any, @@ -21,6 +21,7 @@ Generator, Iterable, Optional, + Tuple, Type, Union, ) @@ -42,11 +43,19 @@ IsolationLevel = Optional[Literal["DEFERRED", "IMMEDIATE", "EXCLUSIVE"]] -def get_loop(future: asyncio.Future) -> asyncio.AbstractEventLoop: - if sys.version_info >= (3, 7): - return future.get_loop() - else: - return future._loop +def set_result(fut: asyncio.Future, result: Any) -> None: + """Set the result of a future if it hasn't been set already.""" + if not fut.done(): + fut.set_result(result) + + +def set_exception(fut: asyncio.Future, e: BaseException) -> None: + """Set the exception of a future if it hasn't been set already.""" + if not fut.done(): + fut.set_exception(e) + + +_STOP_RUNNING_SENTINEL = object() class Connection(Thread): @@ -60,7 +69,9 @@ def __init__( self._running = True self._connection: Optional[sqlite3.Connection] = None self._connector = connector - self._tx: Queue = Queue() + self._tx: SimpleQueue[ + Optional[Tuple[asyncio.Future, Callable[[], None]]] + ] = SimpleQueue() self._iter_chunk_size = iter_chunk_size if loop is not None: @@ -71,7 +82,7 @@ def __init__( def _stop_running(self): self._running = False - self._tx.put_nowait(None) + self._tx.put_nowait(_STOP_RUNNING_SENTINEL) @property def _conn(self) -> sqlite3.Connection: @@ -105,7 +116,7 @@ def run(self) -> None: # futures) tx_item = self._tx.get() - if tx_item == None: + if tx_item is _STOP_RUNNING_SENTINEL: break future, function = tx_item @@ -114,20 +125,10 @@ def run(self) -> None: LOG.debug("executing %s", function) result = function() LOG.debug("operation %s completed", function) - - def set_result(fut, result): - if not fut.done(): - fut.set_result(result) - - get_loop(future).call_soon_threadsafe(set_result, future, result) + future.get_loop().call_soon_threadsafe(set_result, future, result) except BaseException as e: LOG.debug("returning exception %s", e) - - def set_exception(fut, e): - if not fut.done(): - fut.set_exception(e) - - get_loop(future).call_soon_threadsafe(set_exception, future, e) + future.get_loop().call_soon_threadsafe(set_exception, future, e) async def _execute(self, fn, *args, **kwargs): """Queue a function with the given arguments for execution.""" From 898165153b07b97c50be9b4eb50e90c84b0f7782 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 5 Jan 2024 21:23:57 -1000 Subject: [PATCH 3/4] lint --- aiosqlite/core.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/aiosqlite/core.py b/aiosqlite/core.py index 99b74b3..a70d7d1 100644 --- a/aiosqlite/core.py +++ b/aiosqlite/core.py @@ -12,8 +12,18 @@ from pathlib import Path from queue import Empty, Queue, SimpleQueue from threading import Thread -from typing import (Any, AsyncIterator, Callable, Generator, Iterable, Literal, - Optional, Tuple, Type, Union) +from typing import ( + Any, + AsyncIterator, + Callable, + Generator, + Iterable, + Literal, + Optional, + Tuple, + Type, + Union, +) from warnings import warn from .context import contextmanager From acf9d418e5201ee93e9d1e0df454135aaefa6d33 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 5 Jan 2024 21:38:00 -1000 Subject: [PATCH 4/4] lint --- aiosqlite/core.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/aiosqlite/core.py b/aiosqlite/core.py index a70d7d1..c2f7449 100644 --- a/aiosqlite/core.py +++ b/aiosqlite/core.py @@ -63,9 +63,7 @@ def __init__( self._running = True self._connection: Optional[sqlite3.Connection] = None self._connector = connector - self._tx: SimpleQueue[ - Optional[Tuple[asyncio.Future, Callable[[], None]]] - ] = SimpleQueue() + self._tx: SimpleQueue[Tuple[asyncio.Future, Callable[[], Any]]] = SimpleQueue() self._iter_chunk_size = iter_chunk_size if loop is not None: @@ -76,7 +74,8 @@ def __init__( def _stop_running(self): self._running = False - self._tx.put_nowait(_STOP_RUNNING_SENTINEL) + # PEP 661 is not accepted yet, so we cannot type a sentinel + self._tx.put_nowait(_STOP_RUNNING_SENTINEL) # type: ignore[arg-type] @property def _conn(self) -> sqlite3.Connection: