
local backends #13

Merged · 13 commits · Oct 17, 2024
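
For context while reviewing: a minimal usage sketch of the backend option this PR adds to LocalEngine. The class names are taken from the diff below; the exact import path is an assumption.

from affine.engine.local import AnnoyBackend, LocalEngine

engine = LocalEngine()  # defaults to NumPyBackend: exact brute-force search
ann_engine = LocalEngine(backend=AnnoyBackend(n_trees=10))  # approximate search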
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
@@ -14,7 +14,8 @@ jobs:
        with:
          python-version: "3.10"
      - run: pip install ".[test,pinecone]"
      - run: coverage run --source=affine -m pytest -v tests/unit-tests
      - run: pip install scikit-learn pynndescent annoy faiss-cpu
      - run: coverage run --source=affine -m pytest -v --durations 0 tests/unit-tests
      - run: coverage report
      - name: upload coverage report as artifact
        uses: actions/upload-artifact@v3
1 change: 0 additions & 1 deletion affine/engine/base.py
@@ -10,7 +10,6 @@ class Engine(ABC):
    _RETURNS_NORMALIZED_FOR_COSINE = False

    @abstractmethod
    # TODO: add `return_vectors` as an argument here?
    def _query(
        self,
        filter_set: FilterSet,
179 changes: 167 additions & 12 deletions affine/engine/local.py
@@ -1,12 +1,13 @@
import pickle
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from pathlib import Path
from typing import BinaryIO, Type

import numpy as np

from affine.collection import Collection, Filter, FilterSet, Similarity
from affine.collection import Collection, Filter, FilterSet, Metric, Similarity
from affine.engine import Engine
from affine.query import QueryObject

@@ -42,20 +43,164 @@ def apply_filters_to_records(
    return ret


def filter_by_similarity(
    similarity: Similarity, limit: int, records: list[Collection]
) -> list[Collection]:
    vectors = np.stack([getattr(r, similarity.field).array for r in records])
    query_vector = similarity.get_array()
    distances = np.linalg.norm(vectors - query_vector, axis=1)
    topk_indices = distances.argsort()[:limit]
    return [records[i] for i in topk_indices]


def build_data_matrix(
    field_name: str, records: list[Collection]
) -> np.ndarray:
    return np.stack([getattr(r, field_name).array for r in records])


class LocalBackend(ABC):
    @abstractmethod
    def create_index(self, data: np.ndarray, metric: Metric) -> None:
        pass

    @abstractmethod
    def query(self, q: np.ndarray, k: int) -> list[int]:
        pass

    # TODO: implement save and load
    # @abstractmethod
    # def save(self, fp):
    #     pass

    # @abstractmethod
    # def load(self, fp):
    #     pass

class NumPyBackend(LocalBackend):
    def create_index(self, data: np.ndarray, metric: Metric) -> None:
        self.metric = metric
        self._index = data

    def query(self, q: np.ndarray, k: int) -> list[int]:
        if self.metric == Metric.COSINE:
            # No need to also divide by np.linalg.norm(q): it scales every
            # score equally, so the ranking is unchanged since we don't
            # return distances.
            scores = np.dot(self._index, q) / np.linalg.norm(
                self._index, axis=1
            )
            return np.argsort(-scores)[:k].tolist()
        return np.linalg.norm(self._index - q, axis=1).argsort()[:k].tolist()


class KDTreeBackend(LocalBackend):
    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def create_index(self, data: np.ndarray, metric: Metric) -> None:
        try:
            from sklearn.neighbors import KDTree
        except ModuleNotFoundError:
            raise RuntimeError(
                "KDTree backend requires scikit-learn to be installed"
            )
        self._metric = metric
        if metric == Metric.COSINE:
            data = data / np.linalg.norm(data, axis=1).reshape(-1, 1)

        self.tree = KDTree(data, **self.kwargs)

    def query(self, q: np.ndarray, k: int) -> list[int]:
        # q should be shape (N,)
        assert len(q.shape) == 1
        q = q.reshape(1, -1)

        if self._metric == Metric.COSINE:
            q = q / np.linalg.norm(q)
        return self.tree.query(q, k)[1][0].tolist()


class PyNNDescentBackend(LocalBackend):
    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def create_index(self, data: np.ndarray, metric: Metric) -> None:
        try:
            from pynndescent import NNDescent
        except ModuleNotFoundError:
            raise RuntimeError(
                "PyNNDescent backend requires pynndescent to be installed"
            )
        self.index = NNDescent(data, metric=metric.value, **self.kwargs)

    def query(self, q: np.ndarray, k: int) -> list[int]:
        if len(q.shape) == 1:
            q = q.reshape(1, -1)
        idxs, _ = self.index.query(q, k)
        return idxs[0].tolist()


class AnnoyBackend(LocalBackend):
    def __init__(self, n_trees: int, n_jobs: int = -1):
        self.n_trees = n_trees
        self.n_jobs = n_jobs

    def create_index(self, data: np.ndarray, metric: Metric) -> None:
        try:
            from annoy import AnnoyIndex
        except ModuleNotFoundError:
            raise RuntimeError(
                "Annoy backend requires annoy to be installed"
            )

        annoy_metric = "angular" if metric == Metric.COSINE else "euclidean"
        self.index = AnnoyIndex(data.shape[1], metric=annoy_metric)
        for i, v in enumerate(data):
            self.index.add_item(i, v)
        self.index.build(self.n_trees, self.n_jobs)

    def query(self, q: np.ndarray, k: int) -> list[int]:
        return self.index.get_nns_by_vector(q, k)


class FAISSBackend(LocalBackend):
    def __init__(self, index_factory_str: str):
        """
        Parameters
        ----------
        index_factory_str : str
            A string that specifies the index type to be created.
            See https://github.com/facebookresearch/faiss/wiki/The-index-factory for details.
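            For example, "Flat" gives exact brute-force search, while a
            string like "IVF100,Flat" selects an approximate inverted-file
            index.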
"""
self.index_factory_str = index_factory_str

def create_index(self, data: np.ndarray, metric: Metric) -> None:
try:
import faiss
except ModuleNotFoundError:
raise RuntimeError(
"FAISSBackend backend requires FAISS to be installed. See "
"https://github.com/facebookresearch/faiss/blob/main/INSTALL.md for installation instructions."
)
self.metric = metric
if metric == Metric.COSINE:
data = data / np.linalg.norm(data, axis=1).reshape(-1, 1)
self.index = faiss.index_factory(data.shape[1], self.index_factory_str)
self.index.add(data)

def query(self, q: np.ndarray, k: int) -> list[int]:
q = q.reshape(1, -1)
if self.metric == Metric.COSINE:
q = q / np.linalg.norm(q)
_, idxs = self.index.search(q, k)
assert idxs.shape[0] == 1
return idxs[0].tolist()


class LocalEngine(Engine):
    def __init__(self) -> None:  # maybe add option to the init for ANN algo
    def __init__(
        self, backend: LocalBackend | None = None
    ) -> None:
        self.records: dict[str, list[Collection]] = defaultdict(list)
        self.build_collection_id_counter()
        self.backend = backend or NumPyBackend()
        # maps collection class name and then field name to metric
        self.collection_name_to_field_to_metric: dict[
            str, dict[str, Metric]
        ] = {}

    def build_collection_id_counter(self):
        # maybe pickle this too on save?

@@ -98,7 +243,14 @@ def _query(
                return records
            return records[:limit]

        return filter_by_similarity(similarity, limit, records)
        data = build_data_matrix(similarity.field, records)
        q = similarity.get_array()
        metric = self.collection_name_to_field_to_metric[
            filter_set.collection
        ][similarity.field]
        self.backend.create_index(data, metric)
        neighbors = self.backend.query(q, limit)
        return [records[i] for i in neighbors]

    def insert(self, record: Collection) -> int:
        record.id = self.collection_id_counter[record.__class__.__name__] + 1

@@ -108,7 +260,10 @@ def insert(self, record: Collection) -> int:
        return record.id

    def register_collection(self, collection_class: Type[Collection]) -> None:
        pass
        self.collection_name_to_field_to_metric[collection_class.__name__] = {
            field_name: metric
            for field_name, _, metric in collection_class.get_vector_fields()
        }

    def _delete_by_id(self, collection: Type[Collection], id: str) -> None:
        collection_name = collection.__name__
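Side note on the normalization trick used by KDTreeBackend and FAISSBackend above: on row-normalized data, ranking by Euclidean distance matches ranking by cosine similarity, since ||u - v||^2 = 2 - 2 cos(u, v) for unit vectors. A self-contained sketch that checks this equivalence (illustrative only, not part of the PR):

import numpy as np

rng = np.random.default_rng(0)
data = rng.normal(size=(100, 8))
q = rng.normal(size=8)

# ranking by cosine similarity on the raw vectors
cos = data @ q / (np.linalg.norm(data, axis=1) * np.linalg.norm(q))
cos_rank = np.argsort(-cos)

# ranking by Euclidean distance after normalizing rows and query to unit length
unit = data / np.linalg.norm(data, axis=1, keepdims=True)
l2_rank = np.argsort(np.linalg.norm(unit - q / np.linalg.norm(q), axis=1))

assert (cos_rank == l2_rank).all()  # identical orderings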
11 changes: 9 additions & 2 deletions affine/engine/pinecone.py
@@ -5,7 +5,14 @@
from pinecone import Index, Pinecone, PodSpec, ScoredVector, ServerlessSpec
from pinecone import Vector as PineconeVector

from affine.collection import Collection, Filter, FilterSet, Metric, Similarity
from affine.collection import (
    Collection,
    Filter,
    FilterSet,
    Metric,
    Similarity,
    Vector,
)
from affine.engine import Engine


@@ -79,7 +86,7 @@ def _convert_pinecone_to_collection(
    )
    kwargs = pc_record.metadata.copy()
    if pc_record.values:
        kwargs[vf_name] = pc_record.values
        kwargs[vf_name] = Vector(pc_record.values)
    else:
        kwargs[vf_name] = None
2 changes: 1 addition & 1 deletion affine/engine/weaviate.py
@@ -173,7 +173,7 @@ def _query(
            similarity.get_list(),
            target_vector=similarity.field,
            filters=where_filter,
            include_vector=True,
            include_vector=with_vectors,
            limit=limit,
        ).objects
else:
80 changes: 80 additions & 0 deletions tests/conftest.py
@@ -49,6 +49,8 @@ def data():


def _test_engine(db: Engine):
    db.register_collection(Person)
    db.register_collection(Product)
    assert len(db.query(Person).all()) == 0

    for rec in _data:
@@ -135,3 +137,81 @@ def _test_engine(db: Engine):
@pytest.fixture
def generic_test_engine():
    return _test_engine


def _test_euclidean_similarity(db: Engine) -> list:
    class TestCol(Collection):
        a: float
        b: Vector[100, Metric.EUCLIDEAN]

    # generate 100 vectors to query against
    records = [
        TestCol(a=float(i), b=Vector([float(i + 1)] * 100)) for i in range(100)
    ]
    db.register_collection(TestCol)
    created_ids = []
    for record in records:
        created_ids.append(db.insert(record))

    # query each vector and check the result
    for i, record in enumerate(records):
        q = (
            db.query(TestCol, with_vectors=True)
            .similarity(TestCol.b == record.b)
            .limit(3)
        )
        assert len(q) == 3
        for j in [-1, 0, 1]:
            idx = i + j
            if 0 <= idx < 100:
                assert records[idx] in q

    return created_ids


def _test_cosine_similarity(db: Engine):
    # create vectors like [1, 1, 1, ...], [2, 2, 2, ...],
    # and [1 + eps, 1, ...] and make sure cosine is working
    class TestColCosine(Collection):
        a: float
        b: Vector[100, Metric.COSINE]

    db.register_collection(TestColCosine)

    created_ids = []
    for i in range(50):
        created_ids.append(
            db.insert(
                TestColCosine(a=float(2 * i), b=Vector([float(i + 1)] * 100))
            )
        )
        created_ids.append(
            db.insert(
                TestColCosine(
                    a=float(2 * i + 1),
                    b=Vector([float(i + 1) + 1] + [float(i + 1)] * 99),
                )
            )
        )

    for i in range(50):
        q = (
            db.query(TestColCosine, with_vectors=True)
            .similarity(TestColCosine.b == Vector([float(i + 1)] * 100))
            .limit(3)
        )
        assert len(q) == 3
        # all the vectors should have even index
        assert all(int(r.a) % 2 == 0 for r in q)

    return created_ids


@pytest.fixture
def generic_test_euclidean_similarity():
    return _test_euclidean_similarity


@pytest.fixture
def generic_test_cosine_similarity():
    return _test_cosine_similarity
4 changes: 4 additions & 0 deletions tests/functional-tests/test_pinecone.py
@@ -88,3 +88,7 @@ def test_pinecone_engine(
        )
        == 0
    )


def test_similarity(db: PineconeEngine, generic_test_similarity, created_ids):
    created_ids.extend(generic_test_similarity(db))
10 changes: 10 additions & 0 deletions tests/functional-tests/test_qdrant.py
@@ -32,6 +32,16 @@ def test_qdrant_engine(db: QdrantEngine, generic_test_engine):
    generic_test_engine(db)


def test_euclidean_similarity(
    db: QdrantEngine, generic_test_euclidean_similarity
):
    generic_test_euclidean_similarity(db)


def test_cosine_similarity(db: QdrantEngine, generic_test_cosine_similarity):
    generic_test_cosine_similarity(db)


def test_auto_creation(
    PersonCollection: Type[Collection],
    ProductCollection: Type[Collection],