diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index 70402f1dc..f0f53781f 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -27,6 +27,9 @@ # all be mapped to scores ~1. BM25_SCALING_FACTOR = 8 +DEFAULT_SETTINGS = {"index.knn": True} +DEFAULT_MAX_CHUNK_BYTES = 100 * 1024 * 1024 + class OpenSearchDocumentStore: def __init__( @@ -34,24 +37,65 @@ def __init__( *, hosts: Optional[Hosts] = None, index: str = "default", + max_chunk_bytes: int = DEFAULT_MAX_CHUNK_BYTES, + embedding_dim: int = 768, + method: Optional[Dict[str, Any]] = None, + mappings: Optional[Dict[str, Any]] = None, + settings: Optional[Dict[str, Any]] = DEFAULT_SETTINGS, **kwargs, ): """ Creates a new OpenSearchDocumentStore instance. - For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch) + The ``embeddings_dim``, ``method``, ``mappings``, and ``settings`` arguments are only used if the index does not + exists and needs to be created. If the index already exists, its current configurations will be used. - For the full list of supported kwargs, see the [official OpenSearch reference](https://opensearch-project.github.io/opensearch-py/api-ref/clients/opensearch_client.html) + For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch) :param hosts: List of hosts running the OpenSearch client. Defaults to None :param index: Name of index in OpenSearch, if it doesn't exist it will be created. Defaults to "default" - :param **kwargs: Optional arguments that ``OpenSearch`` takes. + :param max_chunk_bytes: Maximum size of the requests in bytes. Defaults to 100MB + :param embedding_dim: Dimension of the embeddings. Defaults to 768 + :param method: The method definition of the underlying configuration of the approximate k-NN algorithm. Please + see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#method-definitions) + for more information. Defaults to None + :param mappings: The mapping of how the documents are stored and indexed. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/field-types/) + for more information. If None, it uses the embedding_dim and method arguments to create default mappings. + Defaults to None + :param settings: The settings of the index to be created. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#index-settings) + for more information. Defaults to {"index.knn": True} + :param **kwargs: Optional arguments that ``OpenSearch`` takes. For the full list of supported kwargs, + see the [official OpenSearch reference](https://opensearch-project.github.io/opensearch-py/api-ref/clients/opensearch_client.html) """ self._client = None self._hosts = hosts self._index = index + self._max_chunk_bytes = max_chunk_bytes + self._embedding_dim = embedding_dim + self._method = method + self._mappings = mappings or self._get_default_mappings() + self._settings = settings self._kwargs = kwargs + def _get_default_mappings(self) -> Dict[str, Any]: + default_mappings: Dict[str, Any] = { + "properties": { + "embedding": {"type": "knn_vector", "index": True, "dimension": self._embedding_dim}, + "content": {"type": "text"}, + }, + "dynamic_templates": [ + { + "strings": { + "match_mapping_type": "string", + "mapping": {"type": "keyword"}, + } + } + ], + } + if self._method: + default_mappings["properties"]["embedding"]["method"] = self._method + return default_mappings + @property def client(self) -> OpenSearch: if not self._client: @@ -59,36 +103,17 @@ def client(self) -> OpenSearch: # Check client connection, this will raise if not connected self._client.info() # type:ignore + if self._client.indices.exists(index=self._index): # type:ignore + logger.debug( + "The index '%s' already exists. The `embedding_dim`, `method`, `mappings`, and " + "`settings` values will be ignored.", + self._index, + ) + else: # Create the index if it doesn't exist - if not self._client.indices.exists(index=self._index): # type:ignore - # configure fallback mapping for the embedding field - method = self._kwargs.get("method", None) - embedding_dim = self._kwargs.get("embedding_dim", 768) - default_mappings: Dict[str, Any] = { - "properties": { - "embedding": {"type": "knn_vector", "index": True, "dimension": embedding_dim}, - "content": {"type": "text"}, - }, - "dynamic_templates": [ - { - "strings": { - "match_mapping_type": "string", - "mapping": { - "type": "keyword", - }, - } - } - ], - } - if method: - default_mappings["properties"]["embedding"]["method"] = method - - body = { - "mappings": self._kwargs.get("mappings", default_mappings), - "settings": self._kwargs.get("settings", {"index.knn": True}), - } - self._client.indices.create(index=self._index, body=body) # type:ignore + body = {"mappings": self._mappings, "settings": self._settings} + self._client.indices.create(index=self._index, body=body) # type:ignore return self._client def to_dict(self) -> Dict[str, Any]: @@ -105,6 +130,11 @@ def to_dict(self) -> Dict[str, Any]: self, hosts=self._hosts, index=self._index, + max_chunk_bytes=self._max_chunk_bytes, + embedding_dim=self._embedding_dim, + method=self._method, + mappings=self._mappings, + settings=self._settings, **self._kwargs, ) @@ -178,6 +208,7 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D refresh="wait_for", index=self._index, raise_on_error=False, + max_chunk_bytes=self._max_chunk_bytes, ) if errors: @@ -234,6 +265,7 @@ def delete_documents(self, document_ids: List[str]) -> None: refresh="wait_for", index=self._index, raise_on_error=False, + max_chunk_bytes=self._max_chunk_bytes, ) def _bm25_retrieval( diff --git a/integrations/opensearch/tests/test_bm25_retriever.py b/integrations/opensearch/tests/test_bm25_retriever.py index 3f84f41a9..71fc19c6a 100644 --- a/integrations/opensearch/tests/test_bm25_retriever.py +++ b/integrations/opensearch/tests/test_bm25_retriever.py @@ -6,6 +6,7 @@ from haystack.dataclasses import Document from haystack_integrations.components.retrievers.opensearch import OpenSearchBM25Retriever from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore +from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES def test_init_default(): @@ -27,8 +28,21 @@ def test_to_dict(_mock_opensearch_client): "init_parameters": { "document_store": { "init_parameters": { + "embedding_dim": 768, "hosts": "some fake host", "index": "default", + "mappings": { + "dynamic_templates": [ + {"strings": {"mapping": {"type": "keyword"}, "match_mapping_type": "string"}} + ], + "properties": { + "content": {"type": "text"}, + "embedding": {"dimension": 768, "index": True, "type": "knn_vector"}, + }, + }, + "max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES, + "method": None, + "settings": {"index.knn": True}, }, "type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore", }, diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index e3557ba59..283c6a69a 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -11,6 +11,7 @@ from haystack.document_stores.types import DuplicatePolicy from haystack.testing.document_store import DocumentStoreBaseTests from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore +from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES from opensearchpy.exceptions import RequestError @@ -21,8 +22,19 @@ def test_to_dict(_mock_opensearch_client): assert res == { "type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore", "init_parameters": { + "embedding_dim": 768, "hosts": "some hosts", "index": "default", + "mappings": { + "dynamic_templates": [{"strings": {"mapping": {"type": "keyword"}, "match_mapping_type": "string"}}], + "properties": { + "content": {"type": "text"}, + "embedding": {"dimension": 768, "index": True, "type": "knn_vector"}, + }, + }, + "max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES, + "method": None, + "settings": {"index.knn": True}, }, } @@ -31,14 +43,29 @@ def test_to_dict(_mock_opensearch_client): def test_from_dict(_mock_opensearch_client): data = { "type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore", - "init_parameters": { - "hosts": "some hosts", - "index": "default", - }, + "init_parameters": {"hosts": "some hosts", "index": "default", "max_chunk_bytes": 1000, "embedding_dim": 1536}, } document_store = OpenSearchDocumentStore.from_dict(data) assert document_store._hosts == "some hosts" assert document_store._index == "default" + assert document_store._max_chunk_bytes == 1000 + assert document_store._embedding_dim == 1536 + assert document_store._method is None + assert document_store._mappings == { + "properties": { + "embedding": {"type": "knn_vector", "index": True, "dimension": 1536}, + "content": {"type": "text"}, + }, + "dynamic_templates": [ + { + "strings": { + "match_mapping_type": "string", + "mapping": {"type": "keyword"}, + } + } + ], + } + assert document_store._settings == {"index.knn": True} @patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch") @@ -47,6 +74,17 @@ def test_init_is_lazy(_mock_opensearch_client): _mock_opensearch_client.assert_not_called() +@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch") +def test_get_default_mappings(_mock_opensearch_client): + store = OpenSearchDocumentStore(hosts="testhost", embedding_dim=1536, method={"name": "hnsw"}) + assert store._mappings["properties"]["embedding"] == { + "type": "knn_vector", + "index": True, + "dimension": 1536, + "method": {"name": "hnsw"}, + } + + @pytest.mark.integration class TestDocumentStore(DocumentStoreBaseTests): """ @@ -339,3 +377,10 @@ def test_write_documents_with_badly_formatted_bulk_errors(self, mock_bulk, docum with pytest.raises(DocumentStoreError) as e: document_store.write_documents([Document(content="Hello world")]) e.match(f"{error}") + + @patch("haystack_integrations.document_stores.opensearch.document_store.bulk") + def test_write_documents_max_chunk_bytes(self, mock_bulk, document_store): + mock_bulk.return_value = (1, []) + document_store.write_documents([Document(content="Hello world")]) + + assert mock_bulk.call_args.kwargs["max_chunk_bytes"] == DEFAULT_MAX_CHUNK_BYTES diff --git a/integrations/opensearch/tests/test_embedding_retriever.py b/integrations/opensearch/tests/test_embedding_retriever.py index 0190ca208..c1015ca33 100644 --- a/integrations/opensearch/tests/test_embedding_retriever.py +++ b/integrations/opensearch/tests/test_embedding_retriever.py @@ -6,6 +6,7 @@ from haystack.dataclasses import Document from haystack_integrations.components.retrievers.opensearch import OpenSearchEmbeddingRetriever from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore +from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES def test_init_default(): @@ -27,8 +28,36 @@ def test_to_dict(_mock_opensearch_client): "init_parameters": { "document_store": { "init_parameters": { + "embedding_dim": 768, "hosts": "some fake host", "index": "default", + "mappings": { + "dynamic_templates": [ + { + "strings": { + "mapping": { + "type": "keyword", + }, + "match_mapping_type": "string", + }, + }, + ], + "properties": { + "content": { + "type": "text", + }, + "embedding": { + "dimension": 768, + "index": True, + "type": "knn_vector", + }, + }, + }, + "max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES, + "method": None, + "settings": { + "index.knn": True, + }, }, "type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore", },