diff --git a/dev-requirements.txt b/dev-requirements.txt index c42af4eab..330cb2701 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -15,10 +15,13 @@ twine build nox -numpy pandas orjson +# mmr for vectorstore +numpy +simsimd + # Testing the 'search_mvt' API response mapbox-vector-tile # Python 3.7 gets an old version of mapbox-vector-tile, requiring an diff --git a/elasticsearch/helpers/vectorstore/__init__.py b/elasticsearch/helpers/vectorstore/__init__.py new file mode 100644 index 000000000..30a4c3d6e --- /dev/null +++ b/elasticsearch/helpers/vectorstore/__init__.py @@ -0,0 +1,62 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from elasticsearch.helpers.vectorstore._async.embedding_service import ( + AsyncElasticsearchEmbeddings, + AsyncEmbeddingService, +) +from elasticsearch.helpers.vectorstore._async.strategies import ( + AsyncBM25Strategy, + AsyncDenseVectorScriptScoreStrategy, + AsyncDenseVectorStrategy, + AsyncRetrievalStrategy, + AsyncSparseVectorStrategy, +) +from elasticsearch.helpers.vectorstore._async.vectorstore import AsyncVectorStore +from elasticsearch.helpers.vectorstore._sync.embedding_service import ( + ElasticsearchEmbeddings, + EmbeddingService, +) +from elasticsearch.helpers.vectorstore._sync.strategies import ( + BM25Strategy, + DenseVectorScriptScoreStrategy, + DenseVectorStrategy, + RetrievalStrategy, + SparseVectorStrategy, +) +from elasticsearch.helpers.vectorstore._sync.vectorstore import VectorStore +from elasticsearch.helpers.vectorstore._utils import DistanceMetric + +__all__ = [ + "AsyncBM25Strategy", + "AsyncDenseVectorScriptScoreStrategy", + "AsyncDenseVectorStrategy", + "AsyncElasticsearchEmbeddings", + "AsyncEmbeddingService", + "AsyncRetrievalStrategy", + "AsyncSparseVectorStrategy", + "AsyncVectorStore", + "BM25Strategy", + "DenseVectorScriptScoreStrategy", + "DenseVectorStrategy", + "DistanceMetric", + "ElasticsearchEmbeddings", + "EmbeddingService", + "RetrievalStrategy", + "SparseVectorStrategy", + "VectorStore", +] diff --git a/elasticsearch/helpers/vectorstore/_async/__init__.py b/elasticsearch/helpers/vectorstore/_async/__init__.py new file mode 100644 index 000000000..2a87d183f --- /dev/null +++ b/elasticsearch/helpers/vectorstore/_async/__init__.py @@ -0,0 +1,16 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# diff --git a/elasticsearch/helpers/vectorstore/_async/_utils.py b/elasticsearch/helpers/vectorstore/_async/_utils.py
# new file mode 100644
# index 000000000..67b6b6a27
# --- /dev/null
# +++ b/elasticsearch/helpers/vectorstore/_async/_utils.py
# @@ -0,0 +1,39 @@

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from elasticsearch import AsyncElasticsearch, BadRequestError, NotFoundError


async def model_must_be_deployed(client: AsyncElasticsearch, model_id: str) -> None:
    """
    Probe `model_id` with a one-document inference request.

    A `BadRequestError` raised by the probe is swallowed; every other
    exception propagates to the caller.

    :param client: Elasticsearch client used to send the probe.
    :param model_id: ID of the trained model to probe.

    :raises [NotFoundError]: if the model is neither downloaded nor deployed.
    :raises [ConflictError]: if the model is downloaded but not yet deployed.
    """
    probe_doc = {"text_field": f"test if the model '{model_id}' is deployed"}
    try:
        await client.ml.infer_trained_model(model_id=model_id, docs=[probe_doc])
    except BadRequestError:
        # The model is deployed but expects a different input field name.
        return


async def model_is_deployed(client: AsyncElasticsearch, model_id: str) -> bool:
    """
    Boolean wrapper around `model_must_be_deployed`.

    :param client: Elasticsearch client used to send the probe.
    :param model_id: ID of the trained model to probe.

    :return: False if the probe raised `NotFoundError`, True if it completed.
        Other exceptions are not caught here.
    """
    try:
        await model_must_be_deployed(client, model_id)
    except NotFoundError:
        return False
    else:
        return True


# diff --git a/elasticsearch/helpers/vectorstore/_async/embedding_service.py b/elasticsearch/helpers/vectorstore/_async/embedding_service.py
# new file mode 100644
# index 000000000..20005b665
# --- /dev/null
# +++ b/elasticsearch/helpers/vectorstore/_async/embedding_service.py
# @@ -0,0 +1,89 @@

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from abc import ABC, abstractmethod
from typing import List

from elasticsearch import AsyncElasticsearch
from elasticsearch._version import __versionstr__ as lib_version


class AsyncEmbeddingService(ABC):
    """Interface for services that turn text into embedding vectors."""

    @abstractmethod
    async def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a list of documents.

        :param texts: A list of document strings to generate embeddings for.

        :return: A list of embeddings, one for each document in the input.
        """

    @abstractmethod
    async def embed_query(self, query: str) -> List[float]:
        """Generate an embedding for a single query text.

        :param query: The query text to generate an embedding for.

        :return: The embedding for the input query text.
        """


class AsyncElasticsearchEmbeddings(AsyncEmbeddingService):
    """Elasticsearch as a service for embedding model inference.

    You need to have an embedding model downloaded and deployed in Elasticsearch:
    - https://www.elastic.co/guide/en/elasticsearch/reference/current/infer-trained-model.html
    - https://www.elastic.co/guide/en/machine-learning/current/ml-nlp-deploy-models.html
    """  # noqa: E501

    def __init__(
        self,
        *,
        client: AsyncElasticsearch,
        model_id: str,
        input_field: str = "text_field",
        user_agent: str = f"elasticsearch-py-es/{lib_version}",
    ):
        """
        :param client: Elasticsearch client connection.
        :param model_id: The model_id of the model deployed in the Elasticsearch cluster.
        :param input_field: The name of the key for the input text field in the
            document. Defaults to 'text_field'.
        :param user_agent: user agent header specific to the 3rd party integration.
            Used for usage tracking in Elastic Cloud.
        """
        self.model_id = model_id
        self.input_field = input_field
        # Add integration-specific usage header for tracking usage in Elastic Cloud.
        # client.options preserves existing (non-user-agent) headers.
        self.client = client.options(headers={"User-Agent": user_agent})

    async def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed every text in `texts` with a single inference request."""
        return await self._embedding_func(texts)

    async def embed_query(self, text: str) -> List[float]:
        """Embed one query text and return its vector."""
        embeddings = await self._embedding_func([text])
        return embeddings[0]

    async def _embedding_func(self, texts: List[str]) -> List[List[float]]:
        """Call the trained model on `texts` and collect each `predicted_value`."""
        docs = [{self.input_field: text} for text in texts]
        response = await self.client.ml.infer_trained_model(
            model_id=self.model_id, docs=docs
        )
        return [result["predicted_value"] for result in response["inference_results"]]


# diff --git a/elasticsearch/helpers/vectorstore/_async/strategies.py b/elasticsearch/helpers/vectorstore/_async/strategies.py
# new file mode 100644
# index 000000000..a7f813f43
# --- /dev/null
# +++ b/elasticsearch/helpers/vectorstore/_async/strategies.py
# @@ -0,0 +1,466 @@

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers.vectorstore._async._utils import model_must_be_deployed
from elasticsearch.helpers.vectorstore._utils import DistanceMetric


class AsyncRetrievalStrategy(ABC):
    """Base class describing how a vector store indexes and queries its data."""

    @abstractmethod
    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        """
        Returns the Elasticsearch query body for the given parameters.
        The store will execute the query.

        :param query: The text query. Can be None if query_vector is given.
        :param query_vector: The query vector. Can be None if a query string is given.
        :param text_field: The field containing the text data in the index.
        :param vector_field: The field containing the vector representations in the index.
        :param k: The total number of results to retrieve.
        :param num_candidates: The number of results to fetch initially in knn search.
        :param filter: List of filter clauses to apply to the query.

        :return: The Elasticsearch query body.
        """

    @abstractmethod
    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Build the index mappings and settings this strategy requires.
        Nothing is created here; the store passes both to index creation.

        :param text_field: The field containing the text data in the index.
        :param vector_field: The field containing the vector representations in the index.
        :param num_dimensions: If vectors are indexed, how many dimensions do they have.

        :return: Tuple of (mappings, settings) dictionaries for the index.
        """

    async def before_index_creation(
        self, *, client: AsyncElasticsearch, text_field: str, vector_field: str
    ) -> None:
        """
        Executes before the index is created. Used for setting up
        any required Elasticsearch resources like a pipeline.
        Defaults to a no-op.

        :param client: The Elasticsearch client.
        :param text_field: The field containing the text data in the index.
        :param vector_field: The field containing the vector representations in the index.
        """
        pass

    def needs_inference(self) -> bool:
        """
        Some retrieval strategies index embedding vectors and allow search by embedding
        vector, for example the `DenseVectorStrategy` strategy. Mapping a user input query
        string to an embedding vector is called inference. Inference can be applied
        in Elasticsearch (using a `model_id`) or outside of Elasticsearch (using an
        `EmbeddingService` defined on the `VectorStore`). In the latter case,
        this method has to return True.
        """
        return False


class AsyncSparseVectorStrategy(AsyncRetrievalStrategy):
    """Sparse retrieval strategy using the `text_expansion` processor."""

    def __init__(self, model_id: str = ".elser_model_2"):
        """
        :param model_id: ID of the sparse embedding model used both by the
            ingest pipeline and by the `text_expansion` query.
        """
        self.model_id = model_id
        self._tokens_field = "tokens"
        self._pipeline_name = f"{self.model_id}_sparse_embedding"

    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        """
        Build a `text_expansion` query against the tokens sub-field of `vector_field`.

        :raises ValueError: if a query_vector is given or the query string is None.
        """
        if query_vector:
            raise ValueError(
                "Cannot do sparse retrieval with a query_vector. "
                "Inference is currently always applied in Elasticsearch."
            )
        if query is None:
            raise ValueError("please specify a query string")

        return {
            "query": {
                "bool": {
                    "must": [
                        {
                            "text_expansion": {
                                f"{vector_field}.{self._tokens_field}": {
                                    "model_id": self.model_id,
                                    "model_text": query,
                                }
                            }
                        }
                    ],
                    "filter": filter,
                }
            }
        }

    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Map the tokens sub-field as `rank_features` and make the strategy's
        ingest pipeline the index default.
        """
        mappings: Dict[str, Any] = {
            "properties": {
                vector_field: {
                    "properties": {self._tokens_field: {"type": "rank_features"}}
                }
            }
        }
        settings = {"default_pipeline": self._pipeline_name}

        return mappings, settings

    async def before_index_creation(
        self, *, client: AsyncElasticsearch, text_field: str, vector_field: str
    ) -> None:
        """
        Verify that the model is deployed, then create the ingest pipeline that
        writes the model output for `text_field` into `vector_field`.
        """
        if self.model_id:
            await model_must_be_deployed(client, self.model_id)

            # Create a pipeline for the model
            await client.ingest.put_pipeline(
                id=self._pipeline_name,
                description="Embedding pipeline for Python VectorStore",
                processors=[
                    {
                        "inference": {
                            "model_id": self.model_id,
                            "target_field": vector_field,
                            "field_map": {text_field: "text_field"},
                            "inference_config": {
                                "text_expansion": {"results_field": self._tokens_field}
                            },
                        }
                    }
                ],
            )


class AsyncDenseVectorStrategy(AsyncRetrievalStrategy):
    """K-nearest-neighbors retrieval."""

    def __init__(
        self,
        *,
        distance: DistanceMetric = DistanceMetric.COSINE,
        model_id: Optional[str] = None,
        hybrid: bool = False,
        rrf: Union[bool, Dict[str, Any]] = True,
        text_field: Optional[str] = "text_field",
    ):
        """
        :param distance: Distance metric used for the `dense_vector` similarity.
        :param model_id: ID of a model deployed in Elasticsearch. When set, query
            embeddings are built in Elasticsearch via `query_vector_builder`.
        :param hybrid: Whether to combine the knn search with a text `match` query.
        :param rrf: `True` for RRF ranking with default parameters, a dict of RRF
            parameters, or `False` to add no `rank` section. Only used when
            `hybrid` is enabled.
        :param text_field: Field matched by the text query in hybrid mode.

        :raises ValueError: if `hybrid` is enabled without a `text_field`.
        """
        if hybrid and not text_field:
            raise ValueError(
                "to enable hybrid you have to specify a text_field (for BM25Strategy matching)"
            )

        self.distance = distance
        self.model_id = model_id
        self.hybrid = hybrid
        self.rrf = rrf
        self.text_field = text_field

    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        """
        Build a knn search body, optionally wrapped into a hybrid (knn + match) query.
        """
        knn = {
            "filter": filter,
            "field": vector_field,
            "k": k,
            "num_candidates": num_candidates,
        }

        if query_vector is not None:
            knn["query_vector"] = query_vector
        else:
            # Inference in Elasticsearch. When initializing we make sure to always have
            # a model_id if don't have an embedding_service.
            knn["query_vector_builder"] = {
                "text_embedding": {
                    "model_id": self.model_id,
                    "model_text": query,
                }
            }

        if self.hybrid:
            return self._hybrid(query=cast(str, query), knn=knn, filter=filter)

        return {"knn": knn}

    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Map `vector_field` as an indexed `dense_vector` using the similarity that
        corresponds to the configured distance metric. Settings are empty.

        :raises ValueError: if the distance metric has no matching similarity.
        """
        if self.distance is DistanceMetric.COSINE:
            similarity = "cosine"
        elif self.distance is DistanceMetric.EUCLIDEAN_DISTANCE:
            similarity = "l2_norm"
        elif self.distance is DistanceMetric.DOT_PRODUCT:
            similarity = "dot_product"
        elif self.distance is DistanceMetric.MAX_INNER_PRODUCT:
            similarity = "max_inner_product"
        else:
            raise ValueError(f"Similarity {self.distance} not supported.")

        mappings: Dict[str, Any] = {
            "properties": {
                vector_field: {
                    "type": "dense_vector",
                    "dims": num_dimensions,
                    "index": True,
                    "similarity": similarity,
                },
            }
        }

        return mappings, {}

    async def before_index_creation(
        self, *, client: AsyncElasticsearch, text_field: str, vector_field: str
    ) -> None:
        """Verify that the embedding model is deployed, if a `model_id` was configured."""
        if self.model_id:
            await model_must_be_deployed(client, self.model_id)

    def _hybrid(
        self, query: str, knn: Dict[str, Any], filter: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Combine the knn clause with a `match` query on `self.text_field`."""
        # Add a query to the knn query.
        # RRF is used to even the score from the knn query and text query
        # RRF has two optional parameters: {'rank_constant':int, 'window_size':int}
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/rrf.html
        query_body = {
            "knn": knn,
            "query": {
                "bool": {
                    "must": [
                        {
                            "match": {
                                self.text_field: {
                                    "query": query,
                                }
                            }
                        }
                    ],
                    "filter": filter,
                }
            },
        }

        if isinstance(self.rrf, dict):
            # Caller-supplied RRF parameters.
            query_body["rank"] = {"rrf": self.rrf}
        elif self.rrf is True:
            # RRF with default parameters; rrf=False adds no rank section.
            query_body["rank"] = {"rrf": {}}

        return query_body

    def needs_inference(self) -> bool:
        """Inference outside Elasticsearch is needed only when no `model_id` is set."""
        return not self.model_id


class AsyncDenseVectorScriptScoreStrategy(AsyncRetrievalStrategy):
    """Exact nearest neighbors retrieval using the `script_score` query."""

    def __init__(self, distance: DistanceMetric = DistanceMetric.COSINE) -> None:
        """
        :param distance: Distance metric that selects the scoring script.
        """
        self.distance = distance

    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        """
        Build a `script_score` query that scores documents against `query_vector`.

        :raises ValueError: if no query_vector is given or the distance metric
            is not supported.
        """
        if not query_vector:
            raise ValueError("specify a query_vector")

        if self.distance is DistanceMetric.COSINE:
            similarity_algo = (
                f"cosineSimilarity(params.query_vector, '{vector_field}') + 1.0"
            )
        elif self.distance is DistanceMetric.EUCLIDEAN_DISTANCE:
            similarity_algo = f"1 / (1 + l2norm(params.query_vector, '{vector_field}'))"
        elif self.distance is DistanceMetric.DOT_PRODUCT:
            similarity_algo = f"""
            double value = dotProduct(params.query_vector, '{vector_field}');
            return sigmoid(1, Math.E, -value);
            """
        elif self.distance is DistanceMetric.MAX_INNER_PRODUCT:
            # The result is stored in `value`; `dotProduct` is the function name
            # and must not be referenced as a variable.
            similarity_algo = f"""
            double value = dotProduct(params.query_vector, '{vector_field}');
            if (value < 0) {{
                return 1 / (1 + -1 * value);
            }}
            return value + 1;
            """
        else:
            raise ValueError(f"Similarity {self.distance} not supported.")

        query_bool: Dict[str, Any] = {"match_all": {}}
        if filter:
            query_bool = {"bool": {"filter": filter}}

        return {
            "query": {
                "script_score": {
                    "query": query_bool,
                    "script": {
                        "source": similarity_algo,
                        "params": {"query_vector": query_vector},
                    },
                },
            }
        }

    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Map `vector_field` as a non-indexed `dense_vector`. Settings are empty."""
        mappings = {
            "properties": {
                vector_field: {
                    "type": "dense_vector",
                    "dims": num_dimensions,
                    "index": False,
                }
            }
        }

        return mappings, {}

    def needs_inference(self) -> bool:
        """This strategy always requires a query vector computed outside Elasticsearch."""
        return True


class AsyncBM25Strategy(AsyncRetrievalStrategy):
    """Text retrieval with a `match` query and a custom BM25 similarity."""

    def __init__(
        self,
        k1: Optional[float] = None,
        b: Optional[float] = None,
    ):
        """
        :param k1: Optional BM25 `k1` parameter; omitted from the settings if None.
        :param b: Optional BM25 `b` parameter; omitted from the settings if None.
        """
        self.k1 = k1
        self.b = b

    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        """Build a bool query with a `match` on `text_field` plus the given filters."""
        return {
            "query": {
                "bool": {
                    "must": [
                        {
                            "match": {
                                text_field: {
                                    "query": query,
                                }
                            },
                        },
                    ],
                    "filter": filter,
                },
            },
        }

    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Map `text_field` as `text` with a named similarity, and define that
        similarity as BM25 in the index settings.
        """
        similarity_name = "custom_bm25"

        mappings: Dict[str, Any] = {
            "properties": {
                text_field: {
                    "type": "text",
                    "similarity": similarity_name,
                },
            },
        }

        bm25: Dict[str, Any] = {
            "type": "BM25",
        }
        if self.k1 is not None:
            bm25["k1"] = self.k1
        if self.b is not None:
            bm25["b"] = self.b
        settings = {
            "similarity": {
                similarity_name: bm25,
            }
        }

        return mappings, settings


# diff --git a/elasticsearch/helpers/vectorstore/_async/vectorstore.py b/elasticsearch/helpers/vectorstore/_async/vectorstore.py new file mode 100644 index 000000000..b79e2dcaf --- /dev/null +++
# b/elasticsearch/helpers/vectorstore/_async/vectorstore.py @@ -0,0 +1,391 @@

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import uuid
from typing import Any, Callable, Dict, List, Optional

from elasticsearch import AsyncElasticsearch
from elasticsearch._version import __versionstr__ as lib_version
from elasticsearch.helpers import BulkIndexError, async_bulk
from elasticsearch.helpers.vectorstore import (
    AsyncEmbeddingService,
    AsyncRetrievalStrategy,
)
from elasticsearch.helpers.vectorstore._utils import maximal_marginal_relevance

logger = logging.getLogger(__name__)


class AsyncVectorStore:
    """
    VectorStore is a higher-level abstraction of indexing and search.
    Users can pick from available retrieval strategies.

    Documents have up to 3 fields:
      - text_field: the text to be indexed and searched.
      - metadata: additional information about the document, either schema-free
        or defined by the supplied metadata_mappings.
      - vector_field (usually not filled by the user): the embedding vector of the text.

    Depending on the strategy, vector embeddings are
      - created by the user beforehand
      - created by this AsyncVectorStore class in Python
      - created in-stack by inference pipelines.
    """

    def __init__(
        self,
        client: AsyncElasticsearch,
        *,
        index: str,
        retrieval_strategy: AsyncRetrievalStrategy,
        embedding_service: Optional[AsyncEmbeddingService] = None,
        num_dimensions: Optional[int] = None,
        text_field: str = "text_field",
        vector_field: str = "vector_field",
        metadata_mappings: Optional[Dict[str, Any]] = None,
        user_agent: str = f"elasticsearch-py-vs/{lib_version}",
    ) -> None:
        """
        :param client: Elasticsearch client connection.
        :param index: The name of the index to query.
        :param retrieval_strategy: how to index and search the data. See the strategies
            module for available strategies.
        :param embedding_service: Optional service used to embed documents and
            queries in Python when no vectors are supplied.
        :param num_dimensions: Optional number of vector dimensions, passed to the
            strategy when the index mappings are built.
        :param text_field: Name of the field with the textual data.
        :param vector_field: For strategies that perform embedding inference in Python,
            the embedding vector goes in this field.
        :param metadata_mappings: Optional field mappings merged into the
            `metadata` properties when the index is created.
        :param user_agent: user agent header specific to the 3rd party integration.
            Used for usage tracking in Elastic Cloud.
        """
        # Add integration-specific usage header for tracking usage in Elastic Cloud.
        # client.options preserves existing (non-user-agent) headers.
        client = client.options(headers={"User-Agent": user_agent})

        # Strategies that carry their own field names follow the store's names.
        if hasattr(retrieval_strategy, "text_field"):
            retrieval_strategy.text_field = text_field
        if hasattr(retrieval_strategy, "vector_field"):
            retrieval_strategy.vector_field = vector_field

        self.client = client
        self.index = index
        self.retrieval_strategy = retrieval_strategy
        self.embedding_service = embedding_service
        self.num_dimensions = num_dimensions
        self.text_field = text_field
        self.vector_field = vector_field
        self.metadata_mappings = metadata_mappings

    async def close(self) -> None:
        """Close the underlying Elasticsearch client."""
        return await self.client.close()

    async def add_texts(
        self,
        texts: List[str],
        *,
        metadatas: Optional[List[Dict[str, Any]]] = None,
        vectors: Optional[List[List[float]]] = None,
        ids: Optional[List[str]] = None,
        refresh_indices: bool = True,
        create_index_if_not_exists: bool = True,
        bulk_kwargs: Optional[Dict[str, Any]] = None,
    ) -> List[str]:
        """Add documents to the Elasticsearch index.

        :param texts: List of text documents.
        :param metadatas: Optional list of document metadata. Must be of same length as
            texts.
        :param vectors: Optional list of embedding vectors. Must be of same length as
            texts.
        :param ids: Optional list of ID strings. Must be of same length as texts.
        :param refresh_indices: Whether to refresh the index after adding documents.
            Defaults to True.
        :param create_index_if_not_exists: Whether to create the index if it does not
            exist. Defaults to True.
        :param bulk_kwargs: Arguments to pass to the bulk function when indexing
            (for example chunk_size).

        :return: List of IDs of the created documents, either echoing the provided one
            or returning newly created ones.

        :raises BulkIndexError: re-raised after logging if the bulk request fails.
        """
        bulk_kwargs = bulk_kwargs or {}
        ids = ids or [str(uuid.uuid4()) for _ in texts]

        if create_index_if_not_exists:
            await self._create_index_if_not_exists()

        if self.embedding_service and not vectors:
            vectors = await self.embedding_service.embed_documents(texts)

        requests: List[Dict[str, Any]] = []
        for i, text in enumerate(texts):
            request: Dict[str, Any] = {
                "_op_type": "index",
                "_index": self.index,
                self.text_field: text,
                "metadata": metadatas[i] if metadatas else {},
                "_id": ids[i],
            }
            if vectors:
                request[self.vector_field] = vectors[i]
            requests.append(request)

        if not requests:
            logger.debug("No texts to add to index")
            return []

        try:
            await async_bulk(
                self.client,
                requests,
                stats_only=True,
                refresh=refresh_indices,
                **bulk_kwargs,
            )
        except BulkIndexError as e:
            logger.error(f"Error adding texts: {e}")
            first_error = e.errors[0].get("index", {}).get("error", {})
            logger.error(f"First error reason: {first_error.get('reason')}")
            raise

        logger.debug(f"added texts {ids} to index")
        return ids

    async def delete(  # type: ignore[no-untyped-def]
        self,
        *,
        ids: Optional[List[str]] = None,
        query: Optional[Dict[str, Any]] = None,
        refresh_indices: bool = True,
        **delete_kwargs,
    ) -> bool:
        """Delete documents from the Elasticsearch index.

        Exactly one of `ids` and `query` must be given.

        :param ids: List of IDs of documents to delete.
        :param query: Query selecting the documents to delete.
        :param refresh_indices: Whether to refresh the index after deleting documents.
            Defaults to True.
        :param delete_kwargs: Extra arguments forwarded to the bulk helper (for
            `ids`) or to `delete_by_query` (for `query`).

        :return: True if deletion was successful.

        :raises ValueError: if both or neither of `ids` and `query` are given.
        :raises BulkIndexError: re-raised after logging if the bulk delete fails.
        """
        if ids is not None and query is not None:
            raise ValueError("specify either ids or query, not both")
        if ids is None and query is None:
            raise ValueError("either specify ids or query")

        try:
            # Branch on `is not None` so an empty id list is a no-op bulk call
            # instead of falling through to delete_by_query with query=None.
            if ids is not None:
                body = [
                    {"_op_type": "delete", "_index": self.index, "_id": _id}
                    for _id in ids
                ]
                await async_bulk(
                    self.client,
                    body,
                    refresh=refresh_indices,
                    ignore_status=404,
                    **delete_kwargs,
                )
                logger.debug(f"Deleted {len(body)} texts from index")

            else:
                await self.client.delete_by_query(
                    index=self.index,
                    query=query,
                    refresh=refresh_indices,
                    **delete_kwargs,
                )

        except BulkIndexError as e:
            logger.error(f"Error deleting texts: {e}")
            # Bulk error items are keyed by their operation type, here "delete".
            first_error = e.errors[0].get("delete", {}).get("error", {})
            logger.error(f"First error reason: {first_error.get('reason')}")
            raise

        return True

    async def search(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]] = None,
        k: int = 4,
        num_candidates: int = 50,
        fields: Optional[List[str]] = None,
        filter: Optional[List[Dict[str, Any]]] = None,
        custom_query: Optional[
            Callable[[Dict[str, Any], Optional[str]], Dict[str, Any]]
        ] = None,
    ) -> List[Dict[str, Any]]:
        """
        :param query: Input query string.
        :param query_vector: Input embedding vector. If given, input query string is
            ignored.
        :param k: Number of returned results.
        :param num_candidates: Number of candidates to fetch from data nodes in knn.
        :param fields: List of field names to return.
        :param filter: Elasticsearch filters to apply.
        :param custom_query: Function to modify the Elasticsearch query body before it is
            sent to Elasticsearch.

        :return: List of document hits. Includes _index, _id, _score and _source.

        :raises ValueError: if an embedding service is configured and neither a
            query nor a query_vector is given.
        """
        # Work on a copy so the caller's list is not extended.
        fields = [] if fields is None else list(fields)
        if "metadata" not in fields:
            fields.append("metadata")
        if self.text_field not in fields:
            fields.append(self.text_field)

        if self.embedding_service and not query_vector:
            if not query:
                raise ValueError("specify a query or a query_vector to search")
            query_vector = await self.embedding_service.embed_query(query)

        query_body = self.retrieval_strategy.es_query(
            query=query,
            query_vector=query_vector,
            text_field=self.text_field,
            vector_field=self.vector_field,
            k=k,
            num_candidates=num_candidates,
            filter=filter or [],
        )

        if custom_query is not None:
            query_body = custom_query(query_body, query)
            logger.debug(f"Calling custom_query, Query body now: {query_body}")

        response = await self.client.search(
            index=self.index,
            **query_body,
            size=k,
            source=True,
            source_includes=fields,
        )
        hits: List[Dict[str, Any]] = response["hits"]["hits"]

        return hits

    async def _create_index_if_not_exists(self) -> None:
        """
        Create the index with the strategy's mappings and settings unless it exists.

        :raises ValueError: if the strategy needs inference but neither
            num_dimensions nor an embedding service is available, or if a
            metadata_mappings key is already mapped by the strategy.
        """
        exists = await self.client.indices.exists(index=self.index)
        if exists.meta.status == 200:
            logger.debug(f"Index {self.index} already exists. Skipping creation.")
            return

        if self.retrieval_strategy.needs_inference():
            if not self.num_dimensions and not self.embedding_service:
                raise ValueError(
                    "retrieval strategy requires embeddings; either embedding_service "
                    "or num_dimensions need to be specified"
                )
            if not self.num_dimensions and self.embedding_service:
                # Embed a throwaway string to discover the vector length.
                vector = await self.embedding_service.embed_query("get num dimensions")
                self.num_dimensions = len(vector)

        mappings, settings = self.retrieval_strategy.es_mappings_settings(
            text_field=self.text_field,
            vector_field=self.vector_field,
            num_dimensions=self.num_dimensions,
        )
        if self.metadata_mappings:
            # Compare against the mapped metadata *properties*, not the wrapper
            # dict whose only key is "properties".
            existing = mappings["properties"].get("metadata", {}).get("properties", {})
            for key in self.metadata_mappings:
                if key in existing:
                    raise ValueError(f"metadata key {key} already exists in mappings")

            mappings["properties"]["metadata"] = {
                "properties": {**existing, **self.metadata_mappings}
            }

        await self.retrieval_strategy.before_index_creation(
            client=self.client,
            text_field=self.text_field,
            vector_field=self.vector_field,
        )
        await self.client.indices.create(
            index=self.index, mappings=mappings, settings=settings
        )

    async def max_marginal_relevance_search(
        self,
        *,
        embedding_service: AsyncEmbeddingService,
        query: str,
        vector_field: str,
        k: int = 4,
        num_candidates: int = 20,
        lambda_mult: float = 0.5,
        fields: Optional[List[str]] = None,
        custom_query: Optional[
            Callable[[Dict[str, Any], Optional[str]], Dict[str, Any]]
        ] = None,
    ) -> List[Dict[str, Any]]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        :param embedding_service: Service used to embed the query text.
        :param query (str): Text to look up documents similar to.
        :param vector_field: Source field holding each document's embedding.
        :param k (int): Number of Documents to return. Defaults to 4.
        :param num_candidates (int): Number of Documents to fetch to pass to MMR
            algorithm. Defaults to 20.
        :param lambda_mult (float): Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
            Defaults to 0.5.
        :param fields: Other fields to get from elasticsearch source. These fields
            will be added to the document metadata.
        :param custom_query: Function to modify the Elasticsearch query body before
            it is sent to Elasticsearch.

        :return: A list of Documents selected by maximal marginal relevance.
        """
        # The vector field is always fetched for the MMR computation. It is
        # stripped from the results again unless the caller asked for it.
        remove_vector_query_field_from_metadata = True
        if fields is None:
            fields = [vector_field]
        elif vector_field not in fields:
            # Build a new list so the caller's list is not extended.
            fields = [*fields, vector_field]
        else:
            remove_vector_query_field_from_metadata = False

        # Embed the query
        query_embedding = await embedding_service.embed_query(query)

        # Fetch the initial documents
        # NOTE(review): only `k` is forwarded here; search() keeps its own default
        # num_candidates (50). Presumably knn requires num_candidates >= k, so
        # values above 50 may be rejected; confirm against the kNN search API.
        got_hits = await self.search(
            query=None,
            query_vector=query_embedding,
            k=num_candidates,
            fields=fields,
            custom_query=custom_query,
        )

        # Get the embeddings for the fetched documents
        got_embeddings = [hit["_source"][vector_field] for hit in got_hits]

        # Select documents using maximal marginal relevance
        selected_indices = maximal_marginal_relevance(
            query_embedding, got_embeddings, lambda_mult=lambda_mult, k=k
        )
        selected_hits = [got_hits[i] for i in selected_indices]

        if remove_vector_query_field_from_metadata:
            for hit in selected_hits:
                del hit["_source"][vector_field]

        return selected_hits


# diff --git a/elasticsearch/helpers/vectorstore/_sync/__init__.py b/elasticsearch/helpers/vectorstore/_sync/__init__.py new file mode 100644 index 000000000..2a87d183f --- /dev/null +++ b/elasticsearch/helpers/vectorstore/_sync/__init__.py @@ -0,0 +1,16 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V.
def model_must_be_deployed(client: Elasticsearch, model_id: str) -> None:
    """
    Probe the model with a throwaway inference request.

    Returns normally when the request succeeds or is rejected as a bad
    request; any other error raised by the client propagates to the caller.

    :raises [NotFoundError]: if the model is neither downloaded nor deployed.
    :raises [ConflictError]: if the model is downloaded but not yet deployed.
    """
    probe_docs = [{"text_field": f"test if the model '{model_id}' is deployed"}]
    try:
        client.ml.infer_trained_model(model_id=model_id, docs=probe_docs)
    except BadRequestError:
        # The model is deployed but expects a different input field name,
        # which is good enough for a deployment check.
        return None
    return None
class ElasticsearchEmbeddings(EmbeddingService):
    """Elasticsearch as a service for embedding model inference.

    You need to have an embedding model downloaded and deployed in Elasticsearch:
    - https://www.elastic.co/guide/en/elasticsearch/reference/current/infer-trained-model.html
    - https://www.elastic.co/guide/en/machine-learning/current/ml-nlp-deploy-models.html
    """  # noqa: E501

    def __init__(
        self,
        *,
        client: Elasticsearch,
        model_id: str,
        input_field: str = "text_field",
        user_agent: str = f"elasticsearch-py-es/{lib_version}",
    ):
        """
        :param client: Elasticsearch client connection.
        :param model_id: The model_id of the model deployed in the Elasticsearch cluster.
        :param input_field: The name of the key for the input text field in the
            document. Defaults to 'text_field'.
        :param user_agent: user agent header specific to the 3rd party integration.
            Used for usage tracking in Elastic Cloud.
        """
        # Add integration-specific usage header for tracking usage in Elastic Cloud.
        # client.options preserves existing (non-user-agent) headers.
        client = client.options(headers={"User-Agent": user_agent})

        self.client = client
        self.model_id = model_id
        self.input_field = input_field

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding per entry of ``texts``, in the same order."""
        return self._embedding_func(texts)

    def embed_query(self, text: str) -> List[float]:
        """Return the embedding for a single query text."""
        result = self._embedding_func([text])
        return result[0]

    def _embedding_func(self, texts: List[str]) -> List[List[float]]:
        """Run the deployed model on ``texts`` and collect the predicted vectors."""
        if not texts:
            # Nothing to embed: skip the inference round trip entirely.
            return []
        response = self.client.ml.infer_trained_model(
            model_id=self.model_id, docs=[{self.input_field: text} for text in texts]
        )
        return [doc["predicted_value"] for doc in response["inference_results"]]
class RetrievalStrategy(ABC):
    """Interface describing how documents are mapped, indexed and queried."""

    @abstractmethod
    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        """
        Returns the Elasticsearch query body for the given parameters.
        The store will execute the query.

        :param query: The text query. Can be None if query_vector is given.
        :param query_vector: The query vector. Can be None if a query string is given.
        :param text_field: The field containing the text data in the index.
        :param vector_field: The field containing the vector representations in the index.
        :param k: The total number of results to retrieve.
        :param num_candidates: The number of results to fetch initially in knn search.
        :param filter: List of filter clauses to apply to the query.

        :return: The Elasticsearch query body.
        """

    @abstractmethod
    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Describe the index this strategy needs. Nothing is created here; side
        effects such as pipelines belong in `before_index_creation`.

        :param text_field: The field containing the text data in the index.
        :param vector_field: The field containing the vector representations in the index.
        :param num_dimensions: If vectors are indexed, how many dimensions do they have.

        :return: A ``(mappings, settings)`` tuple to create the index with.
        """

    def before_index_creation(
        self, *, client: Elasticsearch, text_field: str, vector_field: str
    ) -> None:
        """
        Executes before the index is created. Used for setting up
        any required Elasticsearch resources like a pipeline.
        Defaults to a no-op.

        :param client: The Elasticsearch client.
        :param text_field: The field containing the text data in the index.
        :param vector_field: The field containing the vector representations in the index.
        """

    def needs_inference(self) -> bool:
        """
        Some retrieval strategies index embedding vectors and allow search by embedding
        vector, for example the `DenseVectorStrategy` strategy. Mapping a user input query
        string to an embedding vector is called inference. Inference can be applied
        in Elasticsearch (using a `model_id`) or outside of Elasticsearch (using an
        `EmbeddingService` defined on the `VectorStore`). In the latter case,
        this method has to return True.
        """
        return False
def es_query(
    self,
    *,
    query: Optional[str],
    query_vector: Optional[List[float]],
    text_field: str,
    vector_field: str,
    k: int,
    num_candidates: int,
    filter: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """
    Build the knn search body (optionally combined with a text match).

    :param query: The text query. Used for in-Elasticsearch inference when no
        query_vector is given, and for the text match in hybrid mode.
    :param query_vector: The query vector. When given it is used directly.
    :param text_field: Unused here; part of the strategy interface.
    :param vector_field: The dense vector field to search.
    :param k: The total number of results to retrieve.
    :param num_candidates: The number of results to fetch initially in knn search.
    :param filter: List of filter clauses to apply to the query.

    :return: The Elasticsearch query body.
    """
    # Copy so that neither a shared default nor the caller's list ends up
    # inside the returned body, where later mutation would leak across calls.
    filter = list(filter) if filter else []

    knn: Dict[str, Any] = {
        "filter": filter,
        "field": vector_field,
        "k": k,
        "num_candidates": num_candidates,
    }

    if query_vector is not None:
        knn["query_vector"] = query_vector
    else:
        # Inference in Elasticsearch. When initializing we make sure to always have
        # a model_id if don't have an embedding_service.
        knn["query_vector_builder"] = {
            "text_embedding": {
                "model_id": self.model_id,
                "model_text": query,
            }
        }

    if self.hybrid:
        return self._hybrid(query=cast(str, query), knn=knn, filter=filter)

    return {"knn": knn}
def es_query(
    self,
    *,
    query: Optional[str],
    query_vector: Optional[List[float]],
    text_field: str,
    vector_field: str,
    k: int,
    num_candidates: int,
    filter: List[Dict[str, Any]] = [],
) -> Dict[str, Any]:
    """
    Build a `script_score` query that scores every (filtered) document
    against the query vector with the configured distance metric.

    :param query: Unused; this strategy only searches by vector.
    :param query_vector: The query vector. Required.
    :param text_field: Unused here; part of the strategy interface.
    :param vector_field: The dense vector field to compare against.
    :param k: Unused here; part of the strategy interface.
    :param num_candidates: Unused here; part of the strategy interface.
    :param filter: List of filter clauses restricting the scored documents.

    :return: The Elasticsearch query body.
    :raises ValueError: if no query_vector is given or the metric is unsupported.
    """
    if not query_vector:
        raise ValueError("specify a query_vector")

    if self.distance is DistanceMetric.COSINE:
        similarity_algo = (
            f"cosineSimilarity(params.query_vector, '{vector_field}') + 1.0"
        )
    elif self.distance is DistanceMetric.EUCLIDEAN_DISTANCE:
        similarity_algo = f"1 / (1 + l2norm(params.query_vector, '{vector_field}'))"
    elif self.distance is DistanceMetric.DOT_PRODUCT:
        similarity_algo = f"""
        double value = dotProduct(params.query_vector, '{vector_field}');
        return sigmoid(1, Math.E, -value);
        """
    elif self.distance is DistanceMetric.MAX_INNER_PRODUCT:
        # The score must be computed from the local `value`; `dotProduct` is
        # the name of the vector function, not a variable.
        similarity_algo = f"""
        double value = dotProduct(params.query_vector, '{vector_field}');
        if (value < 0) {{
            return 1 / (1 + -1 * value);
        }}
        return value + 1;
        """
    else:
        raise ValueError(f"Similarity {self.distance} not supported.")

    # Without filters every document is scored.
    query_bool: Dict[str, Any] = {"match_all": {}}
    if filter:
        query_bool = {"bool": {"filter": filter}}

    return {
        "query": {
            "script_score": {
                "query": query_bool,
                "script": {
                    "source": similarity_algo,
                    "params": {"query_vector": query_vector},
                },
            },
        }
    }
-0,0 +1,388 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +import uuid +from typing import Any, Callable, Dict, List, Optional + +from elasticsearch import Elasticsearch +from elasticsearch._version import __versionstr__ as lib_version +from elasticsearch.helpers import BulkIndexError, bulk +from elasticsearch.helpers.vectorstore import EmbeddingService, RetrievalStrategy +from elasticsearch.helpers.vectorstore._utils import maximal_marginal_relevance + +logger = logging.getLogger(__name__) + + +class VectorStore: + """ + VectorStore is a higher-level abstraction of indexing and search. + Users can pick from available retrieval strategies. + + Documents have up to 3 fields: + - text_field: the text to be indexed and searched. + - metadata: additional information about the document, either schema-free + or defined by the supplied metadata_mappings. + - vector_field (usually not filled by the user): the embedding vector of the text. + + Depending on the strategy, vector embeddings are + - created by the user beforehand + - created by this AsyncVectorStore class in Python + - created in-stack by inference pipelines. 
def __init__(
    self,
    client: Elasticsearch,
    *,
    index: str,
    retrieval_strategy: RetrievalStrategy,
    embedding_service: Optional[EmbeddingService] = None,
    num_dimensions: Optional[int] = None,
    text_field: str = "text_field",
    vector_field: str = "vector_field",
    metadata_mappings: Optional[Dict[str, Any]] = None,
    user_agent: str = f"elasticsearch-py-vs/{lib_version}",
) -> None:
    """
    :param client: Elasticsearch client connection.
    :param index: The name of the index to query.
    :param retrieval_strategy: how to index and search the data. See the strategies
        module for available strategies.
    :param embedding_service: Optional service used to embed texts and queries
        in Python.
    :param num_dimensions: Number of dimensions of the embedding vectors. When
        omitted and an embedding service is set, it is derived by embedding a
        probe query at index creation.
    :param text_field: Name of the field with the textual data.
    :param vector_field: For strategies that perform embedding inference in Python,
        the embedding vector goes in this field.
    :param metadata_mappings: Optional field mappings merged into the
        ``metadata`` properties when the index is created.
    :param user_agent: user agent header specific to the 3rd party integration.
        Used for usage tracking in Elastic Cloud.
    """
    # Add integration-specific usage header for tracking usage in Elastic Cloud.
    # client.options preserves existing (non-user-agent) headers.
    tagged_client = client.options(headers={"User-Agent": user_agent})

    # Strategies that carry their own field names are kept in sync with the store.
    for attr_name, attr_value in (
        ("text_field", text_field),
        ("vector_field", vector_field),
    ):
        if hasattr(retrieval_strategy, attr_name):
            setattr(retrieval_strategy, attr_name, attr_value)

    self.client = tagged_client
    self.index = index
    self.retrieval_strategy = retrieval_strategy
    self.embedding_service = embedding_service
    self.num_dimensions = num_dimensions
    self.text_field = text_field
    self.vector_field = vector_field
    self.metadata_mappings = metadata_mappings
def delete(  # type: ignore[no-untyped-def]
    self,
    *,
    ids: Optional[List[str]] = None,
    query: Optional[Dict[str, Any]] = None,
    refresh_indices: bool = True,
    **delete_kwargs,
) -> bool:
    """Delete documents from the Elasticsearch index.

    Exactly one of ``ids`` and ``query`` must be given.

    :param ids: List of IDs of documents to delete. An empty list deletes nothing.
    :param query: Elasticsearch query; all matching documents are deleted.
    :param refresh_indices: Whether to refresh the index after deleting documents.
        Defaults to True.
    :param delete_kwargs: Extra arguments forwarded to the bulk helper (for
        ``ids``) or to ``delete_by_query`` (for ``query``).

    :return: True if deletion was successful.
    :raises ValueError: if both or neither of ``ids`` and ``query`` are given.
    """
    if ids is not None and query is not None:
        raise ValueError("specify either ids or query, not both")
    elif ids is None and query is None:
        raise ValueError("either specify ids or query")

    try:
        # Branch on `is not None` so that an empty id list is a no-op instead
        # of falling through to delete_by_query with no query.
        if ids is not None:
            body = [
                {"_op_type": "delete", "_index": self.index, "_id": _id}
                for _id in ids
            ]
            bulk(
                self.client,
                body,
                refresh=refresh_indices,
                ignore_status=404,
                **delete_kwargs,
            )
            logger.debug(f"Deleted {len(body)} texts from index")

        else:
            self.client.delete_by_query(
                index=self.index,
                query=query,
                refresh=refresh_indices,
                **delete_kwargs,
            )

    except BulkIndexError as e:
        logger.error(f"Error deleting texts: {e}")
        # Bulk error items are keyed by the action type, which is "delete" here.
        firstError = e.errors[0].get("delete", {}).get("error", {})
        logger.error(f"First error reason: {firstError.get('reason')}")
        raise e

    return True
+ """ + if fields is None: + fields = [] + if "metadata" not in fields: + fields.append("metadata") + if self.text_field not in fields: + fields.append(self.text_field) + + if self.embedding_service and not query_vector: + if not query: + raise ValueError("specify a query or a query_vector to search") + query_vector = self.embedding_service.embed_query(query) + + query_body = self.retrieval_strategy.es_query( + query=query, + query_vector=query_vector, + text_field=self.text_field, + vector_field=self.vector_field, + k=k, + num_candidates=num_candidates, + filter=filter or [], + ) + + if custom_query is not None: + query_body = custom_query(query_body, query) + logger.debug(f"Calling custom_query, Query body now: {query_body}") + + response = self.client.search( + index=self.index, + **query_body, + size=k, + source=True, + source_includes=fields, + ) + hits: List[Dict[str, Any]] = response["hits"]["hits"] + + return hits + + def _create_index_if_not_exists(self) -> None: + exists = self.client.indices.exists(index=self.index) + if exists.meta.status == 200: + logger.debug(f"Index {self.index} already exists. 
Skipping creation.") + return + + if self.retrieval_strategy.needs_inference(): + if not self.num_dimensions and not self.embedding_service: + raise ValueError( + "retrieval strategy requires embeddings; either embedding_service " + "or num_dimensions need to be specified" + ) + if not self.num_dimensions and self.embedding_service: + vector = self.embedding_service.embed_query("get num dimensions") + self.num_dimensions = len(vector) + + mappings, settings = self.retrieval_strategy.es_mappings_settings( + text_field=self.text_field, + vector_field=self.vector_field, + num_dimensions=self.num_dimensions, + ) + if self.metadata_mappings: + metadata = mappings["properties"].get("metadata", {"properties": {}}) + for key in self.metadata_mappings.keys(): + if key in metadata: + raise ValueError(f"metadata key {key} already exists in mappings") + + metadata = dict(**metadata["properties"], **self.metadata_mappings) + mappings["properties"]["metadata"] = {"properties": metadata} + + self.retrieval_strategy.before_index_creation( + client=self.client, + text_field=self.text_field, + vector_field=self.vector_field, + ) + self.client.indices.create( + index=self.index, mappings=mappings, settings=settings + ) + + def max_marginal_relevance_search( + self, + *, + embedding_service: EmbeddingService, + query: str, + vector_field: str, + k: int = 4, + num_candidates: int = 20, + lambda_mult: float = 0.5, + fields: Optional[List[str]] = None, + custom_query: Optional[ + Callable[[Dict[str, Any], Optional[str]], Dict[str, Any]] + ] = None, + ) -> List[Dict[str, Any]]: + """Return docs selected using the maximal marginal relevance. + + Maximal marginal relevance optimizes for similarity to query AND diversity + among selected documents. + + :param query (str): Text to look up documents similar to. + :param k (int): Number of Documents to return. Defaults to 4. + :param fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. 
+ :param lambda_mult (float): Number between 0 and 1 that determines the degree + of diversity among the results with 0 corresponding + to maximum diversity and 1 to minimum diversity. + Defaults to 0.5. + :param fields: Other fields to get from elasticsearch source. These fields + will be added to the document metadata. + + :return: A list of Documents selected by maximal marginal relevance. + """ + remove_vector_query_field_from_metadata = True + if fields is None: + fields = [vector_field] + elif vector_field not in fields: + fields.append(vector_field) + else: + remove_vector_query_field_from_metadata = False + + # Embed the query + query_embedding = embedding_service.embed_query(query) + + # Fetch the initial documents + got_hits = self.search( + query=None, + query_vector=query_embedding, + k=num_candidates, + fields=fields, + custom_query=custom_query, + ) + + # Get the embeddings for the fetched documents + got_embeddings = [hit["_source"][vector_field] for hit in got_hits] + + # Select documents using maximal marginal relevance + selected_indices = maximal_marginal_relevance( + query_embedding, got_embeddings, lambda_mult=lambda_mult, k=k + ) + selected_hits = [got_hits[i] for i in selected_indices] + + if remove_vector_query_field_from_metadata: + for hit in selected_hits: + del hit["_source"][vector_field] + + return selected_hits diff --git a/elasticsearch/helpers/vectorstore/_utils.py b/elasticsearch/helpers/vectorstore/_utils.py new file mode 100644 index 000000000..df91b5cc9 --- /dev/null +++ b/elasticsearch/helpers/vectorstore/_utils.py @@ -0,0 +1,116 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
from enum import Enum
from typing import TYPE_CHECKING, List, NoReturn, Union

if TYPE_CHECKING:
    import numpy as np
    import numpy.typing as npt

Matrix = Union[
    List[List[float]], List["npt.NDArray[np.float64]"], "npt.NDArray[np.float64]"
]


class DistanceMetric(str, Enum):
    """Enumerator of all Elasticsearch dense vector distance metrics."""

    COSINE = "COSINE"
    DOT_PRODUCT = "DOT_PRODUCT"
    EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE"
    MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT"


def maximal_marginal_relevance(
    query_embedding: List[float],
    embedding_list: List[List[float]],
    lambda_mult: float = 0.5,
    k: int = 4,
) -> List[int]:
    """Calculate maximal marginal relevance.

    Greedily picks indices into ``embedding_list``: first the candidate most
    similar to the query, then repeatedly the candidate maximizing
    ``lambda_mult * sim(query, c) - (1 - lambda_mult) * max sim(c, selected)``.

    :param query_embedding: Embedding of the query (a 1-D vector is promoted
        to a single-row matrix).
    :param embedding_list: Candidate embeddings, one row per candidate.
    :param lambda_mult: Weight of query similarity versus redundancy.
    :param k: Maximum number of indices to return.

    :return: Selected indices in selection order; empty when ``k <= 0`` or
        there are no candidates.
    :raises ModuleNotFoundError: If numpy (or, once similarities are needed,
        simsimd) is not installed.
    """

    try:
        import numpy as np
    except ModuleNotFoundError as e:
        _raise_missing_mmr_deps_error(e)

    num_to_select = min(k, len(embedding_list))
    if num_to_select <= 0:
        return []

    query_embedding_arr = np.array(query_embedding)
    if query_embedding_arr.ndim == 1:
        query_embedding_arr = np.expand_dims(query_embedding_arr, axis=0)

    similarity_to_query = _cosine_similarity(query_embedding_arr, embedding_list)[0]
    most_similar = int(np.argmax(similarity_to_query))
    idxs = [most_similar]
    chosen = {most_similar}  # O(1) membership test inside the scoring loop
    selected = [embedding_list[most_similar]]
    while len(idxs) < num_to_select:
        best_score = -np.inf
        idx_to_add = -1
        similarity_to_selected = _cosine_similarity(embedding_list, selected)
        for i, query_score in enumerate(similarity_to_query):
            if i in chosen:
                continue
            redundant_score = max(similarity_to_selected[i])
            equation_score = (
                lambda_mult * query_score - (1 - lambda_mult) * redundant_score
            )
            if equation_score > best_score:
                best_score = equation_score
                idx_to_add = i
        if idx_to_add < 0:
            # No remaining candidate produced a comparable score (e.g. all
            # NaN); stop instead of leaking the -1 sentinel as an index.
            break
        idxs.append(idx_to_add)
        chosen.add(idx_to_add)
        selected.append(embedding_list[idx_to_add])
    return idxs


def _cosine_similarity(X: Matrix, Y: Matrix) -> "npt.NDArray[np.float64]":
    """Pairwise cosine similarity between the rows of two equal-width matrices.

    :return: ``1 - cosine distance`` for every (row of X, row of Y) pair, or
        an empty array when either input is empty.
    :raises ValueError: If X and Y have a different number of columns.
    :raises ModuleNotFoundError: If numpy or simsimd is not installed.
    """

    try:
        import numpy as np
        import simsimd as simd
    except ModuleNotFoundError as e:
        _raise_missing_mmr_deps_error(e)

    if len(X) == 0 or len(Y) == 0:
        return np.array([])

    # Convert once, directly to the float32 layout handed to simsimd.
    X = np.asarray(X, dtype=np.float32)
    Y = np.asarray(Y, dtype=np.float32)
    if X.shape[1] != Y.shape[1]:
        raise ValueError(
            f"Number of columns in X and Y must be the same. X has shape {X.shape} "
            f"and Y has shape {Y.shape}."
        )

    Z = 1 - np.array(simd.cdist(X, Y, metric="cosine"))
    if isinstance(Z, float):
        # A scalar result is wrapped so callers always receive an array.
        return np.array([Z])
    return np.array(Z)


def _raise_missing_mmr_deps_error(parent_error: ModuleNotFoundError) -> NoReturn:
    """Re-raise a missing-module error with installation instructions.

    Annotated ``NoReturn`` because this function always raises, which lets
    type checkers know the names imported in the guarded ``try`` are bound.
    """
    import sys

    raise ModuleNotFoundError(
        f"Failed to compute maximal marginal relevance because the required "
        f"module '{parent_error.name}' is missing. You can install it by running: "
        f"'{sys.executable} -m pip install elasticsearch[vectorstore_mmr]'"
    ) from parent_error
def _create(elasticsearch_url, transport=None, node_class=None):
    """Build a sync client for the test cluster at ``elasticsearch_url``.

    ``transport`` (when truthy) is used as the client's transport class.
    ``node_class`` is only honoured when the 'PYTHON_CONNECTION_CLASS'
    environment variable is not set; the environment variable wins otherwise.
    """
    options = {}

    # Certificates are only relevant for TLS endpoints.
    if elasticsearch_url.startswith("https://"):
        options["ca_certs"] = CA_CERTS

    # The environment override takes precedence over the explicit argument.
    chosen_node_class = os.environ.get("PYTHON_CONNECTION_CLASS", node_class)
    if chosen_node_class is not None:
        options["node_class"] = chosen_node_class

    if transport:
        options["transport_class"] = transport

    return elasticsearch.Elasticsearch(elasticsearch_url, **options)
- client = elasticsearch.Elasticsearch(elasticsearch_url, **kw) + client = _create(elasticsearch_url) # Wipe the cluster before we start testing just in case it wasn't wiped # cleanly from the previous run of pytest? diff --git a/test_elasticsearch/test_server/test_mapbox_vector_tile.py b/test_elasticsearch/test_server/test_mapbox_vector_tile.py index 988210984..332e8d144 100644 --- a/test_elasticsearch/test_server/test_mapbox_vector_tile.py +++ b/test_elasticsearch/test_server/test_mapbox_vector_tile.py @@ -17,7 +17,9 @@ import pytest -from elasticsearch import Elasticsearch, RequestError +from elasticsearch import RequestError + +from .conftest import _create @pytest.fixture(scope="function") @@ -73,7 +75,8 @@ def mvt_setup(sync_client): @pytest.mark.parametrize("node_class", ["urllib3", "requests"]) def test_mapbox_vector_tile_error(elasticsearch_url, mvt_setup, node_class, ca_certs): - client = Elasticsearch(elasticsearch_url, node_class=node_class, ca_certs=ca_certs) + client = _create(elasticsearch_url, node_class=node_class) + client.search_mvt( index="museums", zoom=13, @@ -121,7 +124,7 @@ def test_mapbox_vector_tile_response( except ImportError: return pytest.skip("Requires the 'mapbox-vector-tile' package") - client = Elasticsearch(elasticsearch_url, node_class=node_class, ca_certs=ca_certs) + client = _create(elasticsearch_url, node_class=node_class) resp = client.search_mvt( index="museums", diff --git a/test_elasticsearch/test_server/test_vectorstore/__init__.py b/test_elasticsearch/test_server/test_vectorstore/__init__.py new file mode 100644 index 000000000..87710976a --- /dev/null +++ b/test_elasticsearch/test_server/test_vectorstore/__init__.py @@ -0,0 +1,81 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. 
from typing import List

from elastic_transport import Transport

from elasticsearch.helpers.vectorstore import EmbeddingService


class RequestSavingTransport(Transport):
    """Transport that records the keyword arguments of every request it performs."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # One entry per perform_request call, in call order.
        self.requests: list = []

    def perform_request(self, *args, **kwargs):
        self.requests.append(kwargs)
        return super().perform_request(*args, **kwargs)


class FakeEmbeddings(EmbeddingService):
    """Fake embeddings functionality for testing."""

    def __init__(self, dimensionality: int = 10) -> None:
        self.dimensionality = dimensionality

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Return simple embeddings: all ones, with the text's position in
        ``texts`` as the last component."""
        ones = [1.0] * (self.dimensionality - 1)
        return [ones + [float(position)] for position, _ in enumerate(texts)]

    def embed_query(self, text: str) -> List[float]:
        """Return constant query embeddings.

        The vector equals ``embed_documents(texts)[0]``: all ones followed by
        a trailing 0.0.
        """
        return [1.0] * (self.dimensionality - 1) + [0.0]


class ConsistentFakeEmbeddings(FakeEmbeddings):
    """Fake embeddings which remember all the texts seen so far to return consistent
    vectors for the same texts."""

    def __init__(self, dimensionality: int = 10) -> None:
        super().__init__(dimensionality)
        self.known_texts: List[str] = []

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Return consistent embeddings for each text seen so far.

        Unseen texts are registered first; the last component of a vector is
        the text's position in ``known_texts``.
        """
        vectors = []
        for text in texts:
            if text not in self.known_texts:
                self.known_texts.append(text)
            position = self.known_texts.index(text)
            vectors.append([1.0] * (self.dimensionality - 1) + [float(position)])
        return vectors

    def embed_query(self, text: str) -> List[float]:
        """Return the same vector ``embed_documents`` yields for ``text``,
        registering the text if it has not been seen before."""
        return self.embed_documents([text])[0]
import uuid

import pytest

from ...utils import wipe_cluster
from ..conftest import _create
from . import RequestSavingTransport


@pytest.fixture(scope="function")
def index() -> str:
    """Return a unique, throw-away index name for a single test."""
    return f"test_{uuid.uuid4().hex}"


@pytest.fixture(scope="function")
def sync_client_request_saving_factory(elasticsearch_url):
    """Yield a client whose transport records every request it performs.

    The cluster is wiped once with a regular client before the recording
    client is created; both clients are closed when they are no longer needed.
    """
    # Wipe the cluster before we start testing just in case it wasn't wiped
    # cleanly from the previous run of pytest.
    cleanup_client = None
    try:
        cleanup_client = _create(elasticsearch_url)
        wipe_cluster(cleanup_client)
    finally:
        # `_create` may raise before the name is bound to a client; closing
        # unconditionally would replace that error with an AttributeError.
        if cleanup_client is not None:
            cleanup_client.close()

    client = None
    try:
        # Recreate client with a transport that saves requests.
        client = _create(elasticsearch_url, RequestSavingTransport)

        yield client
    finally:
        if client is not None:
            client.close()


@pytest.fixture(scope="function")
def sync_client_request_saving(sync_client_request_saving_factory):
    """Yield the request-saving client and wipe the cluster afterwards."""
    try:
        yield sync_client_request_saving_factory
    finally:
        # Wipe the cluster clean after every test execution.
        wipe_cluster(sync_client_request_saving_factory)
+ ports: + - "9200:9200" + healthcheck: + test: + [ + "CMD-SHELL", + "curl --silent --fail http://localhost:9200/_cluster/health || exit 1" + ] + interval: 10s + retries: 60 + + # Currently fails on Mac: https://github.com/elastic/elasticsearch/issues/106206 + elasticsearch-with-models: + image: docker.elastic.co/eland/eland + depends_on: + - elasticsearch + restart: no + command: sh -c "sleep 10 && eland_import_hub_model --url http://elasticsearch:9200 --hub-model-id sentence-transformers/msmarco-minilm-l-12-v3 --start" diff --git a/test_elasticsearch/test_server/test_vectorstore/test_embedding_service.py b/test_elasticsearch/test_server/test_vectorstore/test_embedding_service.py new file mode 100644 index 000000000..c667a8a38 --- /dev/null +++ b/test_elasticsearch/test_server/test_vectorstore/test_embedding_service.py @@ -0,0 +1,91 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
import os
import re

import pytest

from elasticsearch import Elasticsearch
from elasticsearch.helpers.vectorstore import ElasticsearchEmbeddings
from elasticsearch.helpers.vectorstore._sync._utils import model_is_deployed

# deployed with
# https://www.elastic.co/guide/en/machine-learning/current/ml-nlp-text-emb-vector-search-example.html
MODEL_ID = os.getenv("MODEL_ID", "sentence-transformers__msmarco-minilm-l-12-v3")
NUM_DIMENSIONS = int(os.getenv("NUM_DIMENSIONS", "384"))


def _skip_unless_model_deployed(client: Elasticsearch) -> None:
    """Skip the running test when MODEL_ID is not deployed on the cluster."""
    if not model_is_deployed(client, MODEL_ID):
        pytest.skip(f"{MODEL_ID} model is not deployed in ML Node, skipping test")


def test_elasticsearch_embedding_documents(sync_client: Elasticsearch) -> None:
    """Embedding three documents yields three NUM_DIMENSIONS-sized vectors."""
    _skip_unless_model_deployed(sync_client)

    embedding = ElasticsearchEmbeddings(
        client=sync_client, user_agent="test", model_id=MODEL_ID
    )
    output = embedding.embed_documents(["foo bar", "bar foo", "foo"])

    assert [len(vector) for vector in output] == [NUM_DIMENSIONS] * 3


def test_elasticsearch_embedding_query(sync_client: Elasticsearch) -> None:
    """Embedding a query yields one NUM_DIMENSIONS-sized vector."""
    _skip_unless_model_deployed(sync_client)

    embedding = ElasticsearchEmbeddings(
        client=sync_client, user_agent="test", model_id=MODEL_ID
    )

    assert len(embedding.embed_query("foo bar")) == NUM_DIMENSIONS


def test_user_agent_default(
    sync_client: Elasticsearch, sync_client_request_saving: Elasticsearch
) -> None:
    """Test to make sure the user-agent is set correctly."""
    _skip_unless_model_deployed(sync_client)

    embeddings = ElasticsearchEmbeddings(
        client=sync_client_request_saving, model_id=MODEL_ID
    )
    agent_pattern = re.compile(r"^elasticsearch-py-es/\d+\.\d+\.\d+$")

    # The header configured on the client itself ...
    got_agent = embeddings.client._headers["User-Agent"]
    assert (
        agent_pattern.match(got_agent) is not None
    ), f"The user agent '{got_agent}' does not match the expected pattern."

    embeddings.embed_query("foo bar")

    # ... and the header actually sent with the single recorded request.
    requests = embeddings.client.transport.requests  # type: ignore
    assert len(requests) == 1

    got_request_agent = requests[0]["headers"]["User-Agent"]
    assert (
        agent_pattern.match(got_request_agent) is not None
    ), f"The user agent '{got_request_agent}' does not match the expected pattern."
+ +import logging +import re +from functools import partial +from typing import Any, List, Optional, Union + +import pytest + +from elasticsearch import Elasticsearch, NotFoundError +from elasticsearch.helpers import BulkIndexError +from elasticsearch.helpers.vectorstore import ( + BM25Strategy, + DenseVectorScriptScoreStrategy, + DenseVectorStrategy, + DistanceMetric, + SparseVectorStrategy, + VectorStore, +) +from elasticsearch.helpers.vectorstore._sync._utils import model_is_deployed + +from . import ConsistentFakeEmbeddings, FakeEmbeddings + +logging.basicConfig(level=logging.DEBUG) + +""" +docker-compose up elasticsearch + +By default runs against local docker instance of Elasticsearch. +To run against Elastic Cloud, set the following environment variables: +- ES_CLOUD_ID +- ES_API_KEY + +Some of the tests require the following models to be deployed in the ML Node: +- elser (can be downloaded and deployed through Kibana and trained models UI) +- sentence-transformers__all-minilm-l6-v2 (can be deployed through the API, + loaded via eland) + +These tests that require the models to be deployed are skipped by default. +Enable them by adding the model name to the modelsDeployed list below. 
+""" + +ELSER_MODEL_ID = ".elser_model_2" +TRANSFORMER_MODEL_ID = "sentence-transformers__all-minilm-l6-v2" + + +class TestVectorStore: + def test_search_without_metadata( + self, sync_client: Elasticsearch, index: str + ) -> None: + """Test end to end construction and search without metadata.""" + + def assert_query(query_body: dict, query: Optional[str]) -> dict: + assert query_body == { + "knn": { + "field": "vector_field", + "filter": [], + "k": 1, + "num_candidates": 50, + "query_vector": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], + } + } + return query_body + + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy(), + embedding_service=FakeEmbeddings(), + client=sync_client, + ) + + texts = ["foo", "bar", "baz"] + store.add_texts(texts) + + output = store.search(query="foo", k=1, custom_query=assert_query) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + + def test_search_without_metadata_async( + self, sync_client: Elasticsearch, index: str + ) -> None: + """Test end to end construction and search without metadata.""" + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy(), + embedding_service=FakeEmbeddings(), + client=sync_client, + ) + + texts = ["foo", "bar", "baz"] + store.add_texts(texts) + + output = store.search(query="foo", k=1) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + + def test_add_vectors(self, sync_client: Elasticsearch, index: str) -> None: + """ + Test adding pre-built embeddings instead of using inference for the texts. + This allows you to separate the embeddings text and the page_content + for better proximity between user's question and embedded text. + For example, your embedding text can be a question, whereas page_content + is the answer. 
+ """ + embeddings = ConsistentFakeEmbeddings() + texts = ["foo1", "foo2", "foo3"] + metadatas = [{"page": i} for i in range(len(texts))] + + embedding_vectors = embeddings.embed_documents(texts) + + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy(), + embedding_service=embeddings, + client=sync_client, + ) + + store.add_texts(texts=texts, vectors=embedding_vectors, metadatas=metadatas) + output = store.search(query="foo1", k=1) + assert [doc["_source"]["text_field"] for doc in output] == ["foo1"] + assert [doc["_source"]["metadata"]["page"] for doc in output] == [0] + + def test_search_with_metadata(self, sync_client: Elasticsearch, index: str) -> None: + """Test end to end construction and search with metadata.""" + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy(), + embedding_service=ConsistentFakeEmbeddings(), + client=sync_client, + ) + + texts = ["foo", "bar", "baz"] + metadatas = [{"page": i} for i in range(len(texts))] + store.add_texts(texts=texts, metadatas=metadatas) + + output = store.search(query="foo", k=1) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + assert [doc["_source"]["metadata"]["page"] for doc in output] == [0] + + output = store.search(query="bar", k=1) + assert [doc["_source"]["text_field"] for doc in output] == ["bar"] + assert [doc["_source"]["metadata"]["page"] for doc in output] == [1] + + def test_search_with_filter(self, sync_client: Elasticsearch, index: str) -> None: + """Test end to end construction and search with metadata.""" + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy(), + embedding_service=FakeEmbeddings(), + client=sync_client, + ) + + texts = ["foo", "foo", "foo"] + metadatas = [{"page": i} for i in range(len(texts))] + store.add_texts(texts=texts, metadatas=metadatas) + + def assert_query(query_body: dict, query: Optional[str]) -> dict: + assert query_body == { + "knn": { + "field": "vector_field", + 
"filter": [{"term": {"metadata.page": "1"}}], + "k": 3, + "num_candidates": 50, + "query_vector": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], + } + } + return query_body + + output = store.search( + query="foo", + k=3, + filter=[{"term": {"metadata.page": "1"}}], + custom_query=assert_query, + ) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + assert [doc["_source"]["metadata"]["page"] for doc in output] == [1] + + def test_search_script_score(self, sync_client: Elasticsearch, index: str) -> None: + """Test end to end construction and search with metadata.""" + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorScriptScoreStrategy(), + embedding_service=FakeEmbeddings(), + client=sync_client, + ) + + texts = ["foo", "bar", "baz"] + store.add_texts(texts) + + expected_query = { + "query": { + "script_score": { + "query": {"match_all": {}}, + "script": { + "source": "cosineSimilarity(params.query_vector, 'vector_field') + 1.0", # noqa: E501 + "params": { + "query_vector": [ + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 0.0, + ] + }, + }, + } + } + } + + def assert_query(query_body: dict, query: Optional[str]) -> dict: + assert query_body == expected_query + return query_body + + output = store.search(query="foo", k=1, custom_query=assert_query) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + + def test_search_script_score_with_filter( + self, sync_client: Elasticsearch, index: str + ) -> None: + """Test end to end construction and search with metadata.""" + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorScriptScoreStrategy(), + embedding_service=FakeEmbeddings(), + client=sync_client, + ) + + texts = ["foo", "bar", "baz"] + metadatas = [{"page": i} for i in range(len(texts))] + store.add_texts(texts=texts, metadatas=metadatas) + + def assert_query(query_body: dict, query: Optional[str]) -> dict: + expected_query = { + "query": { + "script_score": { + "query": 
{"bool": {"filter": [{"term": {"metadata.page": 0}}]}}, + "script": { + "source": "cosineSimilarity(params.query_vector, 'vector_field') + 1.0", # noqa: E501 + "params": { + "query_vector": [ + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 0.0, + ] + }, + }, + } + } + } + assert query_body == expected_query + return query_body + + output = store.search( + query="foo", + k=1, + custom_query=assert_query, + filter=[{"term": {"metadata.page": 0}}], + ) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + assert [doc["_source"]["metadata"]["page"] for doc in output] == [0] + + def test_search_script_score_distance_dot_product( + self, sync_client: Elasticsearch, index: str + ) -> None: + """Test end to end construction and search with metadata.""" + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorScriptScoreStrategy( + distance=DistanceMetric.DOT_PRODUCT, + ), + embedding_service=FakeEmbeddings(), + client=sync_client, + ) + + texts = ["foo", "bar", "baz"] + store.add_texts(texts) + + def assert_query(query_body: dict, query: Optional[str]) -> dict: + assert query_body == { + "query": { + "script_score": { + "query": {"match_all": {}}, + "script": { + "source": """ + double value = dotProduct(params.query_vector, 'vector_field'); + return sigmoid(1, Math.E, -value); + """, + "params": { + "query_vector": [ + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 0.0, + ] + }, + }, + } + } + } + return query_body + + output = store.search(query="foo", k=1, custom_query=assert_query) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + + def test_search_knn_with_hybrid_search( + self, sync_client: Elasticsearch, index: str + ) -> None: + """Test end to end construction and search with metadata.""" + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy(hybrid=True), + embedding_service=FakeEmbeddings(), + client=sync_client, + ) + + texts = ["foo", "bar", "baz"] + 
store.add_texts(texts) + + def assert_query(query_body: dict, query: Optional[str]) -> dict: + assert query_body == { + "knn": { + "field": "vector_field", + "filter": [], + "k": 1, + "num_candidates": 50, + "query_vector": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], + }, + "query": { + "bool": { + "filter": [], + "must": [{"match": {"text_field": {"query": "foo"}}}], + } + }, + "rank": {"rrf": {}}, + } + return query_body + + output = store.search(query="foo", k=1, custom_query=assert_query) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + + def test_search_knn_with_hybrid_search_rrf( + self, sync_client: Elasticsearch, index: str + ) -> None: + """Test end to end construction and rrf hybrid search with metadata.""" + texts = ["foo", "bar", "baz"] + + def assert_query( + query_body: dict, + query: Optional[str], + expected_rrf: Union[dict, bool], + ) -> dict: + cmp_query_body = { + "knn": { + "field": "vector_field", + "filter": [], + "k": 3, + "num_candidates": 50, + "query_vector": [ + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 0.0, + ], + }, + "query": { + "bool": { + "filter": [], + "must": [{"match": {"text_field": {"query": "foo"}}}], + } + }, + } + + if isinstance(expected_rrf, dict): + cmp_query_body["rank"] = {"rrf": expected_rrf} + elif isinstance(expected_rrf, bool) and expected_rrf is True: + cmp_query_body["rank"] = {"rrf": {}} + + assert query_body == cmp_query_body + + return query_body + + # 1. 
check query_body is okay + rrf_test_cases: List[Union[dict, bool]] = [ + True, + False, + {"rank_constant": 1, "window_size": 5}, + ] + for rrf_test_case in rrf_test_cases: + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy(hybrid=True, rrf=rrf_test_case), + embedding_service=FakeEmbeddings(), + client=sync_client, + ) + store.add_texts(texts) + + ## without fetch_k parameter + output = store.search( + query="foo", + k=3, + custom_query=partial(assert_query, expected_rrf=rrf_test_case), + ) + + # 2. check query result is okay + es_output = store.client.search( + index=index, + query={ + "bool": { + "filter": [], + "must": [{"match": {"text_field": {"query": "foo"}}}], + } + }, + knn={ + "field": "vector_field", + "filter": [], + "k": 3, + "num_candidates": 50, + "query_vector": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], + }, + size=3, + rank={"rrf": {"rank_constant": 1, "window_size": 5}}, + ) + + assert [o["_source"]["text_field"] for o in output] == [ + e["_source"]["text_field"] for e in es_output["hits"]["hits"] + ] + + # 3. 
def test_search_with_knn_infer_instack(
    self, sync_client: Elasticsearch, index: str
) -> None:
    """test end to end with knn retrieval strategy and inference in-stack"""

    if not model_is_deployed(sync_client, TRANSFORMER_MODEL_ID):
        pytest.skip(
            f"{TRANSFORMER_MODEL_ID} model not deployed in ML Node skipping test"
        )

    text_field = "text_field"

    # NOTE(review): the knn assertion below expects the field
    # "vector_query_field.predicted_value"; confirm the store is configured
    # to query that field rather than its default vector field.
    store = VectorStore(
        index=index,
        # Use the shared constant so the skip check, the pipeline and the
        # strategy always refer to the same model.
        retrieval_strategy=DenseVectorStrategy(model_id=TRANSFORMER_MODEL_ID),
        client=sync_client,
    )

    # setting up the pipeline for inference
    store.client.ingest.put_pipeline(
        id="test_pipeline",
        processors=[
            {
                "inference": {
                    "model_id": TRANSFORMER_MODEL_ID,
                    "field_map": {"query_field": text_field},
                    "target_field": "vector_query_field",
                }
            }
        ],
    )

    # creating a new index with the pipeline,
    # not relying on the vector store to create the index
    store.client.indices.create(
        index=index,
        mappings={
            "properties": {
                # "text" is the Elasticsearch field type; the field *name*
                # is what is called "text_field".
                text_field: {"type": "text"},
                "vector_query_field": {
                    "properties": {
                        "predicted_value": {
                            "type": "dense_vector",
                            "dims": 384,
                            "index": True,
                            "similarity": "l2_norm",
                        }
                    }
                },
            }
        },
        settings={"index": {"default_pipeline": "test_pipeline"}},
    )

    # adding documents to the index
    texts = ["foo", "bar", "baz"]

    for i, text in enumerate(texts):
        store.client.create(
            index=index,
            id=str(i),
            document={text_field: text, "metadata": {}},
        )

    # Make the freshly created documents visible to search.
    store.client.indices.refresh(index=index)

    def assert_query(query_body: dict, query: Optional[str]) -> dict:
        assert query_body == {
            "knn": {
                "filter": [],
                "field": "vector_query_field.predicted_value",
                "k": 1,
                "num_candidates": 50,
                "query_vector_builder": {
                    "text_embedding": {
                        "model_id": TRANSFORMER_MODEL_ID,
                        "model_text": "foo",
                    }
                },
            }
        }
        return query_body

    output = store.search(query="foo", k=1, custom_query=assert_query)
    assert [doc["_source"]["text_field"] for doc in output] == ["foo"]

    output = store.search(query="bar", k=1)
    assert [doc["_source"]["text_field"] for doc in output] == ["bar"]
output = store.search(query="foo", k=1) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + + def test_deployed_model_check_fails_semantic( + self, sync_client: Elasticsearch, index: str + ) -> None: + """test that exceptions are raised if a specified model is not deployed""" + with pytest.raises(NotFoundError): + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy( + model_id="non-existing model ID" + ), + client=sync_client, + ) + store.add_texts(["foo", "bar", "baz"]) + + def test_search_bm25(self, sync_client: Elasticsearch, index: str) -> None: + """Test end to end using the BM25Strategy retrieval strategy.""" + store = VectorStore( + index=index, + retrieval_strategy=BM25Strategy(), + client=sync_client, + ) + + texts = ["foo", "bar", "baz"] + store.add_texts(texts) + + def assert_query(query_body: dict, query: Optional[str]) -> dict: + assert query_body == { + "query": { + "bool": { + "must": [{"match": {"text_field": {"query": "foo"}}}], + "filter": [], + } + } + } + return query_body + + output = store.search(query="foo", k=1, custom_query=assert_query) + assert [doc["_source"]["text_field"] for doc in output] == ["foo"] + + def test_search_bm25_with_filter( + self, sync_client: Elasticsearch, index: str + ) -> None: + """Test end to using the BM25Strategy retrieval strategy with metadata.""" + store = VectorStore( + index=index, + retrieval_strategy=BM25Strategy(), + client=sync_client, + ) + + texts = ["foo", "foo", "foo"] + metadatas = [{"page": i} for i in range(len(texts))] + store.add_texts(texts=texts, metadatas=metadatas) + + def assert_query(query_body: dict, query: Optional[str]) -> dict: + assert query_body == { + "query": { + "bool": { + "must": [{"match": {"text_field": {"query": "foo"}}}], + "filter": [{"term": {"metadata.page": 1}}], + } + } + } + return query_body + + output = store.search( + query="foo", + k=3, + custom_query=assert_query, + filter=[{"term": {"metadata.page": 1}}], + ) + assert 
[doc["_source"]["text_field"] for doc in output] == ["foo"] + assert [doc["_source"]["metadata"]["page"] for doc in output] == [1] + + def test_delete(self, sync_client: Elasticsearch, index: str) -> None: + """Test delete methods from vector store.""" + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy(), + embedding_service=FakeEmbeddings(), + client=sync_client, + ) + + texts = ["foo", "bar", "baz", "gni"] + metadatas = [{"page": i} for i in range(len(texts))] + ids = store.add_texts(texts=texts, metadatas=metadatas) + + output = store.search(query="foo", k=10) + assert len(output) == 4 + + store.delete(ids=ids[1:3]) + output = store.search(query="foo", k=10) + assert len(output) == 2 + + store.delete(ids=["not-existing"]) + output = store.search(query="foo", k=10) + assert len(output) == 2 + + store.delete(ids=[ids[0]]) + output = store.search(query="foo", k=10) + assert len(output) == 1 + + store.delete(ids=[ids[3]]) + output = store.search(query="gni", k=10) + assert len(output) == 0 + + def test_indexing_exception_error( + self, + sync_client: Elasticsearch, + index: str, + caplog: pytest.LogCaptureFixture, + ) -> None: + """Test bulk exception logging is giving better hints.""" + store = VectorStore( + index=index, + retrieval_strategy=BM25Strategy(), + client=sync_client, + ) + + store.client.indices.create( + index=index, + mappings={"properties": {}}, + settings={"index": {"default_pipeline": "not-existing-pipeline"}}, + ) + + texts = ["foo"] + + with pytest.raises(BulkIndexError): + store.add_texts(texts) + + error_reason = "pipeline with id [not-existing-pipeline] does not exist" + log_message = f"First error reason: {error_reason}" + + assert log_message in caplog.text + + def test_user_agent_default( + self, sync_client_request_saving: Elasticsearch, index: str + ) -> None: + """Test to make sure the user-agent is set correctly.""" + store = VectorStore( + index=index, + retrieval_strategy=BM25Strategy(), + 
client=sync_client_request_saving, + ) + expected_pattern = r"^elasticsearch-py-vs/\d+\.\d+\.\d+$" + + got_agent = store.client._headers["User-Agent"] + assert ( + re.match(expected_pattern, got_agent) is not None + ), f"The user agent '{got_agent}' does not match the expected pattern." + + texts = ["foo", "bob", "baz"] + store.add_texts(texts) + + for request in store.client.transport.requests: # type: ignore + agent = request["headers"]["User-Agent"] + assert ( + re.match(expected_pattern, agent) is not None + ), f"The user agent '{agent}' does not match the expected pattern." + + def test_user_agent_custom( + self, sync_client_request_saving: Elasticsearch, index: str + ) -> None: + """Test to make sure the user-agent is set correctly.""" + user_agent = "this is THE user_agent!" + + store = VectorStore( + user_agent=user_agent, + index=index, + retrieval_strategy=BM25Strategy(), + client=sync_client_request_saving, + ) + + assert store.client._headers["User-Agent"] == user_agent + + texts = ["foo", "bob", "baz"] + store.add_texts(texts) + + for request in store.client.transport.requests: # type: ignore + assert request["headers"]["User-Agent"] == user_agent + + def test_bulk_args(self, sync_client_request_saving: Any, index: str) -> None: + """Test to make sure the bulk arguments work as expected.""" + store = VectorStore( + index=index, + retrieval_strategy=BM25Strategy(), + client=sync_client_request_saving, + ) + + texts = ["foo", "bob", "baz"] + store.add_texts(texts, bulk_kwargs={"chunk_size": 1}) + + # 1 for index exist, 1 for index create, 3 to index docs + assert len(store.client.transport.requests) == 5 # type: ignore + + def test_max_marginal_relevance_search( + self, sync_client: Elasticsearch, index: str + ) -> None: + """Test max marginal relevance search.""" + texts = ["foo", "bar", "baz"] + vector_field = "vector_field" + text_field = "text_field" + embedding_service = ConsistentFakeEmbeddings() + store = VectorStore( + index=index, + 
retrieval_strategy=DenseVectorScriptScoreStrategy(), + embedding_service=embedding_service, + vector_field=vector_field, + text_field=text_field, + client=sync_client, + ) + store.add_texts(texts) + + mmr_output = store.max_marginal_relevance_search( + embedding_service=embedding_service, + query=texts[0], + vector_field=vector_field, + k=3, + num_candidates=3, + ) + sim_output = store.search(query=texts[0], k=3) + assert mmr_output == sim_output + + mmr_output = store.max_marginal_relevance_search( + embedding_service=embedding_service, + query=texts[0], + vector_field=vector_field, + k=2, + num_candidates=3, + ) + assert len(mmr_output) == 2 + assert mmr_output[0]["_source"][text_field] == texts[0] + assert mmr_output[1]["_source"][text_field] == texts[1] + + mmr_output = store.max_marginal_relevance_search( + embedding_service=embedding_service, + query=texts[0], + vector_field=vector_field, + k=2, + num_candidates=3, + lambda_mult=0.1, # more diversity + ) + assert len(mmr_output) == 2 + assert mmr_output[0]["_source"][text_field] == texts[0] + assert mmr_output[1]["_source"][text_field] == texts[2] + + # if fetch_k < k, then the output will be less than k + mmr_output = store.max_marginal_relevance_search( + embedding_service=embedding_service, + query=texts[0], + vector_field=vector_field, + k=3, + num_candidates=2, + ) + assert len(mmr_output) == 2 + + def test_metadata_mapping(self, sync_client: Elasticsearch, index: str) -> None: + """Test that the metadata mapping is applied.""" + test_mappings = { + "my_field": {"type": "keyword"}, + "another_field": {"type": "text"}, + } + store = VectorStore( + index=index, + retrieval_strategy=DenseVectorStrategy(distance=DistanceMetric.COSINE), + embedding_service=FakeEmbeddings(), + num_dimensions=10, + client=sync_client, + metadata_mappings=test_mappings, + ) + + texts = ["foo", "foo", "foo"] + metadatas = [{"my_field": str(i)} for i in range(len(texts))] + store.add_texts(texts=texts, metadatas=metadatas) + + 
mapping_response = sync_client.indices.get_mapping(index=index) + mapping_properties = mapping_response[index]["mappings"]["properties"] + assert mapping_properties["vector_field"] == { + "type": "dense_vector", + "dims": 10, + "index": True, + "similarity": "cosine", + } + + assert "metadata" in mapping_properties + for key, val in test_mappings.items(): + assert mapping_properties["metadata"]["properties"][key] == val diff --git a/test_elasticsearch/test_strategies.py b/test_elasticsearch/test_strategies.py new file mode 100644 index 000000000..36ce63e9f --- /dev/null +++ b/test_elasticsearch/test_strategies.py @@ -0,0 +1,90 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
import pytest

from elasticsearch.helpers.vectorstore import (
    DenseVectorScriptScoreStrategy,
    DenseVectorStrategy,
    SparseVectorStrategy,
)


def _es_query_args(**overrides: object) -> dict:
    """Return the ``es_query`` keyword arguments used throughout this module.

    A fresh dict (with a fresh ``filter`` list) is built on every call so that
    no state is shared between invocations; ``overrides`` replace the defaults.
    """
    args = {
        "query": None,
        "query_vector": None,
        "text_field": "text_field",
        "vector_field": "vector_field",
        "k": 10,
        "num_candidates": 20,
        "filter": [],
    }
    args.update(overrides)
    return args


def test_sparse_vector_strategy_raises_errors() -> None:
    """``SparseVectorStrategy.es_query`` rejects invalid query inputs."""
    strategy = SparseVectorStrategy("my_model_id")

    rejected_inputs = (
        # missing query
        {"query": None, "query_vector": None},
        # query vector not allowed
        {"query": "hi", "query_vector": [1, 2, 3]},
    )
    for inputs in rejected_inputs:
        with pytest.raises(ValueError):
            strategy.es_query(**_es_query_args(**inputs))


def test_dense_vector_strategy_raises_error() -> None:
    """``DenseVectorStrategy`` rejects invalid configuration with ``ValueError``."""
    # hybrid search requested without a text field
    with pytest.raises(ValueError):
        DenseVectorStrategy(hybrid=True, text_field=None)

    # unknown distance
    with pytest.raises(ValueError):
        strategy = DenseVectorStrategy(distance="unknown distance")
        strategy.es_mappings_settings(
            text_field="text_field",
            vector_field="vector_field",
            num_dimensions=10,
        )


def test_dense_vector_script_score_strategy_raises_error() -> None:
    """``DenseVectorScriptScoreStrategy.es_query`` rejects invalid inputs."""
    # missing query vector
    with pytest.raises(ValueError):
        DenseVectorScriptScoreStrategy().es_query(**_es_query_args())

    # unknown distance
    with pytest.raises(ValueError):
        strategy = DenseVectorScriptScoreStrategy(distance="unknown distance")
        strategy.es_query(**_es_query_args(query_vector=[1, 2, 3]))
import os
import subprocess
from glob import glob
from pathlib import Path
from typing import Optional

import unasync


def cleanup(source_dir: Path, output_dir: Path, patterns: list[str]) -> None:
    """Post-process generated files with ``sed`` scripts.

    For every ``*.py`` file found directly inside ``source_dir``, the file of
    the same name inside ``output_dir`` is edited in place, once per pattern.

    :param source_dir: directory used only to enumerate the file names.
    :param output_dir: directory containing the files that are edited.
    :param patterns: ``sed`` scripts, applied in the given order.
    :raises subprocess.CalledProcessError: if a ``sed`` invocation fails.
    """
    for file in glob("*.py", root_dir=source_dir):
        path = output_dir / file
        for pattern in patterns:
            subprocess.check_call(["sed", "-i.bak", pattern, str(path)])
        # sed's "-i.bak" leaves "<file>.bak" behind. Remove it without
        # spawning `rm`, and tolerate its absence (e.g. no patterns were run).
        path.with_name(f"{path.name}.bak").unlink(missing_ok=True)


def run(
    rule: unasync.Rule,
    cleanup_patterns: Optional[list[str]] = None,
) -> None:
    """Run unasync for a single rule and optionally clean up the output.

    The rule's ``fromdir``/``todir`` are resolved relative to the directory
    two levels above this script. Every ``.py``/``.pyi`` file below the
    source directory is passed to unasync, except files whose name starts
    with ``utils.py``.

    :param rule: the unasync rule describing source/target directories and
        the token replacements to perform.
    :param cleanup_patterns: optional ``sed`` scripts applied to the generated
        files afterwards; ``None`` or an empty list skips the clean-up step.
    """
    root_dir = Path(__file__).absolute().parent.parent
    source_dir = root_dir / rule.fromdir.lstrip("/")
    output_dir = root_dir / rule.todir.lstrip("/")

    filepaths = []
    for root, _, filenames in os.walk(source_dir):
        for filename in filenames:
            # Match on the real suffix so an extension-less file that happens
            # to be called "py" or "pyi" is not picked up.
            if filename.endswith((".py", ".pyi")) and not filename.startswith(
                "utils.py"
            ):
                filepaths.append(os.path.join(root, filename))

    unasync.unasync_files(filepaths, [rule])

    if cleanup_patterns:
        cleanup(source_dir, output_dir, cleanup_patterns)


def main() -> None:
    """Generate the sync client and the sync vectorstore helpers from async code."""
    run(
        rule=unasync.Rule(
            fromdir="/elasticsearch/_async/client/",
            todir="/elasticsearch/_sync/client/",
            additional_replacements={
                # We want to rewrite to 'Transport' instead of 'SyncTransport', etc
                "AsyncTransport": "Transport",
                "AsyncElasticsearch": "Elasticsearch",
                # We don't want to rewrite this class
                "AsyncSearchClient": "AsyncSearchClient",
                # Handling typing.Awaitable[...] isn't done yet by unasync.
                "_TYPE_ASYNC_SNIFF_CALLBACK": "_TYPE_SYNC_SNIFF_CALLBACK",
            },
        ),
    )

    run(
        rule=unasync.Rule(
            fromdir="elasticsearch/helpers/vectorstore/_async/",
            todir="elasticsearch/helpers/vectorstore/_sync/",
            additional_replacements={
                "AsyncBM25Strategy": "BM25Strategy",
                "AsyncDenseVectorStrategy": "DenseVectorStrategy",
                "AsyncDenseVectorScriptScoreStrategy": "DenseVectorScriptScoreStrategy",
                "AsyncElasticsearch": "Elasticsearch",
                "AsyncElasticsearchEmbeddings": "ElasticsearchEmbeddings",
                "AsyncEmbeddingService": "EmbeddingService",
                "AsyncRetrievalStrategy": "RetrievalStrategy",
                "AsyncSparseVectorStrategy": "SparseVectorStrategy",
                "AsyncTransport": "Transport",
                "AsyncVectorStore": "VectorStore",
                "async_bulk": "bulk",
                "_async": "_sync",
            },
        ),
        # The generated sync modules no longer need asyncio.
        cleanup_patterns=[
            "/^import asyncio$/d",
        ],
    )


if __name__ == "__main__":
    main()