diff --git a/Makefile b/Makefile index 18006fe7d1..9b53752218 100644 --- a/Makefile +++ b/Makefile @@ -310,6 +310,25 @@ test-python-universal-cassandra-no-cloud-providers: not test_snowflake" \ sdk/python/tests + test-python-universal-elasticsearch-online: + PYTHONPATH='.' \ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.elasticsearch_repo_configuration \ + PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.elasticsearch \ + python -m pytest -n 8 --integration \ + -k "not test_universal_cli and \ + not test_go_feature_server and \ + not test_feature_logging and \ + not test_reorder_columns and \ + not test_logged_features_validation and \ + not test_lambda_materialization_consistency and \ + not test_offline_write and \ + not test_push_features_to_offline_store and \ + not gcs_registry and \ + not s3_registry and \ + not test_universal_types and \ + not test_snowflake" \ + sdk/python/tests + test-python-universal: python -m pytest -n 8 --integration sdk/python/tests diff --git a/docs/reference/alpha-vector-database.md b/docs/reference/alpha-vector-database.md index 3b0c924d84..37d9b9cdf8 100644 --- a/docs/reference/alpha-vector-database.md +++ b/docs/reference/alpha-vector-database.md @@ -10,7 +10,7 @@ Below are supported vector databases and implemented features: | Vector Database | Retrieval | Indexing | |-----------------|-----------|----------| | Pgvector | [x] | [ ] | -| Elasticsearch | [ ] | [ ] | +| Elasticsearch | [x] | [x] | | Milvus | [ ] | [ ] | | Faiss | [ ] | [ ] | diff --git a/docs/reference/online-stores/elasticsearch.md b/docs/reference/online-stores/elasticsearch.md new file mode 100644 index 0000000000..bf6f9a58db --- /dev/null +++ b/docs/reference/online-stores/elasticsearch.md @@ -0,0 +1,125 @@ +# ElasticSearch online store (contrib) + +## Description + +The ElasticSearch online store provides support for materializing tabular feature values, as well as embedding feature vectors, into an 
+ElasticSearch index for serving online features. \
+The embedding feature vectors are stored as dense vectors, and can be used for similarity search. More information on dense vectors can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html).
+
+## Getting started
+In order to use this online store, you'll need to run `pip install 'feast[elasticsearch]'`. You can get started by then running `feast init -t elasticsearch`.
+
+## Example
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: my_feature_repo
+registry: data/registry.db
+provider: local
+online_store:
+  type: elasticsearch
+  host: ES_HOST
+  port: ES_PORT
+  user: ES_USERNAME
+  password: ES_PASSWORD
+  vector_len: 512
+  write_batch_size: 1000
+```
+{% endcode %}
+
+The full set of configuration options is available in [ElasticSearchOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStoreConfig).
+
+## Functionality Matrix
+
+
+| | Elasticsearch |
+| :-------------------------------------------------------- | :------- |
+| write feature values to the online store | yes |
+| read feature values from the online store | yes |
+| update infrastructure (e.g. tables) in the online store | yes |
+| teardown infrastructure (e.g. tables) in the online store | yes |
+| generate a plan of infrastructure changes | no |
+| support for on-demand transforms | yes |
+| readable by Python SDK | yes |
+| readable by Java | no |
+| readable by Go | no |
+| support for entityless feature views | yes |
+| support for concurrent writing to the same key | no |
+| support for ttl (time to live) at retrieval | no |
+| support for deleting expired data | no |
+| collocated by feature view | yes |
+| collocated by feature service | no |
+| collocated by entity key | no |
+
+To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix).
+
+## Retrieving online document vectors
+
+The ElasticSearch online store supports retrieving the top k closest document vectors for a given query embedding vector. Each returned result contains the document's feature value, its vector value, and its distance (similarity score) relative to the query vector. The document vector is a dense vector of floats.
+
+{% code title="python" %}
+```python
+from feast import FeatureStore
+
+feature_store = FeatureStore(repo_path="feature_store.yaml")
+
+query_vector = [1.0, 2.0, 3.0, 4.0, 5.0]
+top_k = 5
+
+# Retrieve the top k closest features to the query vector
+
+feature_values = feature_store.retrieve_online_documents(
+    feature="my_feature",
+    query=query_vector,
+    top_k=top_k
+)
+```
+{% endcode %}
+
+## Indexing
+Currently, the indexing mapping in the ElasticSearch online store is configured as:
+
+{% code title="indexing_mapping" %}
+```json
+"properties": {
+    "entity_key": {"type": "binary"},
+    "feature_name": {"type": "keyword"},
+    "feature_value": {"type": "binary"},
+    "timestamp": {"type": "date"},
+    "created_ts": {"type": "date"},
+    "vector_value": {
+        "type": "dense_vector",
+        "dims": config.online_store.vector_len,
+        "index": "true",
+        "similarity": config.online_store.similarity,
+    },
+}
+```
+{% endcode %}
+And the online_read API mapping is configured as:
+
+{% code title="online_read_mapping" %}
+```json
+"query": {
+    "bool": {
+        "must": [
+            {"terms": {"entity_key": entity_keys}},
+            {"terms": {"feature_name": requested_features}},
+        ]
+    }
+},
+```
+{% endcode %}
+
+And the similarity search API mapping is configured as:
+
+{% code title="similarity_search_mapping" %}
+```json
+{
+    "field": "vector_value",
+    "query_vector": embedding_vector,
+    "k": top_k,
+}
+```
+{% endcode %}
+
+These APIs are subject to change in future versions of Feast to improve performance and usability.
\ No newline at end of file diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 270ea45a26..2fe885865d 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1886,7 +1886,7 @@ def retrieve_online_documents( feature: str, query: Union[str, List[float]], top_k: int, - distance_metric: str, + distance_metric: Optional[str] = None, ) -> OnlineResponse: """ Retrieves the top k closest document features. Note, embeddings are a subset of features. @@ -1911,7 +1911,7 @@ def _retrieve_online_documents( feature: str, query: Union[str, List[float]], top_k: int, - distance_metric: str = "L2", + distance_metric: Optional[str] = None, ): if isinstance(query, str): raise ValueError( @@ -2209,7 +2209,7 @@ def _retrieve_from_online_store( requested_feature: str, query: List[float], top_k: int, - distance_metric: str, + distance_metric: Optional[str], ) -> List[Tuple[Timestamp, "FieldStatus.ValueType", Value, Value, Value]]: """ Search and return document features from the online document store. 
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py new file mode 100644 index 0000000000..429327e651 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -0,0 +1,276 @@ +from __future__ import absolute_import + +import base64 +import json +import logging +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple + +import pytz +from elasticsearch import Elasticsearch, helpers + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel + + +class ElasticSearchOnlineStoreConfig(FeastConfigBaseModel): + """ + Configuration for the ElasticSearch online store. + NOTE: The class *must* end with the `OnlineStoreConfig` suffix. 
+ """ + + type: str = "elasticsearch" + + host: Optional[str] = None + user: Optional[str] = None + password: Optional[str] = None + port: Optional[int] = None + index: Optional[str] = None + scheme: Optional[str] = "http" + + # The number of rows to write in a single batch + write_batch_size: Optional[int] = 40 + + # The length of the vector value + vector_len: Optional[int] = 512 + + # The vector similarity metric to use in KNN search + # more details: https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html + similarity: Optional[str] = "cosine" + + +class ElasticSearchOnlineStore(OnlineStore): + _client: Optional[Elasticsearch] = None + + def _get_client(self, config: RepoConfig) -> Elasticsearch: + online_store_config = config.online_store + assert isinstance(online_store_config, ElasticSearchOnlineStoreConfig) + + user = online_store_config.user if online_store_config.user is not None else "" + password = ( + online_store_config.password + if online_store_config.password is not None + else "" + ) + + if self._client: + return self._client + else: + self._client = Elasticsearch( + hosts=[ + { + "host": online_store_config.host or "localhost", + "port": online_store_config.port or 9200, + "scheme": online_store_config.scheme or "http", + } + ], + basic_auth=(user, password), + ) + return self._client + + def _bulk_batch_actions(self, table: FeatureView, batch: List[Dict[str, Any]]): + for row in batch: + yield { + "_index": table.name, + "_id": f"{row['entity_key']}_{row['feature_name']}_{row['timestamp']}", + "_source": row, + } + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + insert_values = [] + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + 
entity_key_serialization_version=config.entity_key_serialization_version, + ) + encoded_entity_key = base64.b64encode(entity_key_bin).decode("utf-8") + timestamp = _to_naive_utc(timestamp) + if created_ts is not None: + created_ts = _to_naive_utc(created_ts) + for feature_name, value in values.items(): + encoded_value = base64.b64encode(value.SerializeToString()).decode( + "utf-8" + ) + vector_val = json.loads(get_list_val_str(value)) + insert_values.append( + { + "entity_key": encoded_entity_key, + "feature_name": feature_name, + "feature_value": encoded_value, + "timestamp": timestamp, + "created_ts": created_ts, + "vector_value": vector_val, + } + ) + + batch_size = config.online_store.write_batch_size + for i in range(0, len(insert_values), batch_size): + batch = insert_values[i : i + batch_size] + actions = self._bulk_batch_actions(table, batch) + helpers.bulk(self._get_client(config), actions) + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + if not requested_features: + body = { + "_source": {"excludes": ["vector_value"]}, + "query": {"match": {"entity_key": entity_keys}}, + } + else: + body = { + "_source": {"excludes": ["vector_value"]}, + "query": { + "bool": { + "must": [ + {"terms": {"entity_key": entity_keys}}, + {"terms": {"feature_name": requested_features}}, + ] + } + }, + } + response = self._get_client(config).search(index=table.name, body=body) + results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for hit in response["hits"]["hits"]: + results.append( + ( + hit["_source"]["timestamp"], + {hit["_source"]["feature_name"]: hit["_source"]["feature_value"]}, + ) + ) + return results + + def create_index(self, config: RepoConfig, table: FeatureView): + """ + Create an index in ElasticSearch for the given table. 
+ TODO: This method can be exposed to users to customize the indexing functionality. + Args: + config: Feast repo configuration object. + table: FeatureView table for which the index needs to be created. + """ + index_mapping = { + "properties": { + "entity_key": {"type": "binary"}, + "feature_name": {"type": "keyword"}, + "feature_value": {"type": "binary"}, + "timestamp": {"type": "date"}, + "created_ts": {"type": "date"}, + "vector_value": { + "type": "dense_vector", + "dims": config.online_store.vector_len, + "index": "true", + "similarity": config.online_store.similarity, + }, + } + } + self._get_client(config).indices.create( + index=table.name, mappings=index_mapping + ) + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + # implement the update method + for table in tables_to_delete: + self._get_client(config).delete_by_query(index=table.name) + for table in tables_to_keep: + self.create_index(config, table) + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + project = config.project + try: + for table in tables: + self._get_client(config).indices.delete(index=table.name) + except Exception as e: + logging.exception(f"Error deleting index in project {project}: {e}") + raise + + def retrieve_online_documents( + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + *args, + **kwargs, + ) -> List[ + Tuple[ + Optional[datetime], + Optional[ValueProto], + Optional[ValueProto], + Optional[ValueProto], + ] + ]: + result: List[ + Tuple[ + Optional[datetime], + Optional[ValueProto], + Optional[ValueProto], + Optional[ValueProto], + ] + ] = [] + response = self._get_client(config).search( + index=table.name, + knn={ + "field": "vector_value", + "query_vector": 
embedding, + "k": top_k, + }, + ) + rows = response["hits"]["hits"][0:top_k] + for row in rows: + feature_value = row["_source"]["feature_value"] + vector_value = row["_source"]["vector_value"] + timestamp = row["_source"]["timestamp"] + distance = row["_score"] + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f") + + feature_value_proto = ValueProto() + feature_value_proto.ParseFromString(base64.b64decode(feature_value)) + + vector_value_proto = ValueProto(string_val=str(vector_value)) + distance_value_proto = ValueProto(float_val=distance) + result.append( + ( + timestamp, + feature_value_proto, + vector_value_proto, + distance_value_proto, + ) + ) + return result + + +def _to_naive_utc(ts: datetime): + if ts.tzinfo is None: + return ts + else: + return ts.astimezone(pytz.utc).replace(tzinfo=None) diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py new file mode 100644 index 0000000000..4d1f2c3ca1 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py @@ -0,0 +1,13 @@ +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.universal.online_store.elasticsearch import ( + ElasticSearchOnlineStoreCreator, +) + +FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig( + online_store="elasticsearch", + online_store_creator=ElasticSearchOnlineStoreCreator, + ), +] diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index f2c32fdafd..1043208ab3 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -283,7 +283,7 @@ def retrieve_online_documents( requested_feature: str, embedding: List[float], top_k: int, - distance_metric: str = "L2", + 
distance_metric: Optional[str] = "L2", ) -> List[ Tuple[ Optional[datetime], diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 7dd03a8417..05983a494c 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -167,6 +167,7 @@ def retrieve_online_documents( requested_feature: str, embedding: List[float], top_k: int, + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], @@ -179,6 +180,7 @@ def retrieve_online_documents( Retrieves online feature values for the specified embeddings. Args: + distance_metric: distance metric to use for retrieval. config: The config for the current feature store. table: The feature view whose feature values should be read. requested_feature: The name of the feature whose embeddings should be used for retrieval. diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 97c2820d41..48d2f8ef18 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -212,7 +212,7 @@ def retrieve_online_documents( requested_feature: str, query: List[float], top_k: int, - distance_metric: str, + distance_metric: Optional[str] = None, ) -> List: set_usage_attribute("provider", self.__class__.__name__) result = [] diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 68d36da17f..22f6088474 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -327,7 +327,7 @@ def retrieve_online_documents( requested_feature: str, query: List[float], top_k: int, - distance_metric: str = "L2", + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], @@ -340,6 +340,7 @@ def retrieve_online_documents( Searches for the top-k most similar documents in the online document store. 
Args: + distance_metric: distance metric to use for the search. config: The config for the current feature store. table: The feature view whose embeddings should be searched. requested_feature: the requested document feature name. diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 5e38fd1775..00cbac1908 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -64,6 +64,7 @@ "rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore", "hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", "ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore", + "elasticsearch": "feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index eb7fe5d6ac..3b9146a7b9 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -120,7 +120,7 @@ def retrieve_online_documents( requested_feature: str, query: List[float], top_k: int, - distance_metric: str, + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py new file mode 100644 index 0000000000..c62a9009ca --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py @@ -0,0 +1,28 @@ +from typing import Dict + +from testcontainers.elasticsearch import ElasticSearchContainer + +from tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class ElasticSearchOnlineStoreCreator(OnlineStoreCreator): + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + self.container = 
ElasticSearchContainer( + "elasticsearch:8.3.3", + ).with_exposed_ports(9200) + + def create_online_store(self) -> Dict[str, str]: + self.container.start() + return { + "host": "localhost", + "type": "elasticsearch", + "port": self.container.get_exposed_port(9200), + "vector_len": 2, + "similarity": "cosine", + } + + def teardown(self): + self.container.stop() diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 5d6462e5e3..9beba4d72b 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -789,7 +789,7 @@ def assert_feature_service_entity_mapping_correctness( @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["pgvector"]) +@pytest.mark.universal_online_stores(only=["pgvector", "elasticsearch"]) def test_retrieve_online_documents(environment, fake_document_data): fs = environment.feature_store df, data_source = fake_document_data diff --git a/setup.py b/setup.py index 6cc728ee98..9181e64c2f 100644 --- a/setup.py +++ b/setup.py @@ -150,6 +150,8 @@ DELTA_REQUIRED = ["deltalake"] +ELASTICSEARCH_REQUIRED = ["elasticsearch>=8.13.0"] + CI_REQUIRED = ( [ "build", @@ -211,6 +213,7 @@ + GRPCIO_REQUIRED + DUCKDB_REQUIRED + DELTA_REQUIRED + + ELASTICSEARCH_REQUIRED ) DOCS_REQUIRED = CI_REQUIRED @@ -377,6 +380,7 @@ def run(self): "duckdb": DUCKDB_REQUIRED, "ikv": IKV_REQUIRED, "delta": DELTA_REQUIRED, + "elasticsearch": ELASTICSEARCH_REQUIRED, }, include_package_data=True, license="Apache",