[ENH]: CIP-5: Large Batch Handling Improvements Proposal #1077

Merged

Changes from 3 commits
7 changes: 7 additions & 0 deletions chromadb/api/__init__.py
@@ -378,3 +378,10 @@ def get_settings(self) -> Settings:

"""
pass

@property
@abstractmethod
def max_batch_size(self) -> int:
"""Return the maximum number of records that can be submitted in a single call
to submit_embeddings."""
pass
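
For context, a caller can use this property to keep writes under the server's limit. A minimal, hypothetical sketch (add_in_chunks and its arguments are illustrative, not part of this PR):

from typing import List

from chromadb.api import API

def add_in_chunks(api: API, collection, ids: List[str], documents: List[str]) -> None:
    # Slice the input so no single call exceeds the server-reported limit.
    step = api.max_batch_size
    for start in range(0, len(ids), step):
        collection.add(
            ids=ids[start : start + step],
            documents=documents[start : start + step],
        )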
81 changes: 46 additions & 35 deletions chromadb/api/fastapi.py
@@ -1,5 +1,5 @@
import json
from typing import Optional, cast
from typing import Optional, cast, Tuple
from typing import Sequence
from uuid import UUID

@@ -22,6 +22,7 @@
GetResult,
QueryResult,
CollectionMetadata,
validate_batch,
)
from chromadb.auth import (
ClientAuthProvider,
@@ -34,6 +35,7 @@

class FastAPI(API):
_settings: Settings
_max_batch_size: int = -1

def __init__(self, system: System):
super().__init__(system)
@@ -249,6 +251,29 @@ def _delete(
raise_chroma_error(resp)
return cast(IDs, resp.json())

def _submit_batch(
self,
batch: Tuple[
IDs, Optional[Embeddings], Optional[Metadatas], Optional[Documents]
],
url: str,
) -> requests.Response:
"""
Submits a batch of embeddings to the database
"""
resp = self._session.post(
self._api_url + url,
data=json.dumps(
{
"ids": batch[0],
"embeddings": batch[1],
"metadatas": batch[2],
"documents": batch[3],
}
),
)
return resp
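
Factoring the POST into a single _submit_batch helper removes the three near-identical request bodies that _add, _update, and _upsert previously built inline.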

@override
def _add(
self,
@@ -262,18 +287,9 @@ def _add(
Adds a batch of embeddings to the database
- pass in column oriented data lists
"""
resp = self._session.post(
self._api_url + "/collections/" + str(collection_id) + "/add",
data=json.dumps(
{
"ids": ids,
"embeddings": embeddings,
"metadatas": metadatas,
"documents": documents,
}
),
)

batch = (ids, embeddings, metadatas, documents)
validate_batch(batch, {"max_batch_size": self.max_batch_size})
resp = self._submit_batch(batch, "/collections/" + str(collection_id) + "/add")
raise_chroma_error(resp)
return True

@@ -290,18 +306,11 @@ def _update(
Updates a batch of embeddings in the database
- pass in column oriented data lists
"""
resp = self._session.post(
self._api_url + "/collections/" + str(collection_id) + "/update",
data=json.dumps(
{
"ids": ids,
"embeddings": embeddings,
"metadatas": metadatas,
"documents": documents,
}
),
batch = (ids, embeddings, metadatas, documents)
validate_batch(batch, {"max_batch_size": self.max_batch_size})
resp = self._submit_batch(
batch, "/collections/" + str(collection_id) + "/update"
)

resp.raise_for_status()
return True

@@ -318,18 +327,11 @@ def _upsert(
Upserts a batch of embeddings in the database
- pass in column oriented data lists
"""
resp = self._session.post(
self._api_url + "/collections/" + str(collection_id) + "/upsert",
data=json.dumps(
{
"ids": ids,
"embeddings": embeddings,
"metadatas": metadatas,
"documents": documents,
}
),
batch = (ids, embeddings, metadatas, documents)
validate_batch(batch, {"max_batch_size": self.max_batch_size})
resp = self._submit_batch(
batch, "/collections/" + str(collection_id) + "/upsert"
)

resp.raise_for_status()
return True

@@ -387,6 +389,15 @@ def get_settings(self) -> Settings:
"""Returns the settings of the client"""
return self._settings

@property
@override
def max_batch_size(self) -> int:
if self._max_batch_size == -1:
    resp = self._session.get(self._api_url + "/pre-flight-checks")
    raise_chroma_error(resp)
    self._max_batch_size = cast(int, resp.json()["max_batch_size"])
return self._max_batch_size
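
Since _max_batch_size is cached after the first successful fetch, only the first access pays the round trip to /pre-flight-checks; later writes reuse the cached value.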


def raise_chroma_error(resp: requests.Response) -> None:
"""Raises an error if the response is not ok, using a ChromaError if possible"""
46 changes: 40 additions & 6 deletions chromadb/api/segment.py
@@ -26,6 +26,7 @@
validate_update_metadata,
validate_where,
validate_where_document,
validate_batch,
)
from chromadb.telemetry.events import CollectionAddEvent, CollectionDeleteEvent

@@ -38,6 +39,7 @@
import logging
import re


logger = logging.getLogger(__name__)


@@ -241,9 +243,18 @@ def _add(
) -> bool:
coll = self._get_collection(collection_id)
self._manager.hint_use_collection(collection_id, t.Operation.ADD)

validate_batch(
(ids, embeddings, metadatas, documents),
{"max_batch_size": self.max_batch_size},
)
records_to_submit = []
for r in _records(t.Operation.ADD, ids, embeddings, metadatas, documents):
for r in _records(
t.Operation.ADD,
ids=ids,
embeddings=embeddings,
metadatas=metadatas,
documents=documents,
):
self._validate_embedding_record(coll, r)
records_to_submit.append(r)
self._producer.submit_embeddings(coll["topic"], records_to_submit)
@@ -262,9 +273,18 @@ def _update(
) -> bool:
coll = self._get_collection(collection_id)
self._manager.hint_use_collection(collection_id, t.Operation.UPDATE)

validate_batch(
(ids, embeddings, metadatas, documents),
{"max_batch_size": self.max_batch_size},
)
records_to_submit = []
for r in _records(t.Operation.UPDATE, ids, embeddings, metadatas, documents):
for r in _records(
t.Operation.UPDATE,
ids=ids,
embeddings=embeddings,
metadatas=metadatas,
documents=documents,
):
self._validate_embedding_record(coll, r)
records_to_submit.append(r)
self._producer.submit_embeddings(coll["topic"], records_to_submit)
@@ -282,9 +302,18 @@ def _upsert(
) -> bool:
coll = self._get_collection(collection_id)
self._manager.hint_use_collection(collection_id, t.Operation.UPSERT)

validate_batch(
(ids, embeddings, metadatas, documents),
{"max_batch_size": self.max_batch_size},
)
records_to_submit = []
for r in _records(t.Operation.UPSERT, ids, embeddings, metadatas, documents):
for r in _records(
t.Operation.UPSERT,
ids=ids,
embeddings=embeddings,
metadatas=metadatas,
documents=documents,
):
self._validate_embedding_record(coll, r)
records_to_submit.append(r)
self._producer.submit_embeddings(coll["topic"], records_to_submit)
@@ -524,6 +553,11 @@ def reset(self) -> bool:
def get_settings(self) -> Settings:
return self._settings

@property
@override
def max_batch_size(self) -> int:
return self._producer.max_batch_size
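
The segment client simply delegates to its producer here; the limit presumably reflects backend constraints (for a SQLite-backed producer, for example, the maximum number of parameters that can be bound to a single statement).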

def _topic(self, collection_id: UUID) -> str:
return f"persistent://{self._tenant_id}/{self._topic_ns}/{collection_id}"

12 changes: 11 additions & 1 deletion chromadb/api/types.py
@@ -1,4 +1,4 @@
from typing import Optional, Union, Sequence, TypeVar, List, Dict, Any
from typing import Optional, Union, Sequence, TypeVar, List, Dict, Any, Tuple
from typing_extensions import Literal, TypedDict, Protocol
import chromadb.errors as errors
from chromadb.types import (
@@ -343,3 +343,13 @@ def validate_embeddings(embeddings: Embeddings) -> Embeddings:
f"Expected each value in the embedding to be a int or float, got {embeddings}"
)
return embeddings


def validate_batch(
batch: Tuple[IDs, Optional[Embeddings], Optional[Metadatas], Optional[Documents]],
limits: Dict[str, Any],
) -> None:
if len(batch[0]) > limits["max_batch_size"]:
raise ValueError(
f"Batch size {len(batch[0])} exceeds maximum batch size {limits['max_batch_size']}"
)
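
For illustration, the check fails fast, before any network or producer work. A minimal sketch of the error it raises:

from chromadb.api.types import validate_batch

ids = [str(i) for i in range(101)]
validate_batch((ids, None, None, None), {"max_batch_size": 100})
# ValueError: Batch size 101 exceeds maximum batch size 100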
8 changes: 8 additions & 0 deletions chromadb/server/fastapi/__init__.py
@@ -126,6 +126,9 @@ def __init__(self, settings: Settings):
self.router.add_api_route("/api/v1/reset", self.reset, methods=["POST"])
self.router.add_api_route("/api/v1/version", self.version, methods=["GET"])
self.router.add_api_route("/api/v1/heartbeat", self.heartbeat, methods=["GET"])
self.router.add_api_route(
"/api/v1/pre-flight-checks", self.pre_flight_checks, methods=["GET"]
)

self.router.add_api_route(
"/api/v1/collections",
@@ -312,3 +315,8 @@ def get_nearest_neighbors(
include=query.include,
)
return nnresult

def pre_flight_checks(self) -> Dict[str, Any]:
return {
"max_batch_size": self._api.max_batch_size,
}
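
A quick way to inspect the new endpoint from a Python shell, assuming a local server on the default port (the value shown is illustrative; the real limit depends on the server's producer):

import requests

resp = requests.get("http://localhost:8000/api/v1/pre-flight-checks")
print(resp.json())  # e.g. {"max_batch_size": 41666}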
70 changes: 70 additions & 0 deletions chromadb/test/property/test_add.py
@@ -1,3 +1,6 @@
import random
import uuid
from random import randint
from typing import cast
import pytest
import hypothesis.strategies as st
@@ -6,6 +9,7 @@
from chromadb.api.types import Embeddings
import chromadb.test.property.strategies as strategies
import chromadb.test.property.invariants as invariants
from chromadb.utils.batch_utils import create_batches

collection_st = st.shared(strategies.collections(with_hnsw_params=True), key="coll")

@@ -44,6 +48,72 @@ def test_add(
)


def create_large_recordset(
min_size: int = 45000,
max_size: int = 50000,
) -> strategies.RecordSet:
size = randint(min_size, max_size)

ids = [str(uuid.uuid4()) for _ in range(size)]
metadatas = [{"some_key": f"{i}"} for i in range(size)]
documents = [f"Document {i}" for i in range(size)]
embeddings = [[1, 2, 3] for _ in range(size)]
return {
"ids": ids,
"embeddings": embeddings,
"metadatas": metadatas,
"documents": documents,
}


@given(collection=collection_st)
@settings(deadline=None, max_examples=1)
def test_add_large(api: API, collection: strategies.Collection) -> None:
api.reset()
record_set = create_large_recordset(
min_size=api.max_batch_size,
max_size=api.max_batch_size + int(api.max_batch_size * random.random()),
)
coll = api.create_collection(
name=collection.name,
metadata=collection.metadata,
embedding_function=collection.embedding_function,
)
normalized_record_set = invariants.wrap_all(record_set)

if not invariants.is_metadata_valid(normalized_record_set):
with pytest.raises(Exception):
coll.add(**normalized_record_set)
return
for batch in create_batches(api.max_batch_size, **record_set):
coll.add(*batch)
invariants.count(coll, cast(strategies.RecordSet, normalized_record_set))
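
chromadb/utils/batch_utils.py itself is not included in this diff. A minimal sketch of create_batches that is consistent with the call sites above (the real signature may differ):

from typing import List, Optional, Tuple

from chromadb.api.types import Documents, Embeddings, IDs, Metadatas

def create_batches(
    max_batch_size: int,
    ids: IDs,
    embeddings: Optional[Embeddings] = None,
    metadatas: Optional[Metadatas] = None,
    documents: Optional[Documents] = None,
) -> List[Tuple[IDs, Optional[Embeddings], Optional[Metadatas], Optional[Documents]]]:
    # Slice every column in lockstep so each batch stays within the limit.
    batches = []
    for i in range(0, len(ids), max_batch_size):
        batches.append(
            (
                ids[i : i + max_batch_size],
                embeddings[i : i + max_batch_size] if embeddings else None,
                metadatas[i : i + max_batch_size] if metadatas else None,
                documents[i : i + max_batch_size] if documents else None,
            )
        )
    return batches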


@given(collection=collection_st)
@settings(deadline=None, max_examples=1)
def test_add_large_exceeding(api: API, collection: strategies.Collection) -> None:
api.reset()
record_set = create_large_recordset(
min_size=api.max_batch_size,
max_size=api.max_batch_size + int(api.max_batch_size * random.random()),
)
coll = api.create_collection(
name=collection.name,
metadata=collection.metadata,
embedding_function=collection.embedding_function,
)
normalized_record_set = invariants.wrap_all(record_set)

if not invariants.is_metadata_valid(normalized_record_set):
with pytest.raises(Exception):
coll.add(**normalized_record_set)
return
with pytest.raises(Exception) as e:
coll.add(**record_set)
assert "exceeds maximum batch size" in str(e.value)


# TODO: This test fails right now because the ids are not sorted by the input order
@pytest.mark.xfail(
reason="This is expected to fail right now. We should change the API to sort the \
18 changes: 18 additions & 0 deletions chromadb/test/test_api.py
@@ -1,6 +1,8 @@
# type: ignore
import requests

import chromadb
from chromadb.api.fastapi import FastAPI
from chromadb.api.types import QueryResult
from chromadb.config import Settings
import chromadb.server.fastapi
@@ -164,6 +166,22 @@ def test_heartbeat(api):
assert heartbeat > datetime.now() - timedelta(seconds=10)


def test_max_batch_size(api):
print(api)
batch_size = api.max_batch_size
assert batch_size > 0


def test_pre_flight_checks(api):
if not isinstance(api, FastAPI):
pytest.skip("Not a FastAPI instance")

resp = requests.get(f"{api._api_url}/pre-flight-checks")
assert resp.status_code == 200
assert resp.json() is not None
assert "max_batch_size" in resp.json().keys()


batch_records = {
"embeddings": [[1.1, 2.3, 3.2], [1.2, 2.24, 3.2]],
"ids": ["https://example.com/1", "https://example.com/2"],