From 4a900eb1b18d76172762555ab34c2750dca80ad0 Mon Sep 17 00:00:00 2001 From: Trayan Azarov Date: Sun, 14 Apr 2024 09:21:45 +0300 Subject: [PATCH 1/5] feat: Connection pool FD leak v2 --- chromadb/db/impl/sqlite_pool.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/chromadb/db/impl/sqlite_pool.py b/chromadb/db/impl/sqlite_pool.py index 83a3edf104b..1f2ed6d0d0e 100644 --- a/chromadb/db/impl/sqlite_pool.py +++ b/chromadb/db/impl/sqlite_pool.py @@ -1,4 +1,5 @@ import sqlite3 +import weakref from abc import ABC, abstractmethod from typing import Any, Set import threading @@ -70,7 +71,7 @@ class LockPool(Pool): shared cache mode. We use the shared cache mode to allow multiple threads to share a database. """ - _connections: Set[Connection] + _connections: Set[weakref.ReferenceType[Connection]] _lock: threading.RLock _connection: threading.local _db_file: str @@ -93,7 +94,7 @@ def connect(self, *args: Any, **kwargs: Any) -> Connection: self, self._db_file, self._is_uri, *args, **kwargs ) self._connection.conn = new_connection - self._connections.add(new_connection) + self._connections.add(weakref.ref(new_connection)) return new_connection @override @@ -106,7 +107,8 @@ def return_to_pool(self, conn: Connection) -> None: @override def close(self) -> None: for conn in self._connections: - conn.close_actual() + if conn() is not None: + conn().close_actual() # type: ignore self._connections.clear() self._connection = threading.local() try: @@ -120,7 +122,7 @@ class PerThreadPool(Pool): extended to do so and block on connect() if the cap is reached. """ - _connections: Set[Connection] + _connections: Set[weakref.ReferenceType[Connection]] _lock: threading.Lock _connection: threading.local _db_file: str @@ -143,14 +145,15 @@ def connect(self, *args: Any, **kwargs: Any) -> Connection: ) self._connection.conn = new_connection with self._lock: - self._connections.add(new_connection) + self._connections.add(weakref.ref(new_connection)) return new_connection @override def close(self) -> None: with self._lock: for conn in self._connections: - conn.close_actual() + if conn() is not None: + conn().close_actual() # type: ignore self._connections.clear() self._connection = threading.local() From 0c8de0a526cddb749356a5af77636c6a8a2fd325 Mon Sep 17 00:00:00 2001 From: Trayan Azarov Date: Sun, 14 Apr 2024 09:28:38 +0300 Subject: [PATCH 2/5] fix: Making it work under python 3.8 --- chromadb/db/impl/sqlite_pool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/chromadb/db/impl/sqlite_pool.py b/chromadb/db/impl/sqlite_pool.py index 1f2ed6d0d0e..5444d71b4bc 100644 --- a/chromadb/db/impl/sqlite_pool.py +++ b/chromadb/db/impl/sqlite_pool.py @@ -4,6 +4,7 @@ from typing import Any, Set import threading from overrides import override +from typing_extensions import Annotated class Connection: @@ -71,7 +72,7 @@ class LockPool(Pool): shared cache mode. We use the shared cache mode to allow multiple threads to share a database. """ - _connections: Set[weakref.ReferenceType[Connection]] + _connections: Set[Annotated[weakref.ReferenceType, Connection]] _lock: threading.RLock _connection: threading.local _db_file: str @@ -122,7 +123,7 @@ class PerThreadPool(Pool): extended to do so and block on connect() if the cap is reached. """ - _connections: Set[weakref.ReferenceType[Connection]] + _connections: Set[Annotated[weakref.ReferenceType, Connection]] _lock: threading.Lock _connection: threading.local _db_file: str From dc274441714b4335707f42fe05650deab239b181 Mon Sep 17 00:00:00 2001 From: Trayan Azarov Date: Tue, 16 Apr 2024 20:34:42 +0300 Subject: [PATCH 3/5] test: Added invariants for fd in multithreaded tests --- chromadb/test/property/invariants.py | 18 ++++++++++++++++++ chromadb/test/test_multithreaded.py | 4 +++- requirements_dev.txt | 1 + 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/chromadb/test/property/invariants.py b/chromadb/test/property/invariants.py index c992f18f5a0..c731b4c3049 100644 --- a/chromadb/test/property/invariants.py +++ b/chromadb/test/property/invariants.py @@ -1,4 +1,8 @@ +import gc import math + +import psutil + from chromadb.test.property.strategies import NormalizedRecordSet, RecordSet from typing import Callable, Optional, Tuple, Union, List, TypeVar, cast from typing_extensions import Literal @@ -163,6 +167,20 @@ def _exact_distances( return np.argsort(distances).tolist(), distances.tolist() +def fd_not_exceeding_threadpool_size(threadpool_size: int) -> None: + """ + Checks that the open file descriptors are not exceeding the threadpool size + works only for SegmentAPI + """ + current_process = psutil.Process() + open_files = current_process.open_files() + if len([p.path for p in open_files if "sqlite3" in p.path]) - 1 <= threadpool_size: + gc.collect() # GC to collect the orphaned TLS objects + assert ( + len([p.path for p in open_files if "sqlite3" in p.path]) - 1 <= threadpool_size + ) + + def ann_accuracy( collection: Collection, record_set: RecordSet, diff --git a/chromadb/test/test_multithreaded.py b/chromadb/test/test_multithreaded.py index 7cad62a07fe..8673809f62a 100644 --- a/chromadb/test/test_multithreaded.py +++ b/chromadb/test/test_multithreaded.py @@ -7,6 +7,7 @@ from chromadb.api import ClientAPI import chromadb.test.property.invariants as invariants +from chromadb.api.segment import SegmentAPI from chromadb.test.property.strategies import RecordSet from chromadb.test.property.strategies import test_hnsw_config from chromadb.types import Metadata @@ -193,7 +194,8 @@ def perform_operation( exception = future.exception() if exception is not None: raise exception - + if isinstance(api, SegmentAPI): # we can't check invariants for FastAPI + invariants.fd_not_exceeding_threadpool_size(num_workers) # Check that invariants hold invariants.count(coll, records_set) invariants.ids_match(coll, records_set) diff --git a/requirements_dev.txt b/requirements_dev.txt index 53d311409fe..15333a4ce40 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -6,6 +6,7 @@ hypothesis>=6.103.1 hypothesis[numpy]>=6.103.1 mypy-protobuf pre-commit +psutil pytest pytest-asyncio pytest-xdist From 4ce4727fb912744cbfdfd8780f279772e547f19d Mon Sep 17 00:00:00 2001 From: Trayan Azarov Date: Wed, 17 Apr 2024 13:59:45 +0300 Subject: [PATCH 4/5] test: Invariant is now applied only to persisted local segment --- chromadb/test/property/invariants.py | 12 +++++++++++- chromadb/test/test_multithreaded.py | 4 +++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/chromadb/test/property/invariants.py b/chromadb/test/property/invariants.py index c731b4c3049..a7bd2c6cade 100644 --- a/chromadb/test/property/invariants.py +++ b/chromadb/test/property/invariants.py @@ -1,5 +1,6 @@ import gc import math +from time import sleep import psutil @@ -174,8 +175,17 @@ def fd_not_exceeding_threadpool_size(threadpool_size: int) -> None: """ current_process = psutil.Process() open_files = current_process.open_files() - if len([p.path for p in open_files if "sqlite3" in p.path]) - 1 <= threadpool_size: + max_retries = 5 + retry_count = 0 + # we probably don't need the below but we keep it to avoid flaky tests. + while ( + len([p.path for p in open_files if "sqlite3" in p.path]) - 1 > threadpool_size + and retry_count < max_retries + ): gc.collect() # GC to collect the orphaned TLS objects + open_files = current_process.open_files() + retry_count += 1 + sleep(1) assert ( len([p.path for p in open_files if "sqlite3" in p.path]) - 1 <= threadpool_size ) diff --git a/chromadb/test/test_multithreaded.py b/chromadb/test/test_multithreaded.py index 8673809f62a..a2d6f2e1ba5 100644 --- a/chromadb/test/test_multithreaded.py +++ b/chromadb/test/test_multithreaded.py @@ -194,7 +194,9 @@ def perform_operation( exception = future.exception() if exception is not None: raise exception - if isinstance(api, SegmentAPI): # we can't check invariants for FastAPI + if ( + isinstance(api, SegmentAPI) and api.get_settings().is_persistent is True + ): # we can't check invariants for FastAPI invariants.fd_not_exceeding_threadpool_size(num_workers) # Check that invariants hold invariants.count(coll, records_set) From 1c0c615d621b3169bf37d02fb160eb2de08f6a6b Mon Sep 17 00:00:00 2001 From: Trayan Azarov Date: Mon, 22 Jul 2024 15:18:30 +0300 Subject: [PATCH 5/5] test: Fixed var ref --- chromadb/test/test_multithreaded.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chromadb/test/test_multithreaded.py b/chromadb/test/test_multithreaded.py index a2d6f2e1ba5..745f562f4b0 100644 --- a/chromadb/test/test_multithreaded.py +++ b/chromadb/test/test_multithreaded.py @@ -195,7 +195,7 @@ def perform_operation( if exception is not None: raise exception if ( - isinstance(api, SegmentAPI) and api.get_settings().is_persistent is True + isinstance(client, SegmentAPI) and client.get_settings().is_persistent is True ): # we can't check invariants for FastAPI invariants.fd_not_exceeding_threadpool_size(num_workers) # Check that invariants hold