Skip to content

Commit

Permalink
[ENH]: CIP-5: Large Batch Handling Improvements Proposal (#1077)
Browse files Browse the repository at this point in the history
- Including only CIP for review.

Refs: #1049

## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - New proposal to handle large batches of embeddings gracefully

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js

## Documentation Changes
TBD

---------

Signed-off-by: sunilkumardash9 <[email protected]>
Co-authored-by: Sunil Kumar Dash <[email protected]>
  • Loading branch information
tazarov and sunilkumardash9 authored Sep 18, 2023
1 parent 2b434b8 commit 82b9c83
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 44 deletions.
7 changes: 7 additions & 0 deletions chromadb/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,10 @@ def get_settings(self) -> Settings:
"""
pass

@property
@abstractmethod
def max_batch_size(self) -> int:
"""Return the maximum number of records that can be submitted in a single call
to submit_embeddings."""
pass
81 changes: 46 additions & 35 deletions chromadb/api/fastapi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import Optional, cast
from typing import Optional, cast, Tuple
from typing import Sequence
from uuid import UUID

Expand All @@ -23,6 +23,7 @@
GetResult,
QueryResult,
CollectionMetadata,
validate_batch,
)
from chromadb.auth import (
ClientAuthProvider,
Expand All @@ -38,6 +39,7 @@

class FastAPI(API):
_settings: Settings
_max_batch_size: int = -1

@staticmethod
def _validate_host(host: str) -> None:
Expand Down Expand Up @@ -296,6 +298,29 @@ def _delete(
raise_chroma_error(resp)
return cast(IDs, resp.json())

def _submit_batch(
self,
batch: Tuple[
IDs, Optional[Embeddings], Optional[Metadatas], Optional[Documents]
],
url: str,
) -> requests.Response:
"""
Submits a batch of embeddings to the database
"""
resp = self._session.post(
self._api_url + url,
data=json.dumps(
{
"ids": batch[0],
"embeddings": batch[1],
"metadatas": batch[2],
"documents": batch[3],
}
),
)
return resp

@override
def _add(
self,
Expand All @@ -309,18 +334,9 @@ def _add(
Adds a batch of embeddings to the database
- pass in column oriented data lists
"""
resp = self._session.post(
self._api_url + "/collections/" + str(collection_id) + "/add",
data=json.dumps(
{
"ids": ids,
"embeddings": embeddings,
"metadatas": metadatas,
"documents": documents,
}
),
)

batch = (ids, embeddings, metadatas, documents)
validate_batch(batch, {"max_batch_size": self.max_batch_size})
resp = self._submit_batch(batch, "/collections/" + str(collection_id) + "/add")
raise_chroma_error(resp)
return True

Expand All @@ -337,18 +353,11 @@ def _update(
Updates a batch of embeddings in the database
- pass in column oriented data lists
"""
resp = self._session.post(
self._api_url + "/collections/" + str(collection_id) + "/update",
data=json.dumps(
{
"ids": ids,
"embeddings": embeddings,
"metadatas": metadatas,
"documents": documents,
}
),
batch = (ids, embeddings, metadatas, documents)
validate_batch(batch, {"max_batch_size": self.max_batch_size})
resp = self._submit_batch(
batch, "/collections/" + str(collection_id) + "/update"
)

resp.raise_for_status()
return True

Expand All @@ -365,18 +374,11 @@ def _upsert(
Upserts a batch of embeddings in the database
- pass in column oriented data lists
"""
resp = self._session.post(
self._api_url + "/collections/" + str(collection_id) + "/upsert",
data=json.dumps(
{
"ids": ids,
"embeddings": embeddings,
"metadatas": metadatas,
"documents": documents,
}
),
batch = (ids, embeddings, metadatas, documents)
validate_batch(batch, {"max_batch_size": self.max_batch_size})
resp = self._submit_batch(
batch, "/collections/" + str(collection_id) + "/upsert"
)

resp.raise_for_status()
return True

Expand Down Expand Up @@ -434,6 +436,15 @@ def get_settings(self) -> Settings:
"""Returns the settings of the client"""
return self._settings

@property
@override
def max_batch_size(self) -> int:
if self._max_batch_size == -1:
resp = self._session.get(self._api_url + "/pre-flight-checks")
raise_chroma_error(resp)
self._max_batch_size = cast(int, resp.json()["max_batch_size"])
return self._max_batch_size


def raise_chroma_error(resp: requests.Response) -> None:
"""Raises an error if the response is not ok, using a ChromaError if possible"""
Expand Down
46 changes: 40 additions & 6 deletions chromadb/api/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
validate_update_metadata,
validate_where,
validate_where_document,
validate_batch,
)
from chromadb.telemetry.events import CollectionAddEvent, CollectionDeleteEvent

Expand All @@ -38,6 +39,7 @@
import logging
import re


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -241,9 +243,18 @@ def _add(
) -> bool:
coll = self._get_collection(collection_id)
self._manager.hint_use_collection(collection_id, t.Operation.ADD)

validate_batch(
(ids, embeddings, metadatas, documents),
{"max_batch_size": self.max_batch_size},
)
records_to_submit = []
for r in _records(t.Operation.ADD, ids, embeddings, metadatas, documents):
for r in _records(
t.Operation.ADD,
ids=ids,
embeddings=embeddings,
metadatas=metadatas,
documents=documents,
):
self._validate_embedding_record(coll, r)
records_to_submit.append(r)
self._producer.submit_embeddings(coll["topic"], records_to_submit)
Expand All @@ -262,9 +273,18 @@ def _update(
) -> bool:
coll = self._get_collection(collection_id)
self._manager.hint_use_collection(collection_id, t.Operation.UPDATE)

validate_batch(
(ids, embeddings, metadatas, documents),
{"max_batch_size": self.max_batch_size},
)
records_to_submit = []
for r in _records(t.Operation.UPDATE, ids, embeddings, metadatas, documents):
for r in _records(
t.Operation.UPDATE,
ids=ids,
embeddings=embeddings,
metadatas=metadatas,
documents=documents,
):
self._validate_embedding_record(coll, r)
records_to_submit.append(r)
self._producer.submit_embeddings(coll["topic"], records_to_submit)
Expand All @@ -282,9 +302,18 @@ def _upsert(
) -> bool:
coll = self._get_collection(collection_id)
self._manager.hint_use_collection(collection_id, t.Operation.UPSERT)

validate_batch(
(ids, embeddings, metadatas, documents),
{"max_batch_size": self.max_batch_size},
)
records_to_submit = []
for r in _records(t.Operation.UPSERT, ids, embeddings, metadatas, documents):
for r in _records(
t.Operation.UPSERT,
ids=ids,
embeddings=embeddings,
metadatas=metadatas,
documents=documents,
):
self._validate_embedding_record(coll, r)
records_to_submit.append(r)
self._producer.submit_embeddings(coll["topic"], records_to_submit)
Expand Down Expand Up @@ -524,6 +553,11 @@ def reset(self) -> bool:
def get_settings(self) -> Settings:
return self._settings

@property
@override
def max_batch_size(self) -> int:
return self._producer.max_batch_size

def _topic(self, collection_id: UUID) -> str:
return f"persistent://{self._tenant_id}/{self._topic_ns}/{collection_id}"

Expand Down
12 changes: 11 additions & 1 deletion chromadb/api/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional, Union, Sequence, TypeVar, List, Dict, Any
from typing import Optional, Union, Sequence, TypeVar, List, Dict, Any, Tuple
from typing_extensions import Literal, TypedDict, Protocol
import chromadb.errors as errors
from chromadb.types import (
Expand Down Expand Up @@ -367,3 +367,13 @@ def validate_embeddings(embeddings: Embeddings) -> Embeddings:
f"Expected each value in the embedding to be a int or float, got {embeddings}"
)
return embeddings


def validate_batch(
batch: Tuple[IDs, Optional[Embeddings], Optional[Metadatas], Optional[Documents]],
limits: Dict[str, Any],
) -> None:
if len(batch[0]) > limits["max_batch_size"]:
raise ValueError(
f"Batch size {len(batch[0])} exceeds maximum batch size {limits['max_batch_size']}"
)
8 changes: 8 additions & 0 deletions chromadb/server/fastapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def __init__(self, settings: Settings):
self.router.add_api_route("/api/v1/reset", self.reset, methods=["POST"])
self.router.add_api_route("/api/v1/version", self.version, methods=["GET"])
self.router.add_api_route("/api/v1/heartbeat", self.heartbeat, methods=["GET"])
self.router.add_api_route(
"/api/v1/pre-flight-checks", self.pre_flight_checks, methods=["GET"]
)

self.router.add_api_route(
"/api/v1/collections",
Expand Down Expand Up @@ -312,3 +315,8 @@ def get_nearest_neighbors(
include=query.include,
)
return nnresult

def pre_flight_checks(self) -> Dict[str, Any]:
return {
"max_batch_size": self._api.max_batch_size,
}
81 changes: 79 additions & 2 deletions chromadb/test/property/test_add.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from typing import cast
import random
import uuid
from random import randint
from typing import cast, List, Any, Dict
import pytest
import hypothesis.strategies as st
from hypothesis import given, settings
from chromadb.api import API
from chromadb.api.types import Embeddings
from chromadb.api.types import Embeddings, Metadatas
import chromadb.test.property.strategies as strategies
import chromadb.test.property.invariants as invariants
from chromadb.utils.batch_utils import create_batches

collection_st = st.shared(strategies.collections(with_hnsw_params=True), key="coll")

Expand Down Expand Up @@ -44,6 +48,79 @@ def test_add(
)


def create_large_recordset(
min_size: int = 45000,
max_size: int = 50000,
) -> strategies.RecordSet:
size = randint(min_size, max_size)

ids = [str(uuid.uuid4()) for _ in range(size)]
metadatas = [{"some_key": f"{i}"} for i in range(size)]
documents = [f"Document {i}" for i in range(size)]
embeddings = [[1, 2, 3] for _ in range(size)]
record_set: Dict[str, List[Any]] = {
"ids": ids,
"embeddings": cast(Embeddings, embeddings),
"metadatas": metadatas,
"documents": documents,
}
return record_set


@given(collection=collection_st)
@settings(deadline=None, max_examples=1)
def test_add_large(api: API, collection: strategies.Collection) -> None:
api.reset()
record_set = create_large_recordset(
min_size=api.max_batch_size,
max_size=api.max_batch_size + int(api.max_batch_size * random.random()),
)
coll = api.create_collection(
name=collection.name,
metadata=collection.metadata,
embedding_function=collection.embedding_function,
)
normalized_record_set = invariants.wrap_all(record_set)

if not invariants.is_metadata_valid(normalized_record_set):
with pytest.raises(Exception):
coll.add(**normalized_record_set)
return
for batch in create_batches(
api=api,
ids=cast(List[str], record_set["ids"]),
embeddings=cast(Embeddings, record_set["embeddings"]),
metadatas=cast(Metadatas, record_set["metadatas"]),
documents=cast(List[str], record_set["documents"]),
):
coll.add(*batch)
invariants.count(coll, cast(strategies.RecordSet, normalized_record_set))


@given(collection=collection_st)
@settings(deadline=None, max_examples=1)
def test_add_large_exceeding(api: API, collection: strategies.Collection) -> None:
api.reset()
record_set = create_large_recordset(
min_size=api.max_batch_size,
max_size=api.max_batch_size + int(api.max_batch_size * random.random()),
)
coll = api.create_collection(
name=collection.name,
metadata=collection.metadata,
embedding_function=collection.embedding_function,
)
normalized_record_set = invariants.wrap_all(record_set)

if not invariants.is_metadata_valid(normalized_record_set):
with pytest.raises(Exception):
coll.add(**normalized_record_set)
return
with pytest.raises(Exception) as e:
coll.add(**record_set)
assert "exceeds maximum batch size" in str(e.value)


# TODO: This test fails right now because the ids are not sorted by the input order
@pytest.mark.xfail(
reason="This is expected to fail right now. We should change the API to sort the \
Expand Down
Loading

0 comments on commit 82b9c83

Please sign in to comment.