Skip to content

Commit

Permalink
feat: Improve OpenSearchDocumentStore.__init__ arguments (#739)
Browse files Browse the repository at this point in the history
* add max_chunk_bytes parameter to OpenSearchDocumentStore init

* more documentation in opensearch init

* fix tests

* use `max_chunk_bytes` in delete_documents

* re added type ignore

* add new kwargs to to_dict

* test default mappings

* Update document_store.py

* restore C++

---------

Co-authored-by: Massimiliano Pippi <[email protected]>
  • Loading branch information
EdAbati and masci authored May 27, 2024
1 parent 51d0be0 commit 257f992
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,68 +27,93 @@
# all be mapped to scores ~1.
BM25_SCALING_FACTOR = 8

DEFAULT_SETTINGS = {"index.knn": True}
DEFAULT_MAX_CHUNK_BYTES = 100 * 1024 * 1024


class OpenSearchDocumentStore:
def __init__(
self,
*,
hosts: Optional[Hosts] = None,
index: str = "default",
max_chunk_bytes: int = DEFAULT_MAX_CHUNK_BYTES,
embedding_dim: int = 768,
method: Optional[Dict[str, Any]] = None,
mappings: Optional[Dict[str, Any]] = None,
settings: Optional[Dict[str, Any]] = DEFAULT_SETTINGS,
**kwargs,
):
"""
Creates a new OpenSearchDocumentStore instance.
For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch)
The ``embeddings_dim``, ``method``, ``mappings``, and ``settings`` arguments are only used if the index does not
exists and needs to be created. If the index already exists, its current configurations will be used.
For the full list of supported kwargs, see the [official OpenSearch reference](https://opensearch-project.github.io/opensearch-py/api-ref/clients/opensearch_client.html)
For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch)
:param hosts: List of hosts running the OpenSearch client. Defaults to None
:param index: Name of index in OpenSearch, if it doesn't exist it will be created. Defaults to "default"
:param **kwargs: Optional arguments that ``OpenSearch`` takes.
:param max_chunk_bytes: Maximum size of the requests in bytes. Defaults to 100MB
:param embedding_dim: Dimension of the embeddings. Defaults to 768
:param method: The method definition of the underlying configuration of the approximate k-NN algorithm. Please
see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#method-definitions)
for more information. Defaults to None
:param mappings: The mapping of how the documents are stored and indexed. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/field-types/)
for more information. If None, it uses the embedding_dim and method arguments to create default mappings.
Defaults to None
:param settings: The settings of the index to be created. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#index-settings)
for more information. Defaults to {"index.knn": True}
:param **kwargs: Optional arguments that ``OpenSearch`` takes. For the full list of supported kwargs,
see the [official OpenSearch reference](https://opensearch-project.github.io/opensearch-py/api-ref/clients/opensearch_client.html)
"""
self._client = None
self._hosts = hosts
self._index = index
self._max_chunk_bytes = max_chunk_bytes
self._embedding_dim = embedding_dim
self._method = method
self._mappings = mappings or self._get_default_mappings()
self._settings = settings
self._kwargs = kwargs

def _get_default_mappings(self) -> Dict[str, Any]:
default_mappings: Dict[str, Any] = {
"properties": {
"embedding": {"type": "knn_vector", "index": True, "dimension": self._embedding_dim},
"content": {"type": "text"},
},
"dynamic_templates": [
{
"strings": {
"match_mapping_type": "string",
"mapping": {"type": "keyword"},
}
}
],
}
if self._method:
default_mappings["properties"]["embedding"]["method"] = self._method
return default_mappings

@property
def client(self) -> OpenSearch:
if not self._client:
self._client = OpenSearch(self._hosts, **self._kwargs)
# Check client connection, this will raise if not connected
self._client.info() # type:ignore

if self._client.indices.exists(index=self._index): # type:ignore
logger.debug(
"The index '%s' already exists. The `embedding_dim`, `method`, `mappings`, and "
"`settings` values will be ignored.",
self._index,
)
else:
# Create the index if it doesn't exist
if not self._client.indices.exists(index=self._index): # type:ignore
# configure fallback mapping for the embedding field
method = self._kwargs.get("method", None)
embedding_dim = self._kwargs.get("embedding_dim", 768)
default_mappings: Dict[str, Any] = {
"properties": {
"embedding": {"type": "knn_vector", "index": True, "dimension": embedding_dim},
"content": {"type": "text"},
},
"dynamic_templates": [
{
"strings": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword",
},
}
}
],
}
if method:
default_mappings["properties"]["embedding"]["method"] = method

body = {
"mappings": self._kwargs.get("mappings", default_mappings),
"settings": self._kwargs.get("settings", {"index.knn": True}),
}
self._client.indices.create(index=self._index, body=body) # type:ignore
body = {"mappings": self._mappings, "settings": self._settings}

self._client.indices.create(index=self._index, body=body) # type:ignore
return self._client

def to_dict(self) -> Dict[str, Any]:
Expand All @@ -105,6 +130,11 @@ def to_dict(self) -> Dict[str, Any]:
self,
hosts=self._hosts,
index=self._index,
max_chunk_bytes=self._max_chunk_bytes,
embedding_dim=self._embedding_dim,
method=self._method,
mappings=self._mappings,
settings=self._settings,
**self._kwargs,
)

Expand Down Expand Up @@ -178,6 +208,7 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D
refresh="wait_for",
index=self._index,
raise_on_error=False,
max_chunk_bytes=self._max_chunk_bytes,
)

if errors:
Expand Down Expand Up @@ -234,6 +265,7 @@ def delete_documents(self, document_ids: List[str]) -> None:
refresh="wait_for",
index=self._index,
raise_on_error=False,
max_chunk_bytes=self._max_chunk_bytes,
)

def _bm25_retrieval(
Expand Down
14 changes: 14 additions & 0 deletions integrations/opensearch/tests/test_bm25_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from haystack.dataclasses import Document
from haystack_integrations.components.retrievers.opensearch import OpenSearchBM25Retriever
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES


def test_init_default():
Expand All @@ -27,8 +28,21 @@ def test_to_dict(_mock_opensearch_client):
"init_parameters": {
"document_store": {
"init_parameters": {
"embedding_dim": 768,
"hosts": "some fake host",
"index": "default",
"mappings": {
"dynamic_templates": [
{"strings": {"mapping": {"type": "keyword"}, "match_mapping_type": "string"}}
],
"properties": {
"content": {"type": "text"},
"embedding": {"dimension": 768, "index": True, "type": "knn_vector"},
},
},
"max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES,
"method": None,
"settings": {"index.knn": True},
},
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
},
Expand Down
53 changes: 49 additions & 4 deletions integrations/opensearch/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from haystack.document_stores.types import DuplicatePolicy
from haystack.testing.document_store import DocumentStoreBaseTests
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES
from opensearchpy.exceptions import RequestError


Expand All @@ -21,8 +22,19 @@ def test_to_dict(_mock_opensearch_client):
assert res == {
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
"init_parameters": {
"embedding_dim": 768,
"hosts": "some hosts",
"index": "default",
"mappings": {
"dynamic_templates": [{"strings": {"mapping": {"type": "keyword"}, "match_mapping_type": "string"}}],
"properties": {
"content": {"type": "text"},
"embedding": {"dimension": 768, "index": True, "type": "knn_vector"},
},
},
"max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES,
"method": None,
"settings": {"index.knn": True},
},
}

Expand All @@ -31,14 +43,29 @@ def test_to_dict(_mock_opensearch_client):
def test_from_dict(_mock_opensearch_client):
data = {
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
"init_parameters": {
"hosts": "some hosts",
"index": "default",
},
"init_parameters": {"hosts": "some hosts", "index": "default", "max_chunk_bytes": 1000, "embedding_dim": 1536},
}
document_store = OpenSearchDocumentStore.from_dict(data)
assert document_store._hosts == "some hosts"
assert document_store._index == "default"
assert document_store._max_chunk_bytes == 1000
assert document_store._embedding_dim == 1536
assert document_store._method is None
assert document_store._mappings == {
"properties": {
"embedding": {"type": "knn_vector", "index": True, "dimension": 1536},
"content": {"type": "text"},
},
"dynamic_templates": [
{
"strings": {
"match_mapping_type": "string",
"mapping": {"type": "keyword"},
}
}
],
}
assert document_store._settings == {"index.knn": True}


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
Expand All @@ -47,6 +74,17 @@ def test_init_is_lazy(_mock_opensearch_client):
_mock_opensearch_client.assert_not_called()


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
def test_get_default_mappings(_mock_opensearch_client):
store = OpenSearchDocumentStore(hosts="testhost", embedding_dim=1536, method={"name": "hnsw"})
assert store._mappings["properties"]["embedding"] == {
"type": "knn_vector",
"index": True,
"dimension": 1536,
"method": {"name": "hnsw"},
}


@pytest.mark.integration
class TestDocumentStore(DocumentStoreBaseTests):
"""
Expand Down Expand Up @@ -339,3 +377,10 @@ def test_write_documents_with_badly_formatted_bulk_errors(self, mock_bulk, docum
with pytest.raises(DocumentStoreError) as e:
document_store.write_documents([Document(content="Hello world")])
e.match(f"{error}")

@patch("haystack_integrations.document_stores.opensearch.document_store.bulk")
def test_write_documents_max_chunk_bytes(self, mock_bulk, document_store):
mock_bulk.return_value = (1, [])
document_store.write_documents([Document(content="Hello world")])

assert mock_bulk.call_args.kwargs["max_chunk_bytes"] == DEFAULT_MAX_CHUNK_BYTES
29 changes: 29 additions & 0 deletions integrations/opensearch/tests/test_embedding_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from haystack.dataclasses import Document
from haystack_integrations.components.retrievers.opensearch import OpenSearchEmbeddingRetriever
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES


def test_init_default():
Expand All @@ -27,8 +28,36 @@ def test_to_dict(_mock_opensearch_client):
"init_parameters": {
"document_store": {
"init_parameters": {
"embedding_dim": 768,
"hosts": "some fake host",
"index": "default",
"mappings": {
"dynamic_templates": [
{
"strings": {
"mapping": {
"type": "keyword",
},
"match_mapping_type": "string",
},
},
],
"properties": {
"content": {
"type": "text",
},
"embedding": {
"dimension": 768,
"index": True,
"type": "knn_vector",
},
},
},
"max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES,
"method": None,
"settings": {
"index.knn": True,
},
},
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
},
Expand Down

0 comments on commit 257f992

Please sign in to comment.