diff --git a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py index b3be56f40..51d64e5e3 100644 --- a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py +++ b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py @@ -49,6 +49,44 @@ def get_batches_from_generator(iterable, n): class QdrantDocumentStore: + """ + QdrantDocumentStore is a Document Store for Qdrant. + It can be used with any Qdrant instance: in-memory, disk-persisted, Docker-based, + and Qdrant Cloud Cluster deployments. + + Usage example by creating an in-memory instance: + + ```python + from haystack.dataclasses.document import Document + from haystack_integrations.document_stores.qdrant import QdrantDocumentStore + + document_store = QdrantDocumentStore( + ":memory:", + recreate_index=True + ) + document_store.write_documents([ + Document(content="This is first", embedding=[0.0]*5), + Document(content="This is second", embedding=[0.1, 0.2, 0.3, 0.4, 0.5]) + ]) + ``` + + Usage example with Qdrant Cloud: + + ```python + from haystack.dataclasses.document import Document + from haystack_integrations.document_stores.qdrant import QdrantDocumentStore + + document_store = QdrantDocumentStore( + url="https://xxxxxx-xxxxx-xxxxx-xxxx-xxxxxxxxx.us-east.aws.cloud.qdrant.io:6333", + api_key="", + ) + document_store.write_documents([ + Document(content="This is first", embedding=[0.0]*5), + Document(content="This is second", embedding=[0.1, 0.2, 0.3, 0.4, 0.5]) + ]) + ``` + """ + SIMILARITY: ClassVar[Dict[str, str]] = { "cosine": rest.Distance.COSINE, "dot_product": rest.Distance.DOT, @@ -96,6 +134,98 @@ def __init__( scroll_size: int = 10_000, payload_fields_to_index: Optional[List[dict]] = None, ): + """ + :param location: + If `memory` - use in-memory Qdrant instance. + If `str` - use it as a URL parameter. + If `None` - use default values for host and port. + :param url: + Either host or str of `Optional[scheme], host, Optional[port], Optional[prefix]`. + :param port: + Port of the REST API interface. + :param grpc_port: + Port of the gRPC interface. + :param prefer_grpc: + If `True` - use gRPC interface whenever possible in custom methods. + :param https: + If `True` - use HTTPS(SSL) protocol. + :param api_key: + API key for authentication in Qdrant Cloud. + :param prefix: + If not `None` - add prefix to the REST URL path. + Example: service/v1 will result in http://localhost:6333/service/v1/{qdrant-endpoint} + for REST API. + :param timeout: + Timeout for REST and gRPC API requests. + :param host: + Host name of Qdrant service. If ùrl` and `host` are `None`, set to `localhost`. + :param path: + Persistence path for QdrantLocal. + :param force_disable_check_same_thread: + For QdrantLocal, force disable check_same_thread. + Only use this if you can guarantee that you can resolve the thread safety outside QdrantClient. + :param index: + Name of the index. + :param embedding_dim: + Dimension of the embeddings. + :param on_disk: + Whether to store the collection on disk. + :param content_field: + The field for the document content. + :param name_field: + The field for the document name. + :param embedding_field: + The field for the document embeddings. + :param use_sparse_embedding: + If set to `True`, enables support for sparse embeddings. + :param similarity: + The similarity metric to use. + :param return_embedding: + Whether to return embeddings in the search results. + :param progress_bar: + Whether to show a progress bar or not. + :param duplicate_documents: + The parameter is not used and will be removed in future release. + :param recreate_index: + Whether to recreate the index. + :param shard_number: + Number of shards in the collection. + :param replication_factor: + Replication factor for the collection. + Defines how many copies of each shard will be created. Effective only in distributed mode. + :param write_consistency_factor: + Write consistency factor for the collection. Minimum value is 1. + Defines how many replicas should apply to the operation for it to be considered successful. + Increasing this number makes the collection more resilient to inconsistencies + but will cause failures if not enough replicas are available. + Effective only in distributed mode. + :param on_disk_payload: + If `True`, the point's payload will not be stored in memory and + will be read from the disk every time it is requested. + This setting saves RAM by slightly increasing response time. + Note: indexed payload values remain in RAM. + :param hnsw_config: + Params for HNSW index. + :param optimizers_config: + Params for optimizer. + :param wal_config: + Params for Write-Ahead-Log. + :param quantization_config: + Params for quantization. If `None`, quantization will be disabled. + :param init_from: + Use data stored in another collection to initialize this collection. + :param wait_result_from_api: + Whether to wait for the result from the API after each request. + :param metadata: + Additional metadata to include with the documents. + :param write_batch_size: + The batch size for writing documents. + :param scroll_size: + The scroll size for reading documents. + :param payload_fields_to_index: + List of payload fields to index. + """ + self._client = None # Store the Qdrant client specific attributes @@ -172,6 +302,9 @@ def client(self): return self._client def count_documents(self) -> int: + """ + Returns the number of documents present in the Document Store. + """ try: response = self.client.count( collection_name=self.index, @@ -187,6 +320,15 @@ def filter_documents( self, filters: Optional[Union[Dict[str, Any], rest.Filter]] = None, ) -> List[Document]: + """ + Returns the documents that match the provided filters. + + For a detailed specification of the filters, refer to the + [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering) + + :param filters: The filters to apply to the document list. + :returns: A list of documents that match the given filters. + """ if filters and not isinstance(filters, dict) and not isinstance(filters, rest.Filter): msg = "Filter must be a dictionary or an instance of `qdrant_client.http.models.Filter`" raise ValueError(msg) @@ -204,6 +346,19 @@ def write_documents( documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.FAIL, ): + """ + Writes documents to Qdrant using the specified policy. + The QdrantDocumentStore can handle duplicate documents based on the given policy. + The available policies are: + - `FAIL`: The operation will raise an error if any document already exists. + - `OVERWRITE`: Existing documents will be overwritten with the new ones. + - `SKIP`: Existing documents will be skipped, and only new documents will be added. + + :param documents: A list of Document objects to write to Qdrant. + :param policy: The policy for handling duplicate documents. + + :returns: The number of documents written to the document store. + """ for doc in documents: if not isinstance(doc, Document): msg = f"DocumentStore.write_documents() expects a list of Documents but got an element of {type(doc)}." @@ -239,6 +394,11 @@ def write_documents( return len(document_objects) def delete_documents(self, ids: List[str]): + """ + Deletes documents that match the provided `document_ids` from the document store. + + :param document_ids: the document ids to delete + """ ids = [convert_id(_id) for _id in ids] try: self.client.delete( @@ -253,10 +413,24 @@ def delete_documents(self, ids: List[str]): @classmethod def from_dict(cls, data: Dict[str, Any]) -> "QdrantDocumentStore": + """ + Deserializes the component from a dictionary. + + :param data: + The dictionary to deserialize from. + :returns: + The deserialized component. + """ deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"]) return default_from_dict(cls, data) def to_dict(self) -> Dict[str, Any]: + """ + Serializes the component to a dictionary. + + :returns: + Dictionary with serialized data. + """ params = inspect.signature(self.__init__).parameters # type: ignore # All the __init__ params must be set as attributes # Set as init_parms without default values @@ -271,6 +445,13 @@ def get_documents_generator( self, filters: Optional[Union[Dict[str, Any], rest.Filter]] = None, ) -> Generator[Document, None, None]: + """ + Returns a generator that yields documents from Qdrant based on the provided filters. + + :param filters: Filters applied to the retrieved documents. + :returns: A generator that yields documents retrieved from Qdrant. + """ + index = self.index qdrant_filters = convert_filters_to_qdrant(filters) @@ -299,6 +480,16 @@ def get_documents_by_id( ids: List[str], index: Optional[str] = None, ) -> List[Document]: + """ + Retrieves documents from Qdrant by their IDs. + + :param ids: + A list of document IDs to retrieve. + :param index: + The name of the index to retrieve documents from. + :returns: + A list of documents. + """ index = index or self.index documents: List[Document] = [] @@ -325,6 +516,21 @@ def _query_by_sparse( scale_score: bool = True, return_embedding: bool = False, ) -> List[Document]: + """ + Queries Qdrant using a sparse embedding and returns the most relevant documents. + + :param query_sparse_embedding: Sparse embedding of the query. + :param filters: Filters applied to the retrieved documents. + :param top_k: Maximum number of documents to return. + :param scale_score: Whether to scale the scores of the retrieved documents. + :param return_embedding: Whether to return the embeddings of the retrieved documents. + + :returns: List of documents that are most similar to `query_sparse_embedding`. + + :raises QdrantStoreError: + If the Document Store was initialized with `use_sparse_embeddings=False`. + """ + if not self.use_sparse_embeddings: message = ( "You are trying to query using sparse embeddings, but the Document Store " @@ -367,6 +573,17 @@ def _query_by_embedding( scale_score: bool = True, return_embedding: bool = False, ) -> List[Document]: + """ + Queries Qdrant using a dense embedding and returns the most relevant documents. + + :param query_embedding: Dense embedding of the query. + :param filters: Filters applied to the retrieved documents. + :param top_k: Maximum number of documents to return. + :param scale_score: Whether to scale the scores of the retrieved documents. + :param return_embedding: Whether to return the embeddings of the retrieved documents. + + :returns: List of documents that are most similar to `query_embedding`. + """ qdrant_filters = convert_filters_to_qdrant(filters) points = self.client.search( @@ -409,8 +626,8 @@ def _query_hybrid( :param query_embedding: Dense embedding of the query. :param query_sparse_embedding: Sparse embedding of the query. - :param filters: Filters applied to the retrieved Documents. - :param top_k: Maximum number of Documents to return. + :param filters: Filters applied to the retrieved documents. + :param top_k: Maximum number of documents to return. :param return_embedding: Whether to return the embeddings of the retrieved documents. :returns: List of Document that are most similar to `query_embedding` and `query_sparse_embedding`. @@ -474,6 +691,16 @@ def _query_hybrid( return results def get_distance(self, similarity: str) -> rest.Distance: + """ + Retrieves the distance metric for the specified similarity measure. + + :param similarity: + The similarity measure to retrieve the distance. + :returns: + The corresponding rest.Distance object. + :raises QdrantStoreError: + If the provided similarity measure is not supported. + """ try: return self.SIMILARITY[similarity] except KeyError as ke: @@ -507,6 +734,29 @@ def _set_up_collection( on_disk: bool = False, payload_fields_to_index: Optional[List[dict]] = None, ): + """ + Sets up the Qdrant collection with the specified parameters. + :param collection_name: + The name of the collection to set up. + :param embedding_dim: + The dimension of the embeddings. + :param recreate_collection: + Whether to recreate the collection if it already exists. + :param similarity: + The similarity measure to use. + :param use_sparse_embeddings: + Whether to use sparse embeddings. + :param on_disk: + Whether to store the collection on disk. + :param payload_fields_to_index: + List of payload fields to index. + + :raises QdrantStoreError: + If the collection exists with incompatible settings. + :raises ValueError: + If the collection exists with a different similarity measure or embedding dimension. + + """ distance = self.get_distance(similarity) if recreate_collection or not self.client.collection_exists(collection_name): @@ -576,6 +826,20 @@ def recreate_collection( on_disk: Optional[bool] = None, use_sparse_embeddings: Optional[bool] = None, ): + """ + Recreates the Qdrant collection with the specified parameters. + + :param collection_name: + The name of the collection to recreate. + :param distance: + The distance metric to use for the collection. + :param embedding_dim: + The dimension of the embeddings. + :param on_disk: + Whether to store the collection on disk. + :param use_sparse_embeddings: + Whether to use sparse embeddings. + """ if on_disk is None: on_disk = self.on_disk @@ -627,11 +891,11 @@ def _handle_duplicate_documents( :param documents: A list of Haystack Document objects. :param index: name of the index - :param duplicate_documents: Handle duplicates document based on parameter options. + :param duplicate_documents: Handle duplicate documents based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip (default option): Ignore the duplicates documents. overwrite: Update any existing documents with the same ID when adding documents. - fail: an error is raised if the document ID of the document being added already + fail: An error is raised if the document ID of the document being added already exists. :returns: A list of Haystack Document objects. """ @@ -652,10 +916,10 @@ def _handle_duplicate_documents( def _drop_duplicate_documents(self, documents: List[Document], index: Optional[str] = None) -> List[Document]: """ - Drop duplicates documents based on same hash ID + Drop duplicate documents based on same hash ID. :param documents: A list of Haystack Document objects. - :param index: name of the index + :param index: Name of the index. :returns: A list of Haystack Document objects. """ _hash_ids: Set = set()