diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index e57b084b81..80ae5053bf 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -69,6 +69,13 @@ def __init__(self, offline_store_name: str, data_source_name: str): ) +class FeastOnlineStoreUnsupportedDataSource(Exception): + def __init__(self, online_store_name: str, data_source_name: str): + super().__init__( + f"Online Store '{online_store_name}' does not support data source '{data_source_name}'" + ) + + class FeastEntityDFMissingColumnsError(Exception): def __init__(self, expected, missing): super().__init__( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 5e5a1f5ec1..8503632d5c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -541,7 +541,7 @@ def get_online_features( table, union_of_entity_keys, entity_name_to_join_key_map ) read_rows = provider.online_read( - project=self.project, + config=self.config, table=table, entity_keys=entity_keys, requested_features=requested_features, diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index dce073e1d3..f33b501d62 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -1,18 +1,14 @@ -import itertools from datetime import datetime -from multiprocessing.pool import ThreadPool -from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union -import mmh3 import pandas from tqdm import tqdm -from feast import FeatureTable, utils +from feast import FeatureTable from feast.entity import Entity -from feast.errors import FeastProviderLoginError from feast.feature_view import FeatureView -from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.offline_stores.helpers import get_offline_store_from_config +from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.provider import ( Provider, RetrievalJob, @@ -23,15 +19,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry -from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig - -try: - from google.auth.exceptions import DefaultCredentialsError - from google.cloud import datastore -except ImportError as e: - from feast.errors import FeastExtrasDependencyImportError - - raise FeastExtrasDependencyImportError("gcp", str(e)) +from feast.repo_config import RepoConfig class GcpProvider(Provider): @@ -39,26 +27,9 @@ class GcpProvider(Provider): _namespace: Optional[str] def __init__(self, config: RepoConfig): - assert isinstance(config.online_store, DatastoreOnlineStoreConfig) - self._gcp_project_id = config.online_store.project_id - self._namespace = config.online_store.namespace - self._write_concurrency = config.online_store.write_concurrency - self._write_batch_size = config.online_store.write_batch_size - - assert config.offline_store is not None + self.repo_config = config self.offline_store = get_offline_store_from_config(config.offline_store) - - def _initialize_client(self): - try: - return datastore.Client( - project=self._gcp_project_id, namespace=self._namespace - ) - except DefaultCredentialsError as e: - raise FeastProviderLoginError( - str(e) - + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' - "local Google Cloud 
account " - ) + self.online_store = get_online_store_from_config(config.online_store) def update_infra( self, @@ -69,25 +40,14 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - - client = self._initialize_client() - - for table in tables_to_keep: - key = client.key("Project", project, "Table", table.name) - entity = datastore.Entity( - key=key, exclude_from_indexes=("created_ts", "event_ts", "values") - ) - entity.update({"created_ts": datetime.utcnow()}) - client.put(entity) - - for table in tables_to_delete: - _delete_all_values( - client, client.key("Project", project, "Table", table.name) - ) - - # Delete the table metadata datastore entity - key = client.key("Project", project, "Table", table.name) - client.delete(key) + self.online_store.update( + config=self.repo_config, + tables_to_delete=tables_to_delete, + tables_to_keep=tables_to_keep, + entities_to_keep=entities_to_keep, + entities_to_delete=entities_to_delete, + partial=partial, + ) def teardown_infra( self, @@ -95,59 +55,28 @@ def teardown_infra( tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], ) -> None: - client = self._initialize_client() - - for table in tables: - _delete_all_values( - client, client.key("Project", project, "Table", table.name) - ) - - # Delete the table metadata datastore entity - key = client.key("Project", project, "Table", table.name) - client.delete(key) + self.online_store.teardown(self.repo_config, tables, entities) def online_write_batch( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], ) -> None: - client = self._initialize_client() - - pool = ThreadPool(processes=self._write_concurrency) - pool.map( - lambda b: _write_minibatch(client, project, table, b, progress), - _to_minibatches(data, batch_size=self._write_batch_size), - ) + self.online_store.online_write_batch(config, table, data, progress) def online_read( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - client = self._initialize_client() + result = self.online_store.online_read(config, table, entity_keys) - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - for entity_key in entity_keys: - document_id = compute_datastore_entity_id(entity_key) - key = client.key( - "Project", project, "Table", table.name, "Row", document_id - ) - value = client.get(key) - if value is not None: - res = {} - for feature_name, value_bin in value["values"].items(): - val = ValueProto() - val.ParseFromString(value_bin) - res[feature_name] = val - result.append((value["event_ts"], res)) - else: - result.append((None, None)) return result def materialize_single_feature_view( @@ -188,7 +117,7 @@ def materialize_single_feature_view( with tqdm_builder(len(rows_to_write)) as pbar: self.online_write_batch( - project, feature_view, rows_to_write, lambda x: pbar.update(x) + self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x) ) def get_historical_features( @@ -209,84 +138,3 @@ def get_historical_features( project=project, ) return job - - -ProtoBatch = Sequence[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] -] - - -def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]: - 
""" - Split data into minibatches, making sure we stay under GCP datastore transaction size - limits. - """ - iterable = iter(data) - - while True: - batch = list(itertools.islice(iterable, batch_size)) - if len(batch) > 0: - yield batch - else: - break - - -def _write_minibatch( - client, - project: str, - table: Union[FeatureTable, FeatureView], - data: Sequence[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], -): - entities = [] - for entity_key, features, timestamp, created_ts in data: - document_id = compute_datastore_entity_id(entity_key) - - key = client.key("Project", project, "Table", table.name, "Row", document_id,) - - entity = datastore.Entity( - key=key, exclude_from_indexes=("created_ts", "event_ts", "values") - ) - - entity.update( - dict( - key=entity_key.SerializeToString(), - values={k: v.SerializeToString() for k, v in features.items()}, - event_ts=utils.make_tzaware(timestamp), - created_ts=( - utils.make_tzaware(created_ts) if created_ts is not None else None - ), - ) - ) - entities.append(entity) - with client.transaction(): - client.put_multi(entities) - - if progress: - progress(len(entities)) - - -def _delete_all_values(client, key) -> None: - """ - Delete all data under the key path in datastore. - """ - while True: - query = client.query(kind="Row", ancestor=key) - entities = list(query.fetch(limit=1000)) - if not entities: - return - - for entity in entities: - client.delete(entity.key) - - -def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str: - """ - Compute Datastore Entity id given Feast Entity Key. - - Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to - do with the Entity concept we have in Feast. 
- """ - return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 3827e6aea6..a76f49b2c4 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -1,7 +1,4 @@ -import os -import sqlite3 from datetime import datetime -from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas as pd @@ -11,8 +8,8 @@ from feast import FeatureTable from feast.entity import Entity from feast.feature_view import FeatureView -from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.offline_stores.helpers import get_offline_store_from_config +from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.provider import ( Provider, RetrievalJob, @@ -23,28 +20,15 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry -from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig +from feast.repo_config import RepoConfig class LocalProvider(Provider): - _db_path: Path - - def __init__(self, config: RepoConfig, repo_path: Path): + def __init__(self, config: RepoConfig): assert config is not None - assert isinstance(config.online_store, SqliteOnlineStoreConfig) - assert config.offline_store is not None - local_path = Path(config.online_store.path) - if local_path.is_absolute(): - self._db_path = local_path - else: - self._db_path = repo_path.joinpath(local_path) + self.config = config self.offline_store = get_offline_store_from_config(config.offline_store) - - def _get_conn(self): - Path(self._db_path).parent.mkdir(exist_ok=True) - return sqlite3.connect( - self._db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES - ) + self.online_store = get_online_store_from_config(config.online_store) def update_infra( self, @@ -55,17 +39,14 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - conn = self._get_conn() - for table in tables_to_keep: - conn.execute( - f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))" - ) - conn.execute( - f"CREATE INDEX IF NOT EXISTS {_table_id(project, table)}_ek ON {_table_id(project, table)} (entity_key);" - ) - - for table in tables_to_delete: - conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") + self.online_store.update( + self.config, + tables_to_delete, + tables_to_keep, + entities_to_delete, + entities_to_keep, + partial, + ) def teardown_infra( self, @@ -73,92 +54,28 @@ def teardown_infra( tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], ) -> None: - os.unlink(self._db_path) + self.online_store.teardown(self.config, tables, entities) def online_write_batch( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], ) -> None: - conn = self._get_conn() - - with conn: - for entity_key, values, timestamp, created_ts in data: - entity_key_bin = serialize_entity_key(entity_key) - timestamp = _to_naive_utc(timestamp) - if created_ts is not None: - created_ts = _to_naive_utc(created_ts) - - for feature_name, val in values.items(): - conn.execute( - f""" - UPDATE 
{_table_id(project, table)} - SET value = ?, event_ts = ?, created_ts = ? - WHERE (entity_key = ? AND feature_name = ?) - """, - ( - # SET - val.SerializeToString(), - timestamp, - created_ts, - # WHERE - entity_key_bin, - feature_name, - ), - ) - - conn.execute( - f"""INSERT OR IGNORE INTO {_table_id(project, table)} - (entity_key, feature_name, value, event_ts, created_ts) - VALUES (?, ?, ?, ?, ?)""", - ( - entity_key_bin, - feature_name, - val.SerializeToString(), - timestamp, - created_ts, - ), - ) - if progress: - progress(1) + self.online_store.online_write_batch(config, table, data, progress) def online_read( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + result = self.online_store.online_read(config, table, entity_keys) - conn = self._get_conn() - cur = conn.cursor() - - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - - for entity_key in entity_keys: - entity_key_bin = serialize_entity_key(entity_key) - - cur.execute( - f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = ?", - (entity_key_bin,), - ) - - res = {} - res_ts = None - for feature_name, val_bin, ts in cur.fetchall(): - val = ValueProto() - val.ParseFromString(val_bin) - res[feature_name] = val - res_ts = ts - - if not res: - result.append((None, None)) - else: - result.append((res_ts, res)) return result def materialize_single_feature_view( @@ -199,7 +116,7 @@ def materialize_single_feature_view( with tqdm_builder(len(rows_to_write)) as pbar: self.online_write_batch( - project, feature_view, rows_to_write, lambda x: pbar.update(x) + self.config, feature_view, rows_to_write, lambda x: pbar.update(x) ) def get_historical_features( diff --git a/sdk/python/feast/infra/online_stores/__init__.py b/sdk/python/feast/infra/online_stores/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py new file mode 100644 index 0000000000..1f2c5abac4 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -0,0 +1,256 @@ +# Copyright 2021 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
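+
+# This module implements Feast's online store interface on top of Google Cloud
+# Datastore. Feature rows are stored as Datastore entities of kind "Row" under a
+# "Project"/"Table" ancestor path, keyed by a murmur3 hash of the serialized
+# Feast entity key.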
+import itertools
+from datetime import datetime
+from multiprocessing.pool import ThreadPool
+from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
+
+import mmh3
+
+from feast import Entity, FeatureTable, utils
+from feast.feature_view import FeatureView
+from feast.infra.key_encoding_utils import serialize_entity_key
+from feast.infra.online_stores.online_store import OnlineStore
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
+from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig
+
+try:
+    from google.auth.exceptions import DefaultCredentialsError
+    from google.cloud import datastore
+except ImportError as e:
+    from feast.errors import FeastExtrasDependencyImportError, FeastProviderLoginError
+
+    raise FeastExtrasDependencyImportError("gcp", str(e))
+
+
+ProtoBatch = Sequence[
+    Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+]
+
+
+class DatastoreOnlineStore(OnlineStore):
+    """
+    Datastore implementation of the Feast online store, used for all interaction
+    between Feast and Google Cloud Datastore for the online storage of features.
+    """
+
+    _client: Optional[datastore.Client] = None
+
+    def update(
+        self,
+        config: RepoConfig,
+        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
+        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+        partial: bool,
+    ):
+        """
+        Create table-metadata entities for the tables to keep, and delete all
+        rows and table metadata for the tables to delete.
+        """
+        online_config = config.online_store
+        assert isinstance(online_config, DatastoreOnlineStoreConfig)
+        client = self._get_client(online_config)
+        feast_project = config.project
+
+        for table in tables_to_keep:
+            key = client.key("Project", feast_project, "Table", table.name)
+            entity = datastore.Entity(
+                key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
+            )
+            entity.update({"created_ts": datetime.utcnow()})
+            client.put(entity)
+
+        for table in tables_to_delete:
+            _delete_all_values(
+                client, client.key("Project", feast_project, "Table", table.name)
+            )
+
+            # Delete the table metadata datastore entity
+            key = client.key("Project", feast_project, "Table", table.name)
+            client.delete(key)
+
+    def teardown(
+        self,
+        config: RepoConfig,
+        tables: Sequence[Union[FeatureTable, FeatureView]],
+        entities: Sequence[Entity],
+    ):
+        """
+        Delete all rows and table-metadata entities for the given tables from Datastore.
+ """ + online_config = config.online_store + assert isinstance(online_config, DatastoreOnlineStoreConfig) + client = self._get_client(online_config) + feast_project = config.project + + for table in tables: + _delete_all_values( + client, client.key("Project", feast_project, "Table", table.name) + ) + + # Delete the table metadata datastore entity + key = client.key("Project", feast_project, "Table", table.name) + client.delete(key) + + def _get_client(self, online_config: DatastoreOnlineStoreConfig): + + if not self._client: + try: + self._client = datastore.Client( + project=online_config.project_id, namespace=online_config.namespace, + ) + except DefaultCredentialsError as e: + raise FeastProviderLoginError( + str(e) + + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' + "local Google Cloud account " + ) + return self._client + + def online_write_batch( + self, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + + online_config = config.online_store + assert isinstance(online_config, DatastoreOnlineStoreConfig) + client = self._get_client(online_config) + + write_concurrency = online_config.write_concurrency + write_batch_size = online_config.write_batch_size + feast_project = config.project + + pool = ThreadPool(processes=write_concurrency) + pool.map( + lambda b: self._write_minibatch(client, feast_project, table, b, progress), + self._to_minibatches(data, batch_size=write_batch_size), + ) + + @staticmethod + def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]: + """ + Split data into minibatches, making sure we stay under GCP datastore transaction size + limits. 
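+
+        For example, with batch_size=2 the sequence [row1, row2, row3] is
+        yielded as [row1, row2] and then [row3].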
+ """ + iterable = iter(data) + + while True: + batch = list(itertools.islice(iterable, batch_size)) + if len(batch) > 0: + yield batch + else: + break + + @staticmethod + def _write_minibatch( + client, + project: str, + table: Union[FeatureTable, FeatureView], + data: Sequence[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ): + entities = [] + for entity_key, features, timestamp, created_ts in data: + document_id = compute_datastore_entity_id(entity_key) + + key = client.key( + "Project", project, "Table", table.name, "Row", document_id, + ) + + entity = datastore.Entity( + key=key, exclude_from_indexes=("created_ts", "event_ts", "values") + ) + + entity.update( + dict( + key=entity_key.SerializeToString(), + values={k: v.SerializeToString() for k, v in features.items()}, + event_ts=utils.make_tzaware(timestamp), + created_ts=( + utils.make_tzaware(created_ts) + if created_ts is not None + else None + ), + ) + ) + entities.append(entity) + with client.transaction(): + client.put_multi(entities) + + if progress: + progress(len(entities)) + + def online_read( + self, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + + online_config = config.online_store + assert isinstance(online_config, DatastoreOnlineStoreConfig) + client = self._get_client(online_config) + + feast_project = config.project + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for entity_key in entity_keys: + document_id = compute_datastore_entity_id(entity_key) + key = client.key( + "Project", feast_project, "Table", table.name, "Row", document_id + ) + value = client.get(key) + if value is not None: + res = {} + for feature_name, value_bin in value["values"].items(): + val = ValueProto() + val.ParseFromString(value_bin) + res[feature_name] = val + result.append((value["event_ts"], res)) + else: + result.append((None, None)) + return result + + +def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str: + """ + Compute Datastore Entity id given Feast Entity Key. + + Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to + do with the Entity concept we have in Feast. + """ + return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() + + +def _delete_all_values(client, key) -> None: + """ + Delete all data under the key path in datastore. 
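+
+    Rows are fetched and deleted in pages of up to 1000 entities until the
+    ancestor query returns no results.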
+    """
+    while True:
+        query = client.query(kind="Row", ancestor=key)
+        entities = list(query.fetch(limit=1000))
+        if not entities:
+            return
+
+        for entity in entities:
+            client.delete(entity.key)
diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py
new file mode 100644
index 0000000000..391794d20e
--- /dev/null
+++ b/sdk/python/feast/infra/online_stores/helpers.py
@@ -0,0 +1,81 @@
+import struct
+from typing import Any, Dict, Set
+
+import mmh3
+
+from feast.data_source import BigQuerySource, DataSource, FileSource
+from feast.errors import FeastOnlineStoreUnsupportedDataSource
+from feast.infra.online_stores.online_store import OnlineStore
+from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.repo_config import (
+    DatastoreOnlineStoreConfig,
+    OnlineStoreConfig,
+    RedisOnlineStoreConfig,
+    SqliteOnlineStoreConfig,
+)
+
+
+def get_online_store_from_config(
+    online_store_config: OnlineStoreConfig,
+) -> OnlineStore:
+    """Get the online store from the online store config"""
+
+    if isinstance(online_store_config, SqliteOnlineStoreConfig):
+        from feast.infra.online_stores.sqlite import SqliteOnlineStore
+
+        return SqliteOnlineStore()
+    elif isinstance(online_store_config, DatastoreOnlineStoreConfig):
+        from feast.infra.online_stores.datastore import DatastoreOnlineStore
+
+        return DatastoreOnlineStore()
+    elif isinstance(online_store_config, RedisOnlineStoreConfig):
+        from feast.infra.online_stores.redis import RedisOnlineStore
+
+        return RedisOnlineStore()
+    raise ValueError(f"Unsupported online store config '{online_store_config}'")
+
+
+SUPPORTED_SOURCES: Dict[Any, Set[Any]] = {
+    SqliteOnlineStoreConfig: {FileSource},
+    DatastoreOnlineStoreConfig: {BigQuerySource},
+    RedisOnlineStoreConfig: {FileSource, BigQuerySource},
+}
+
+
+def assert_online_store_supports_data_source(
+    online_store_config: OnlineStoreConfig, data_source: DataSource
+):
+    supported_sources: Set[Any] = SUPPORTED_SOURCES.get(
+        online_store_config.__class__, set()
+    )
+    # This is needed because checking for `in` with Union types breaks mypy.
+    # https://github.com/python/mypy/issues/4954
+    # We can replace this with `data_source.__class__ in SUPPORTED_SOURCES[online_store_config.__class__]`
+    # once ^ is resolved.
+    if supported_sources:
+        for source in supported_sources:
+            if source == data_source.__class__:
+                return
+    raise FeastOnlineStoreUnsupportedDataSource(
+        online_store_config.type, data_source.__class__.__name__
+    )
+
+
+def _redis_key(project: str, entity_key: EntityKeyProto):
+    redis_key = RedisKeyProto(
+        project=project,
+        entity_names=entity_key.join_keys,
+        entity_values=entity_key.entity_values,
+    )
+    return redis_key.SerializeToString()
+
+
+def _mmh3(key: str):
+    """
+    Calculate a murmur3_32 hash matching the Scala implementation, which uses little endian:
+        https://stackoverflow.com/questions/29932956/murmur3-hash-different-result-between-python-and-java-implementation
+        https://stackoverflow.com/questions/13141787/convert-decimal-int-to-little-endian-string-x-x
+    """
+    key_hash = mmh3.hash(key, signed=False)
+    return bytes.fromhex(struct.pack("<Q", key_hash).hex()[0:8])
diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py
new file mode 100644
--- /dev/null
+++ b/sdk/python/feast/infra/online_stores/online_store.py
+# Copyright 2021 The Feast Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from abc import ABC, abstractmethod
+from datetime import datetime
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
+
+from feast import Entity, FeatureTable
+from feast.feature_view import FeatureView
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
+from feast.repo_config import RepoConfig
+
+
+class OnlineStore(ABC):
+    """
+    OnlineStore is an object used for all interaction between Feast and the service
+    used for online storage of features.
+    """
+
+    @abstractmethod
+    def online_write_batch(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
+        progress: Optional[Callable[[int], Any]],
+    ) -> None:
+        """
+        Write a batch of feature rows to the online store. This is a low level interface, not
+        expected to be used by the users directly.
+
+        If a tz-naive timestamp is passed to this method, it should be assumed to be UTC by implementors.
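+
+        For example (illustrative only), a single element of `data` could be:
+
+            (entity_key_proto, {"daily_trips": ValueProto(int64_val=5)},
+             datetime.utcnow(), None)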
+ + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureTable or FeatureView + data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key, + a dict containing feature values, an event timestamp for the row, and + the created timestamp for the row if it exists. + progress: Optional function to be called once every mini-batch of rows is written to + the online store. Can be used to display progress. + """ + ... + + @abstractmethod + def online_read( + self, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Read feature values given an Entity Key. This is a low level interface, not + expected to be used by the users directly. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureTable or FeatureView + entity_keys: a list of entity keys that should be read from the FeatureStore. + requested_features: (Optional) A subset of the features that should be read from the FeatureStore. + Returns: + Data is returned as a list, one item per entity key. Each item in the list is a tuple + of event_ts for the row, and the feature data as a dict from feature names to values. + Values are returned as Value proto message. + """ + ... + + @abstractmethod + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + ... + + @abstractmethod + def teardown( + self, + config: RepoConfig, + tables: Sequence[Union[FeatureTable, FeatureView]], + entities: Sequence[Entity], + ): + ... diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py new file mode 100644 index 0000000000..abab030dce --- /dev/null +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -0,0 +1,194 @@ +# Copyright 2021 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
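+
+# This module implements Feast's online store interface on top of Redis or
+# RedisCluster. Feature values are stored as fields of a Redis hash, keyed by a
+# murmur3 hash of "<feature view name>:<feature name>", under a key derived from
+# the Feast project and the serialized entity key.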
+import json
+from datetime import datetime
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
+
+from google.protobuf.timestamp_pb2 import Timestamp
+
+from feast import Entity, FeatureTable, FeatureView, RepoConfig, utils
+from feast.infra.online_stores.helpers import _mmh3, _redis_key
+from feast.infra.online_stores.online_store import OnlineStore
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
+from feast.repo_config import RedisOnlineStoreConfig, RedisType
+
+try:
+    from redis import Redis
+    from rediscluster import RedisCluster
+except ImportError as e:
+    from feast.errors import FeastExtrasDependencyImportError
+
+    raise FeastExtrasDependencyImportError("redis", str(e))
+
+EX_SECONDS = 253402300799
+
+
+class RedisOnlineStore(OnlineStore):
+    _client: Optional[Union[Redis, RedisCluster]] = None
+
+    def update(
+        self,
+        config: RepoConfig,
+        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
+        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+        partial: bool,
+    ):
+        """
+        There's currently no setup done for Redis.
+        """
+        pass
+
+    def teardown(
+        self,
+        config: RepoConfig,
+        tables: Sequence[Union[FeatureTable, FeatureView]],
+        entities: Sequence[Entity],
+    ):
+        """
+        There's currently no teardown done for Redis.
+        """
+        pass
+
+    @staticmethod
+    def _parse_connection_string(connection_string: str):
+        """
+        Reads the Redis connection string, which uses the following format:
+        for RedisCluster:
+            redis1:6379,redis2:6379,decode_responses=true,skip_full_coverage_check=true,ssl=true,password=...
+        for Redis:
+            redis_master:6379,db=0,ssl=true,password=...
+        """
+        startup_nodes = [
+            dict(zip(["host", "port"], c.split(":")))
+            for c in connection_string.split(",")
+            if "=" not in c
+        ]
+        params = {}
+        for c in connection_string.split(","):
+            if "=" in c:
+                kv = c.split("=")
+                try:
+                    kv[1] = json.loads(kv[1])
+                except json.JSONDecodeError:
+                    ...
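+                    # the value is not valid JSON (e.g. a plain password or
+                    # hostname string), so keep it as the raw string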
+
+                it = iter(kv)
+                params.update(dict(zip(it, it)))
+
+        return startup_nodes, params
+
+    def _get_client(self, online_store_config: RedisOnlineStoreConfig):
+        """
+        Creates the Redis client, RedisCluster or Redis, depending on configuration
+        """
+        if not self._client:
+            startup_nodes, kwargs = self._parse_connection_string(
+                online_store_config.connection_string
+            )
+            if online_store_config.type == RedisType.redis_cluster:
+                kwargs["startup_nodes"] = startup_nodes
+                self._client = RedisCluster(**kwargs)
+            else:
+                kwargs["host"] = startup_nodes[0]["host"]
+                kwargs["port"] = startup_nodes[0]["port"]
+                self._client = Redis(**kwargs)
+        return self._client
+
+    def online_write_batch(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
+        progress: Optional[Callable[[int], Any]],
+    ) -> None:
+        online_store_config = config.online_store
+        assert isinstance(online_store_config, RedisOnlineStoreConfig)
+
+        client = self._get_client(online_store_config)
+        project = config.project
+
+        entity_hset = {}
+        feature_view = table.name
+
+        ex = Timestamp()
+        ex.seconds = EX_SECONDS
+        ex_str = ex.SerializeToString()
+
+        for entity_key, values, timestamp, created_ts in data:
+            redis_key_bin = _redis_key(project, entity_key)
+            ts = Timestamp()
+            ts.seconds = int(utils.make_tzaware(timestamp).timestamp())
+            entity_hset[f"_ts:{feature_view}"] = ts.SerializeToString()
+            entity_hset[f"_ex:{feature_view}"] = ex_str
+
+            for feature_name, val in values.items():
+                f_key = _mmh3(f"{feature_view}:{feature_name}")
+                entity_hset[f_key] = val.SerializeToString()
+
+            client.hset(redis_key_bin, mapping=entity_hset)
+            if progress:
+                progress(1)
+
+    def online_read(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        entity_keys: List[EntityKeyProto],
+        requested_features: Optional[List[str]] = None,
+    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+        online_store_config = config.online_store
+        assert isinstance(online_store_config, RedisOnlineStoreConfig)
+
+        client = self._get_client(online_store_config)
+        feature_view = table.name
+        project = config.project
+
+        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
+
+        if not requested_features:
+            requested_features = [f.name for f in table.features]
+
+        for entity_key in entity_keys:
+            redis_key_bin = _redis_key(project, entity_key)
+            hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features]
+            ts_key = f"_ts:{feature_view}"
+            hset_keys.append(ts_key)
+            values = client.hmget(redis_key_bin, hset_keys)
+            requested_features.append(ts_key)
+            res_val = dict(zip(requested_features, values))
+
+            res_ts = Timestamp()
+            ts_val = res_val.pop(ts_key)
+            if ts_val:
+                res_ts.ParseFromString(ts_val)
+
+            res = {}
+            for feature_name, val_bin in res_val.items():
+                val = ValueProto()
+                if val_bin:
+                    val.ParseFromString(val_bin)
+                res[feature_name] = val
+
+            if not res:
+                result.append((None, None))
+            else:
+                timestamp = datetime.fromtimestamp(res_ts.seconds)
+                result.append((timestamp, res))
+        return result
diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py
new file mode 100644
index 0000000000..9c2e5cd251
--- /dev/null
+++ b/sdk/python/feast/infra/online_stores/sqlite.py
@@ -0,0 +1,188 @@
+# Copyright 2021 The Feast Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
"License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sqlite3 +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union + +import pytz + +from feast import Entity, FeatureTable +from feast.feature_view import FeatureView +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RepoConfig + + +class SqliteOnlineStore(OnlineStore): + """ + OnlineStore is an object used for all interaction between Feast and the service used for offline storage of + features. + """ + + _conn: Optional[sqlite3.Connection] = None + + @staticmethod + def _get_db_path(config: RepoConfig) -> str: + assert config.online_store.type == "sqlite" + + if config.repo_path and not Path(config.online_store.path).is_absolute(): + db_path = str(config.repo_path / config.online_store.path) + else: + db_path = config.online_store.path + return db_path + + def _get_conn(self, config: RepoConfig): + if not self._conn: + db_path = self._get_db_path(config) + Path(db_path).parent.mkdir(exist_ok=True) + self._conn = sqlite3.connect( + db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + ) + return self._conn + + def online_write_batch( + self, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + conn = self._get_conn(config) + + project = config.project + + with conn: + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key(entity_key) + timestamp = _to_naive_utc(timestamp) + if created_ts is not None: + created_ts = _to_naive_utc(created_ts) + + for feature_name, val in values.items(): + conn.execute( + f""" + UPDATE {_table_id(project, table)} + SET value = ?, event_ts = ?, created_ts = ? + WHERE (entity_key = ? AND feature_name = ?) 
+                        """,
+                        (
+                            # SET
+                            val.SerializeToString(),
+                            timestamp,
+                            created_ts,
+                            # WHERE
+                            entity_key_bin,
+                            feature_name,
+                        ),
+                    )
+
+                    conn.execute(
+                        f"""INSERT OR IGNORE INTO {_table_id(project, table)}
+                            (entity_key, feature_name, value, event_ts, created_ts)
+                            VALUES (?, ?, ?, ?, ?)""",
+                        (
+                            entity_key_bin,
+                            feature_name,
+                            val.SerializeToString(),
+                            timestamp,
+                            created_ts,
+                        ),
+                    )
+                if progress:
+                    progress(1)
+
+    def online_read(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        entity_keys: List[EntityKeyProto],
+        requested_features: Optional[List[str]] = None,
+    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+        conn = self._get_conn(config)
+        cur = conn.cursor()
+
+        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
+
+        project = config.project
+        for entity_key in entity_keys:
+            entity_key_bin = serialize_entity_key(entity_key)
+
+            cur.execute(
+                f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = ?",
+                (entity_key_bin,),
+            )
+
+            res = {}
+            res_ts = None
+            for feature_name, val_bin, ts in cur.fetchall():
+                val = ValueProto()
+                val.ParseFromString(val_bin)
+                res[feature_name] = val
+                res_ts = ts
+
+            if not res:
+                result.append((None, None))
+            else:
+                result.append((res_ts, res))
+        return result
+
+    def update(
+        self,
+        config: RepoConfig,
+        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
+        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+        partial: bool,
+    ):
+        conn = self._get_conn(config)
+        project = config.project
+
+        for table in tables_to_keep:
+            conn.execute(
+                f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
+            )
+            conn.execute(
+                f"CREATE INDEX IF NOT EXISTS {_table_id(project, table)}_ek ON {_table_id(project, table)} (entity_key);"
+            )
+
+        for table in tables_to_delete:
+            conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")
+
+    def teardown(
+        self,
+        config: RepoConfig,
+        tables: Sequence[Union[FeatureTable, FeatureView]],
+        entities: Sequence[Entity],
+    ):
+        os.unlink(self._get_db_path(config))
+
+
+def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str:
+    return f"{project}_{table.name}"
+
+
+def _to_naive_utc(ts: datetime):
+    if ts.tzinfo is None:
+        return ts
+    else:
+        return ts.astimezone(pytz.utc).replace(tzinfo=None)
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index 6a378d0759..5d4f8d6cf0 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -71,7 +71,7 @@ def teardown_infra(
     @abc.abstractmethod
     def online_write_batch(
         self,
-        project: str,
+        config: RepoConfig,
         table: Union[FeatureTable, FeatureView],
         data: List[
             Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
@@ -85,7 +85,7 @@ def online_write_batch(
         If a tz-naive timestamp is passed to this method, it is assumed to be UTC.
 
         Args:
-            project: Feast project name
+            config: The RepoConfig for the current FeatureStore.
             table: Feast FeatureTable
             data: a list of quadruplets containing Feature data. 
Each quadruplet contains an Entity Key, a dict containing feature values, an event timestamp for the row, and @@ -122,7 +122,7 @@ def get_historical_features( @abc.abstractmethod def online_read( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: List[str] = None, @@ -145,14 +145,10 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: from feast.infra.gcp import GcpProvider return GcpProvider(config) - elif config.provider == "redis": - from feast.infra.redis import RedisProvider - - return RedisProvider(config) elif config.provider == "local": from feast.infra.local import LocalProvider - return LocalProvider(config, repo_path) + return LocalProvider(config) else: raise errors.FeastProviderNotImplementedError(config.provider) else: diff --git a/sdk/python/feast/infra/redis.py b/sdk/python/feast/infra/redis.py deleted file mode 100644 index f4200918b3..0000000000 --- a/sdk/python/feast/infra/redis.py +++ /dev/null @@ -1,281 +0,0 @@ -import json -import struct -from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union - -import mmh3 -import pandas as pd -from google.protobuf.timestamp_pb2 import Timestamp - -try: - from redis import Redis - from rediscluster import RedisCluster -except ImportError as e: - from feast.errors import FeastExtrasDependencyImportError - - raise FeastExtrasDependencyImportError("redis", str(e)) - -from tqdm import tqdm - -from feast import FeatureTable, utils -from feast.entity import Entity -from feast.feature_view import FeatureView -from feast.infra.offline_stores.helpers import get_offline_store_from_config -from feast.infra.provider import ( - Provider, - RetrievalJob, - _convert_arrow_to_proto, - _get_column_names, - _run_field_mapping, -) -from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.registry import Registry -from feast.repo_config import RedisOnlineStoreConfig, RedisType, RepoConfig - -EX_SECONDS = 253402300799 - - -class RedisProvider(Provider): - _redis_type: Optional[RedisType] - _connection_string: str - - def __init__(self, config: RepoConfig): - assert isinstance(config.online_store, RedisOnlineStoreConfig) - if config.online_store.redis_type: - self._redis_type = config.online_store.redis_type - if config.online_store.connection_string: - self._connection_string = config.online_store.connection_string - self.offline_store = get_offline_store_from_config(config.offline_store) - - def update_infra( - self, - project: str, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, - ): - pass - - def teardown_infra( - self, - project: str, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity], - ) -> None: - # according to the repos_operations.py we can delete the whole project - client = self._get_client() - - tables_join_keys = [[e for e in t.entities] for t in tables] - for table_join_keys in tables_join_keys: - redis_key_bin = _redis_key( - project, EntityKeyProto(join_keys=table_join_keys) - ) - keys = [k for k in client.scan_iter(match=f"{redis_key_bin}*", count=100)] - if keys: - client.unlink(*keys) - - def 
online_write_batch( - self, - project: str, - table: Union[FeatureTable, FeatureView], - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], - ) -> None: - client = self._get_client() - - entity_hset = {} - feature_view = table.name - - ex = Timestamp() - ex.seconds = EX_SECONDS - ex_str = ex.SerializeToString() - - for entity_key, values, timestamp, created_ts in data: - redis_key_bin = _redis_key(project, entity_key) - ts = Timestamp() - ts.seconds = int(utils.make_tzaware(timestamp).timestamp()) - entity_hset[f"_ts:{feature_view}"] = ts.SerializeToString() - entity_hset[f"_ex:{feature_view}"] = ex_str - - for feature_name, val in values.items(): - f_key = _mmh3(f"{feature_view}:{feature_name}") - entity_hset[f_key] = val.SerializeToString() - - client.hset(redis_key_bin, mapping=entity_hset) - if progress: - progress(1) - - def online_read( - self, - project: str, - table: Union[FeatureTable, FeatureView], - entity_keys: List[EntityKeyProto], - requested_features: List[str] = None, - ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - - client = self._get_client() - feature_view = table.name - - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - - if not requested_features: - requested_features = [f.name for f in table.features] - - for entity_key in entity_keys: - redis_key_bin = _redis_key(project, entity_key) - hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features] - ts_key = f"_ts:{feature_view}" - hset_keys.append(ts_key) - values = client.hmget(redis_key_bin, hset_keys) - requested_features.append(ts_key) - res_val = dict(zip(requested_features, values)) - - res_ts = Timestamp() - ts_val = res_val.pop(ts_key) - if ts_val: - res_ts.ParseFromString(ts_val) - - res = {} - for feature_name, val_bin in res_val.items(): - val = ValueProto() - if val_bin: - val.ParseFromString(val_bin) - res[feature_name] = val - - if not res: - result.append((None, None)) - else: - timestamp = datetime.fromtimestamp(res_ts.seconds) - result.append((timestamp, res)) - return result - - def materialize_single_feature_view( - self, - feature_view: FeatureView, - start_date: datetime, - end_date: datetime, - registry: Registry, - project: str, - tqdm_builder: Callable[[int], tqdm], - ) -> None: - entities = [] - for entity_name in feature_view.entities: - entities.append(registry.get_entity(entity_name, project)) - - ( - join_key_columns, - feature_name_columns, - event_timestamp_column, - created_timestamp_column, - ) = _get_column_names(feature_view, entities) - - start_date = utils.make_tzaware(start_date) - end_date = utils.make_tzaware(end_date) - - table = self.offline_store.pull_latest_from_table_or_query( - data_source=feature_view.input, - join_key_columns=join_key_columns, - feature_name_columns=feature_name_columns, - event_timestamp_column=event_timestamp_column, - created_timestamp_column=created_timestamp_column, - start_date=start_date, - end_date=end_date, - ) - - if feature_view.input.field_mapping is not None: - table = _run_field_mapping(table, feature_view.input.field_mapping) - - join_keys = [entity.join_key for entity in entities] - rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) - - with tqdm_builder(len(rows_to_write)) as pbar: - self.online_write_batch( - project, feature_view, rows_to_write, lambda x: pbar.update(x) - ) - - feature_view.materialization_intervals.append((start_date, end_date)) - 
registry.apply_feature_view(feature_view, project)
-
-    def _parse_connection_string(self):
-        """
-        Reads Redis connections string using format
-        for RedisCluster:
-            redis1:6379,redis2:6379,decode_responses=true,skip_full_coverage_check=true,ssl=true,password=...
-        for Redis:
-            redis_master:6379,db=0,ssl=true,password=...
-        """
-        connection_string = self._connection_string
-        startup_nodes = [
-            dict(zip(["host", "port"], c.split(":")))
-            for c in connection_string.split(",")
-            if "=" not in c
-        ]
-        params = {}
-        for c in connection_string.split(","):
-            if "=" in c:
-                kv = c.split("=")
-                try:
-                    kv[1] = json.loads(kv[1])
-                except json.JSONDecodeError:
-                    ...
-
-                it = iter(kv)
-                params.update(dict(zip(it, it)))
-
-        return startup_nodes, params
-
-    def _get_client(self):
-        """
-        Creates the Redis client RedisCluster or Redis depending on configuration
-        """
-        startup_nodes, kwargs = self._parse_connection_string()
-        if self._redis_type == RedisType.redis_cluster:
-            kwargs["startup_nodes"] = startup_nodes
-            return RedisCluster(**kwargs)
-        else:
-            kwargs["host"] = startup_nodes[0]["host"]
-            kwargs["port"] = startup_nodes[0]["port"]
-            return Redis(**kwargs)
-
-    def get_historical_features(
-        self,
-        config: RepoConfig,
-        feature_views: List[FeatureView],
-        feature_refs: List[str],
-        entity_df: Union[pd.DataFrame, str],
-        registry: Registry,
-        project: str,
-    ) -> RetrievalJob:
-        return self.offline_store.get_historical_features(
-            config=config,
-            feature_views=feature_views,
-            feature_refs=feature_refs,
-            entity_df=entity_df,
-            registry=registry,
-            project=project,
-        )
-
-
-def _redis_key(project: str, entity_key: EntityKeyProto):
-    redis_key = RedisKeyProto(
-        project=project,
-        entity_names=entity_key.join_keys,
-        entity_values=entity_key.entity_values,
-    )
-    return redis_key.SerializeToString()
-
-
-def _mmh3(key: str):
-    """
-    Calculate murmur3_32 hash which is equal to scala version which is using little endian:
-        https://stackoverflow.com/questions/29932956/murmur3-hash-different-result-between-python-and-java-implementation
-        https://stackoverflow.com/questions/13141787/convert-decimal-int-to-little-endian-string-x-x
-    """
-    key_hash = mmh3.hash(key, signed=False)
-    return bytes.fromhex(struct.pack("<Q", key_hash).hex()[0:8])
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ def load_repo_config(repo_path: Path) -> RepoConfig:
     with open(config_path) as f:
         raw_config = yaml.safe_load(f)
     try:
-        return RepoConfig(**raw_config)
+        c = RepoConfig(**raw_config)
+        c.repo_path = repo_path
+        return c
     except ValidationError as e:
         raise FeastConfigError(e, config_path)
diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py
index 0da7b57b6a..576a0b7f35 100644
--- a/sdk/python/feast/type_map.py
+++ b/sdk/python/feast/type_map.py
@@ -212,7 +212,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
             float_list_val=FloatList(
                 val=[
                     item
-                    if type(item) in [np.float32, np.float64]
+                    if type(item) in [np.float32, np.float64, float]
                     else _type_err(item, np.float32)
                     for item in value
                 ]
@@ -224,7 +224,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
             double_list_val=DoubleList(
                 val=[
                     item
-                    if type(item) in [np.float64, np.float32]
+                    if type(item) in [np.float64, np.float32, float]
                     else _type_err(item, np.float64)
                     for item in value
                 ]
diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py
index b313ad9cc7..8b7e5f4d36 100644
--- a/sdk/python/tests/foo_provider.py
+++ b/sdk/python/tests/foo_provider.py
@@ -34,7 +34,7 @@ def teardown_infra(
 
     def online_write_batch(
         self,
-        project: str,
+        config: RepoConfig,
         table: Union[FeatureTable, 
FeatureView], data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] @@ -67,7 +67,7 @@ def get_historical_features( def online_read( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: List[str] = None, diff --git a/sdk/python/tests/online_read_write_test.py b/sdk/python/tests/online_read_write_test.py index 2ef8c389ef..99339534f4 100644 --- a/sdk/python/tests/online_read_write_test.py +++ b/sdk/python/tests/online_read_write_test.py @@ -23,7 +23,7 @@ def _driver_rw_test(event_ts, created_ts, write, expect_read): write_lat, write_lon = write expect_lat, expect_lon = expect_read provider.online_write_batch( - project=store.project, + config=store.config, table=table, data=[ ( @@ -40,7 +40,7 @@ def _driver_rw_test(event_ts, created_ts, write, expect_read): ) read_rows = provider.online_read( - project=store.project, table=table, entity_keys=[entity_key] + config=store.config, table=table, entity_keys=[entity_key] ) assert len(read_rows) == 1 _, val = read_rows[0] diff --git a/sdk/python/tests/test_cli_redis.py b/sdk/python/tests/test_cli_redis.py index ed33046155..e948bffd25 100644 --- a/sdk/python/tests/test_cli_redis.py +++ b/sdk/python/tests/test_cli_redis.py @@ -29,11 +29,11 @@ def test_basic() -> None: f""" project: {project_id} registry: {data_path / "registry.db"} - provider: redis + provider: local offline_store: type: bigquery online_store: - redis_type: redis + type: redis connection_string: localhost:6379,db=0 """ ) diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py index b6d2e399e0..02943fd2eb 100644 --- a/sdk/python/tests/test_offline_online_store_consistency.py +++ b/sdk/python/tests/test_offline_online_store_consistency.py @@ -173,9 +173,11 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]: config = RepoConfig( registry=str(Path(repo_dir_name) / "registry.db"), project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}", - provider="redis", + provider="local", online_store=RedisOnlineStoreConfig( - redis_type=RedisType.redis, connection_string="localhost:6379,db=0", + type="redis", + redis_type=RedisType.redis, + connection_string="localhost:6379,db=0", ), ) fs = FeatureStore(config=config) diff --git a/sdk/python/tests/test_online_retrieval.py b/sdk/python/tests/test_online_retrieval.py index e68316f15b..b29b0c946f 100644 --- a/sdk/python/tests/test_online_retrieval.py +++ b/sdk/python/tests/test_online_retrieval.py @@ -34,7 +34,7 @@ def test_online() -> None: join_keys=["driver"], entity_values=[ValueProto(int64_val=1)] ) provider.online_write_batch( - project=store.project, + config=store.config, table=driver_locations_fv, data=[ ( @@ -54,7 +54,7 @@ def test_online() -> None: join_keys=["customer"], entity_values=[ValueProto(int64_val=5)] ) provider.online_write_batch( - project=store.project, + config=store.config, table=customer_profile_fv, data=[ ( @@ -76,7 +76,7 @@ def test_online() -> None: entity_values=[ValueProto(int64_val=5), ValueProto(int64_val=1)], ) provider.online_write_batch( - project=store.project, + config=store.config, table=customer_driver_combined_fv, data=[ (