From ee463e3b08d657a9ec97015559d27332c114828a Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 7 Jun 2021 18:48:42 -0700 Subject: [PATCH 01/13] Introduce an OnlineStore interface Signed-off-by: Achal Shah --- sdk/python/feast/errors.py | 7 + sdk/python/feast/feature_store.py | 4 +- sdk/python/feast/infra/gcp.py | 94 +-------- sdk/python/feast/infra/local.py | 78 +------- .../feast/infra/online_stores/__init__.py | 0 .../feast/infra/online_stores/datastore.py | 178 ++++++++++++++++++ .../feast/infra/online_stores/helpers.py | 46 +++++ .../feast/infra/online_stores/online_store.py | 51 +++++ .../feast/infra/online_stores/sqlite.py | 146 ++++++++++++++ sdk/python/feast/infra/provider.py | 6 +- sdk/python/tests/online_read_write_test.py | 4 +- sdk/python/tests/test_online_retrieval.py | 6 +- 12 files changed, 452 insertions(+), 168 deletions(-) create mode 100644 sdk/python/feast/infra/online_stores/__init__.py create mode 100644 sdk/python/feast/infra/online_stores/datastore.py create mode 100644 sdk/python/feast/infra/online_stores/helpers.py create mode 100644 sdk/python/feast/infra/online_stores/online_store.py create mode 100644 sdk/python/feast/infra/online_stores/sqlite.py diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index e57b084b81..80ae5053bf 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -69,6 +69,13 @@ def __init__(self, offline_store_name: str, data_source_name: str): ) +class FeastOnlineStoreUnsupportedDataSource(Exception): + def __init__(self, online_store_name: str, data_source_name: str): + super().__init__( + f"Online Store '{online_store_name}' does not support data source '{data_source_name}'" + ) + + class FeastEntityDFMissingColumnsError(Exception): def __init__(self, expected, missing): super().__init__( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 5e5a1f5ec1..eaaab5869a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -541,10 +541,10 @@ def get_online_features( table, union_of_entity_keys, entity_name_to_join_key_map ) read_rows = provider.online_read( - project=self.project, + config=self.config, table=table, entity_keys=entity_keys, - requested_features=requested_features, + requested_features=requested_features ) for row_idx, read_row in enumerate(read_rows): row_ts, feature_data = read_row diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index dce073e1d3..15c3a4d458 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -13,6 +13,7 @@ from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.offline_stores.helpers import get_offline_store_from_config +from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.provider import ( Provider, RetrievalJob, @@ -47,6 +48,7 @@ def __init__(self, config: RepoConfig): assert config.offline_store is not None self.offline_store = get_offline_store_from_config(config.offline_store) + self.online_store = get_online_store_from_config(config.online_store) def _initialize_client(self): try: @@ -108,46 +110,24 @@ def teardown_infra( def online_write_batch( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], ) -> None: - client = self._initialize_client() - - pool = ThreadPool(processes=self._write_concurrency) - pool.map( - lambda b: _write_minibatch(client, project, table, b, progress), - _to_minibatches(data, batch_size=self._write_batch_size), - ) + self.online_store.online_write_batch(config, table, data, progress) def online_read( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - client = self._initialize_client() + result = self.online_store.online_read(config, table, entity_keys) - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - for entity_key in entity_keys: - document_id = compute_datastore_entity_id(entity_key) - key = client.key( - "Project", project, "Table", table.name, "Row", document_id - ) - value = client.get(key) - if value is not None: - res = {} - for feature_name, value_bin in value["values"].items(): - val = ValueProto() - val.ParseFromString(value_bin) - res[feature_name] = val - result.append((value["event_ts"], res)) - else: - result.append((None, None)) return result def materialize_single_feature_view( @@ -216,58 +196,6 @@ def get_historical_features( ] -def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]: - """ - Split data into minibatches, making sure we stay under GCP datastore transaction size - limits. - """ - iterable = iter(data) - - while True: - batch = list(itertools.islice(iterable, batch_size)) - if len(batch) > 0: - yield batch - else: - break - - -def _write_minibatch( - client, - project: str, - table: Union[FeatureTable, FeatureView], - data: Sequence[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], -): - entities = [] - for entity_key, features, timestamp, created_ts in data: - document_id = compute_datastore_entity_id(entity_key) - - key = client.key("Project", project, "Table", table.name, "Row", document_id,) - - entity = datastore.Entity( - key=key, exclude_from_indexes=("created_ts", "event_ts", "values") - ) - - entity.update( - dict( - key=entity_key.SerializeToString(), - values={k: v.SerializeToString() for k, v in features.items()}, - event_ts=utils.make_tzaware(timestamp), - created_ts=( - utils.make_tzaware(created_ts) if created_ts is not None else None - ), - ) - ) - entities.append(entity) - with client.transaction(): - client.put_multi(entities) - - if progress: - progress(len(entities)) - - def _delete_all_values(client, key) -> None: """ Delete all data under the key path in datastore. @@ -280,13 +208,3 @@ def _delete_all_values(client, key) -> None: for entity in entities: client.delete(entity.key) - - -def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str: - """ - Compute Datastore Entity id given Feast Entity Key. - - Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to - do with the Entity concept we have in Feast. - """ - return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 3827e6aea6..ea2dce4f54 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -11,8 +11,8 @@ from feast import FeatureTable from feast.entity import Entity from feast.feature_view import FeatureView -from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.offline_stores.helpers import get_offline_store_from_config +from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.provider import ( Provider, RetrievalJob, @@ -31,6 +31,7 @@ class LocalProvider(Provider): def __init__(self, config: RepoConfig, repo_path: Path): assert config is not None + self.config = config assert isinstance(config.online_store, SqliteOnlineStoreConfig) assert config.offline_store is not None local_path = Path(config.online_store.path) @@ -39,6 +40,7 @@ def __init__(self, config: RepoConfig, repo_path: Path): else: self._db_path = repo_path.joinpath(local_path) self.offline_store = get_offline_store_from_config(config.offline_store) + self.online_store = get_online_store_from_config(config.online_store) def _get_conn(self): Path(self._db_path).parent.mkdir(exist_ok=True) @@ -77,88 +79,24 @@ def teardown_infra( def online_write_batch( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], ) -> None: - conn = self._get_conn() - - with conn: - for entity_key, values, timestamp, created_ts in data: - entity_key_bin = serialize_entity_key(entity_key) - timestamp = _to_naive_utc(timestamp) - if created_ts is not None: - created_ts = _to_naive_utc(created_ts) - - for feature_name, val in values.items(): - conn.execute( - f""" - UPDATE {_table_id(project, table)} - SET value = ?, event_ts = ?, created_ts = ? - WHERE (entity_key = ? AND feature_name = ?) - """, - ( - # SET - val.SerializeToString(), - timestamp, - created_ts, - # WHERE - entity_key_bin, - feature_name, - ), - ) - - conn.execute( - f"""INSERT OR IGNORE INTO {_table_id(project, table)} - (entity_key, feature_name, value, event_ts, created_ts) - VALUES (?, ?, ?, ?, ?)""", - ( - entity_key_bin, - feature_name, - val.SerializeToString(), - timestamp, - created_ts, - ), - ) - if progress: - progress(1) + self.online_store.online_write_batch(config, table, data, progress) def online_read( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + result = self.online_store.online_read(config, table, entity_keys) - conn = self._get_conn() - cur = conn.cursor() - - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - - for entity_key in entity_keys: - entity_key_bin = serialize_entity_key(entity_key) - - cur.execute( - f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = ?", - (entity_key_bin,), - ) - - res = {} - res_ts = None - for feature_name, val_bin, ts in cur.fetchall(): - val = ValueProto() - val.ParseFromString(val_bin) - res[feature_name] = val - res_ts = ts - - if not res: - result.append((None, None)) - else: - result.append((res_ts, res)) return result def materialize_single_feature_view( @@ -199,7 +137,7 @@ def materialize_single_feature_view( with tqdm_builder(len(rows_to_write)) as pbar: self.online_write_batch( - project, feature_view, rows_to_write, lambda x: pbar.update(x) + self.config, feature_view, rows_to_write, lambda x: pbar.update(x) ) def get_historical_features( diff --git a/sdk/python/feast/infra/online_stores/__init__.py b/sdk/python/feast/infra/online_stores/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py new file mode 100644 index 0000000000..4d4ff57809 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -0,0 +1,178 @@ +# Copyright 2021 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import itertools +from datetime import datetime +from multiprocessing.pool import ThreadPool +from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union + +import mmh3 + +from feast import FeatureTable +from feast.feature_view import FeatureView +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RepoConfig + +try: + from google.auth.exceptions import DefaultCredentialsError + from google.cloud import datastore +except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError, FeastProviderLoginError + + raise FeastExtrasDependencyImportError("gcp", str(e)) + + +ProtoBatch = Sequence[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] +] + + +class DatastoreOnlineStore(OnlineStore): + """ + OnlineStore is an object used for all interaction between Feast and the service used for offline storage of + features. + """ + + @classmethod + def _initialize_client(cls, config: RepoConfig): + try: + return datastore.Client( + project=config.online_store.project_id, + namespace=config.online_store.namespace, + ) + except DefaultCredentialsError as e: + raise FeastProviderLoginError( + str(e) + + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' + "local Google Cloud account " + ) + + @classmethod + def online_write_batch( + cls, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + client = cls._initialize_client(config) + + write_concurrency = config.online_store.write_concurrency + write_batch_size = config.online_store.write_batch_size + feast_project = config.project + + pool = ThreadPool(processes=write_concurrency) + pool.map( + lambda b: cls._write_minibatch(client, feast_project, table, b, progress), + cls._to_minibatches(data, batch_size=write_batch_size), + ) + + @staticmethod + def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]: + """ + Split data into minibatches, making sure we stay under GCP datastore transaction size + limits. + """ + iterable = iter(data) + + while True: + batch = list(itertools.islice(iterable, batch_size)) + if len(batch) > 0: + yield batch + else: + break + + @staticmethod + def _write_minibatch( + client, + project: str, + table: Union[FeatureTable, FeatureView], + data: Sequence[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ): + entities = [] + for entity_key, features, timestamp, created_ts in data: + document_id = compute_datastore_entity_id(entity_key) + + key = client.key( + "Project", project, "Table", table.name, "Row", document_id, + ) + + entity = datastore.Entity( + key=key, exclude_from_indexes=("created_ts", "event_ts", "values") + ) + + entity.update( + dict( + key=entity_key.SerializeToString(), + values={k: v.SerializeToString() for k, v in features.items()}, + event_ts=utils.make_tzaware(timestamp), + created_ts=( + utils.make_tzaware(created_ts) + if created_ts is not None + else None + ), + ) + ) + entities.append(entity) + with client.transaction(): + client.put_multi(entities) + + if progress: + progress(len(entities)) + + @classmethod + def online_read( + cls, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + entity_keys: List[EntityKeyProto], + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + client = cls._initialize_client(config) + + feast_project = config.project + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for entity_key in entity_keys: + document_id = compute_datastore_entity_id(entity_key) + key = client.key( + "Project", feast_project, "Table", table.name, "Row", document_id + ) + value = client.get(key) + if value is not None: + res = {} + for feature_name, value_bin in value["values"].items(): + val = ValueProto() + val.ParseFromString(value_bin) + res[feature_name] = val + result.append((value["event_ts"], res)) + else: + result.append((None, None)) + return result + + +def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str: + """ + Compute Datastore Entity id given Feast Entity Key. + + Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to + do with the Entity concept we have in Feast. + """ + return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py new file mode 100644 index 0000000000..4c37622ed1 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -0,0 +1,46 @@ +from feast.data_source import BigQuerySource, DataSource, FileSource +from feast.errors import FeastOnlineStoreUnsupportedDataSource +from feast.infra.online_stores.datastore import DatastoreOnlineStore +from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.online_stores.sqlite import SqliteOnlineStore +from feast.repo_config import ( + DatastoreOnlineStoreConfig, + OnlineStoreConfig, + SqliteOnlineStoreConfig, +) + + +def get_online_store_from_config( + online_store_config: OnlineStoreConfig, +) -> OnlineStore: + """Get the offline store from offline store config""" + + if isinstance(online_store_config, SqliteOnlineStoreConfig): + from feast.infra.offline_stores.file import FileOfflineStore + + return SqliteOnlineStore() + elif isinstance(online_store_config, DatastoreOnlineStoreConfig): + from feast.infra.offline_stores.bigquery import BigQueryOfflineStore + + return DatastoreOnlineStore() + + raise ValueError(f"Unsupported offline store config '{online_store_config}'") + + +SUPPORTED_DATA_SOURCES_FOR_ONLINE_STORE = { + SqliteOnlineStoreConfig: {FileSource}, + DatastoreOnlineStoreConfig: {BigQuerySource}, +} + + +def assert_online_store_supports_data_source( + online_store_config: OnlineStoreConfig, data_source: DataSource +): + if type(data_source) in SUPPORTED_DATA_SOURCES_FOR_ONLINE_STORE.get( + type(online_store_config), set() + ): + return + + raise FeastOnlineStoreUnsupportedDataSource( + online_store_config.type, data_source.__class__.__name__ + ) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py new file mode 100644 index 0000000000..3ef0ef8f85 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -0,0 +1,51 @@ +# Copyright 2021 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +from feast import FeatureTable +from feast.feature_view import FeatureView +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RepoConfig + + +class OnlineStore(ABC): + """ + OnlineStore is an object used for all interaction between Feast and the service used for online storage of + features. + """ + + @staticmethod + @abstractmethod + def online_write_batch( + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + pass + + @staticmethod + @abstractmethod + def online_read( + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + entity_keys: List[EntityKeyProto], + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + pass diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py new file mode 100644 index 0000000000..d2150d09a7 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -0,0 +1,146 @@ +# Copyright 2021 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import sqlite3 +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +import pytz + +from feast import FeatureTable +from feast.feature_view import FeatureView +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RepoConfig + + +class SqliteOnlineStore(OnlineStore): + """ + OnlineStore is an object used for all interaction between Feast and the service used for offline storage of + features. + """ + + @staticmethod + def _get_conn(config: RepoConfig): + assert config.online_store.type == "sqlite" + + Path(config.online_store.path).parent.mkdir(exist_ok=True) + return sqlite3.connect( + config.online_store.path, + detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + ) + + @classmethod + def online_write_batch( + cls, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + conn = cls._get_conn(config) + + project = config.project + + with conn: + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key(entity_key) + timestamp = _to_naive_utc(timestamp) + if created_ts is not None: + created_ts = _to_naive_utc(created_ts) + + for feature_name, val in values.items(): + conn.execute( + f""" + UPDATE {_table_id(project, table)} + SET value = ?, event_ts = ?, created_ts = ? + WHERE (entity_key = ? AND feature_name = ?) + """, + ( + # SET + val.SerializeToString(), + timestamp, + created_ts, + # WHERE + entity_key_bin, + feature_name, + ), + ) + + conn.execute( + f"""INSERT OR IGNORE INTO {_table_id(project, table)} + (entity_key, feature_name, value, event_ts, created_ts) + VALUES (?, ?, ?, ?, ?)""", + ( + entity_key_bin, + feature_name, + val.SerializeToString(), + timestamp, + created_ts, + ), + ) + if progress: + progress(1) + + @classmethod + def online_read( + cls, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + entity_keys: List[EntityKeyProto], + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + pass + conn = cls._get_conn(config) + cur = conn.cursor() + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + + project = config.project + for entity_key in entity_keys: + entity_key_bin = serialize_entity_key(entity_key) + + cur.execute( + f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = ?", + (entity_key_bin,), + ) + + res = {} + res_ts = None + for feature_name, val_bin, ts in cur.fetchall(): + val = ValueProto() + val.ParseFromString(val_bin) + res[feature_name] = val + res_ts = ts + + if not res: + result.append((None, None)) + else: + result.append((res_ts, res)) + return result + + +def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str: + return f"{project}_{table.name}" + + +def _to_naive_utc(ts: datetime): + if ts.tzinfo is None: + return ts + else: + return ts.astimezone(pytz.utc).replace(tzinfo=None) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 6a378d0759..0ea9bcb2e5 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -71,7 +71,7 @@ def teardown_infra( @abc.abstractmethod def online_write_batch( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] @@ -85,7 +85,7 @@ def online_write_batch( If a tz-naive timestamp is passed to this method, it is assumed to be UTC. Args: - project: Feast project name + config: The RepoConfig for the current FeatureStore. table: Feast FeatureTable data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key, a dict containing feature values, an event timestamp for the row, and @@ -122,7 +122,7 @@ def get_historical_features( @abc.abstractmethod def online_read( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: List[str] = None, diff --git a/sdk/python/tests/online_read_write_test.py b/sdk/python/tests/online_read_write_test.py index 2ef8c389ef..99339534f4 100644 --- a/sdk/python/tests/online_read_write_test.py +++ b/sdk/python/tests/online_read_write_test.py @@ -23,7 +23,7 @@ def _driver_rw_test(event_ts, created_ts, write, expect_read): write_lat, write_lon = write expect_lat, expect_lon = expect_read provider.online_write_batch( - project=store.project, + config=store.config, table=table, data=[ ( @@ -40,7 +40,7 @@ def _driver_rw_test(event_ts, created_ts, write, expect_read): ) read_rows = provider.online_read( - project=store.project, table=table, entity_keys=[entity_key] + config=store.config, table=table, entity_keys=[entity_key] ) assert len(read_rows) == 1 _, val = read_rows[0] diff --git a/sdk/python/tests/test_online_retrieval.py b/sdk/python/tests/test_online_retrieval.py index e68316f15b..b29b0c946f 100644 --- a/sdk/python/tests/test_online_retrieval.py +++ b/sdk/python/tests/test_online_retrieval.py @@ -34,7 +34,7 @@ def test_online() -> None: join_keys=["driver"], entity_values=[ValueProto(int64_val=1)] ) provider.online_write_batch( - project=store.project, + config=store.config, table=driver_locations_fv, data=[ ( @@ -54,7 +54,7 @@ def test_online() -> None: join_keys=["customer"], entity_values=[ValueProto(int64_val=5)] ) provider.online_write_batch( - project=store.project, + config=store.config, table=customer_profile_fv, data=[ ( @@ -76,7 +76,7 @@ def test_online() -> None: entity_values=[ValueProto(int64_val=5), ValueProto(int64_val=1)], ) provider.online_write_batch( - project=store.project, + config=store.config, table=customer_driver_combined_fv, data=[ ( From 136c1dc8653715edd43efbe88e293689cb77534b Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 8 Jun 2021 11:35:49 -0700 Subject: [PATCH 02/13] format and lint Signed-off-by: Achal Shah --- sdk/python/feast/infra/gcp.py | 11 ++++----- .../feast/infra/online_stores/datastore.py | 24 ++++++++++++------- .../feast/infra/online_stores/helpers.py | 20 +++++++--------- .../feast/infra/online_stores/sqlite.py | 1 - sdk/python/tests/foo_provider.py | 4 ++-- 5 files changed, 29 insertions(+), 31 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 15c3a4d458..30516292d3 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -1,17 +1,13 @@ -import itertools from datetime import datetime -from multiprocessing.pool import ThreadPool -from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union -import mmh3 import pandas from tqdm import tqdm -from feast import FeatureTable, utils +from feast import FeatureTable from feast.entity import Entity from feast.errors import FeastProviderLoginError from feast.feature_view import FeatureView -from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.offline_stores.helpers import get_offline_store_from_config from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.provider import ( @@ -47,6 +43,7 @@ def __init__(self, config: RepoConfig): self._write_batch_size = config.online_store.write_batch_size assert config.offline_store is not None + self.repo_config = config self.offline_store = get_offline_store_from_config(config.offline_store) self.online_store = get_online_store_from_config(config.online_store) @@ -168,7 +165,7 @@ def materialize_single_feature_view( with tqdm_builder(len(rows_to_write)) as pbar: self.online_write_batch( - project, feature_view, rows_to_write, lambda x: pbar.update(x) + self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x) ) def get_historical_features( diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 4d4ff57809..4a6c9c77b8 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -18,13 +18,13 @@ import mmh3 -from feast import FeatureTable +from feast import FeatureTable, utils from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.repo_config import RepoConfig +from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig try: from google.auth.exceptions import DefaultCredentialsError @@ -47,11 +47,11 @@ class DatastoreOnlineStore(OnlineStore): """ @classmethod - def _initialize_client(cls, config: RepoConfig): + def _initialize_client(cls, online_config: DatastoreOnlineStoreConfig): + try: return datastore.Client( - project=config.online_store.project_id, - namespace=config.online_store.namespace, + project=online_config.project_id, namespace=online_config.namespace, ) except DefaultCredentialsError as e: raise FeastProviderLoginError( @@ -70,10 +70,13 @@ def online_write_batch( ], progress: Optional[Callable[[int], Any]], ) -> None: - client = cls._initialize_client(config) - write_concurrency = config.online_store.write_concurrency - write_batch_size = config.online_store.write_batch_size + online_config = config.online_store + assert isinstance(online_config, DatastoreOnlineStoreConfig) + client = cls._initialize_client(online_config) + + write_concurrency = online_config.write_concurrency + write_batch_size = online_config.write_batch_size feast_project = config.project pool = ThreadPool(processes=write_concurrency) @@ -145,7 +148,10 @@ def online_read( table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - client = cls._initialize_client(config) + + online_config = config.online_store + assert isinstance(online_config, DatastoreOnlineStoreConfig) + client = cls._initialize_client(online_config) feast_project = config.project diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py index 4c37622ed1..cf486d760f 100644 --- a/sdk/python/feast/infra/online_stores/helpers.py +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -1,8 +1,6 @@ from feast.data_source import BigQuerySource, DataSource, FileSource from feast.errors import FeastOnlineStoreUnsupportedDataSource -from feast.infra.online_stores.datastore import DatastoreOnlineStore from feast.infra.online_stores.online_store import OnlineStore -from feast.infra.online_stores.sqlite import SqliteOnlineStore from feast.repo_config import ( DatastoreOnlineStoreConfig, OnlineStoreConfig, @@ -16,28 +14,26 @@ def get_online_store_from_config( """Get the offline store from offline store config""" if isinstance(online_store_config, SqliteOnlineStoreConfig): - from feast.infra.offline_stores.file import FileOfflineStore + from feast.infra.online_stores.sqlite import SqliteOnlineStore return SqliteOnlineStore() elif isinstance(online_store_config, DatastoreOnlineStoreConfig): - from feast.infra.offline_stores.bigquery import BigQueryOfflineStore + from feast.infra.online_stores.datastore import DatastoreOnlineStore return DatastoreOnlineStore() raise ValueError(f"Unsupported offline store config '{online_store_config}'") -SUPPORTED_DATA_SOURCES_FOR_ONLINE_STORE = { - SqliteOnlineStoreConfig: {FileSource}, - DatastoreOnlineStoreConfig: {BigQuerySource}, -} - - def assert_online_store_supports_data_source( online_store_config: OnlineStoreConfig, data_source: DataSource ): - if type(data_source) in SUPPORTED_DATA_SOURCES_FOR_ONLINE_STORE.get( - type(online_store_config), set() + if ( + isinstance(online_store_config, SqliteOnlineStoreConfig) + and isinstance(data_source, FileSource) + ) or ( + isinstance(online_store_config, DatastoreOnlineStoreConfig) + and isinstance(data_source, BigQuerySource) ): return diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index d2150d09a7..2ef5a4ed66 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os import sqlite3 from datetime import datetime from pathlib import Path diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index b313ad9cc7..8b7e5f4d36 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -34,7 +34,7 @@ def teardown_infra( def online_write_batch( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] @@ -67,7 +67,7 @@ def get_historical_features( def online_read( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: List[str] = None, From 6d591cfaa955fee4f9e8cabb9face33a2ff2b296 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 09:21:01 -0700 Subject: [PATCH 03/13] refactor redis into its own class too Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 2 +- .../feast/infra/online_stores/helpers.py | 55 +++- sdk/python/feast/infra/online_stores/redis.py | 140 +++++++++ .../feast/infra/online_stores/sqlite.py | 10 +- sdk/python/feast/infra/provider.py | 4 - sdk/python/feast/infra/redis.py | 281 ------------------ sdk/python/feast/repo_config.py | 6 +- sdk/python/tests/test_cli_redis.py | 2 +- .../test_offline_online_store_consistency.py | 2 +- 9 files changed, 201 insertions(+), 301 deletions(-) create mode 100644 sdk/python/feast/infra/online_stores/redis.py delete mode 100644 sdk/python/feast/infra/redis.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index eaaab5869a..8503632d5c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -544,7 +544,7 @@ def get_online_features( config=self.config, table=table, entity_keys=entity_keys, - requested_features=requested_features + requested_features=requested_features, ) for row_idx, read_row in enumerate(read_rows): row_ts, feature_data = read_row diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py index cf486d760f..564ff5b049 100644 --- a/sdk/python/feast/infra/online_stores/helpers.py +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -1,9 +1,17 @@ +import struct +from typing import Dict, Set + +import mmh3 + from feast.data_source import BigQuerySource, DataSource, FileSource from feast.errors import FeastOnlineStoreUnsupportedDataSource from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.repo_config import ( DatastoreOnlineStoreConfig, OnlineStoreConfig, + RedisOnlineStoreConfig, SqliteOnlineStoreConfig, ) @@ -21,22 +29,51 @@ def get_online_store_from_config( from feast.infra.online_stores.datastore import DatastoreOnlineStore return DatastoreOnlineStore() + elif isinstance(online_store_config, RedisOnlineStoreConfig): + from feast.infra.online_stores.redis import RedisOnlineStore + return RedisOnlineStore() raise ValueError(f"Unsupported offline store config '{online_store_config}'") +SUPPORTED_SOURCES = { + SqliteOnlineStoreConfig: {FileSource}, + DatastoreOnlineStoreConfig: {BigQuerySource}, + RedisOnlineStoreConfig: {FileSource, BigQuerySource}, +} + + def assert_online_store_supports_data_source( online_store_config: OnlineStoreConfig, data_source: DataSource ): - if ( - isinstance(online_store_config, SqliteOnlineStoreConfig) - and isinstance(data_source, FileSource) - ) or ( - isinstance(online_store_config, DatastoreOnlineStoreConfig) - and isinstance(data_source, BigQuerySource) - ): - return - + supported_sources = SUPPORTED_SOURCES.get(online_store_config.__class__, {}) + # This is needed because checking for `in` with Union types breaks mypy. + # https://github.com/python/mypy/issues/4954 + # We can replace this with `data_source.__class__ in SUPPORTED_SOURCES[online_store_config.__class__]` + # Once ^ is resolved. + if supported_sources: + for source in supported_sources: + if source == data_source.__class__: + return raise FeastOnlineStoreUnsupportedDataSource( online_store_config.type, data_source.__class__.__name__ ) + + +def _redis_key(project: str, entity_key: EntityKeyProto): + redis_key = RedisKeyProto( + project=project, + entity_names=entity_key.join_keys, + entity_values=entity_key.entity_values, + ) + return redis_key.SerializeToString() + + +def _mmh3(key: str): + """ + Calculate murmur3_32 hash which is equal to scala version which is using little endian: + https://stackoverflow.com/questions/29932956/murmur3-hash-different-result-between-python-and-java-implementation + https://stackoverflow.com/questions/13141787/convert-decimal-int-to-little-endian-string-x-x + """ + key_hash = mmh3.hash(key, signed=False) + return bytes.fromhex(struct.pack(" None: + pass + + @classmethod + def online_read( + cls, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + online_store_config = config.online_store + assert isinstance(online_store_config, RedisOnlineStoreConfig) + + client = cls._get_client(online_store_config) + feature_view = table.name + project = config.project + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + + if not entity_keys: + entity_keys = [f.name for f in table.features] + + for entity_key in entity_keys: + redis_key_bin = _redis_key(project, entity_key) + hset_keys = [_mmh3(f"{feature_view}:{k}") for k in entity_keys] + ts_key = f"_ts:{feature_view}" + hset_keys.append(ts_key) + values = client.hmget(redis_key_bin, hset_keys) + entity_keys.append(ts_key) + res_val = dict(zip(entity_keys, values)) + + res_ts = Timestamp() + ts_val = res_val.pop(ts_key) + if ts_val: + res_ts.ParseFromString(ts_val) + + res = {} + for feature_name, val_bin in res_val.items(): + val = ValueProto() + if val_bin: + val.ParseFromString(val_bin) + res[feature_name] = val + + if not res: + result.append((None, None)) + else: + timestamp = datetime.fromtimestamp(res_ts.seconds) + result.append((timestamp, res)) + return result diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 2ef5a4ed66..f7393a420b 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -37,10 +37,14 @@ class SqliteOnlineStore(OnlineStore): def _get_conn(config: RepoConfig): assert config.online_store.type == "sqlite" - Path(config.online_store.path).parent.mkdir(exist_ok=True) + if config.repo_path: + db_path = config.repo_path / config.online_store.path + else: + db_path = config.online_store.path + + Path(db_path).parent.mkdir(exist_ok=True) return sqlite3.connect( - config.online_store.path, - detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, ) @classmethod diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 0ea9bcb2e5..be51208fb3 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -145,10 +145,6 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: from feast.infra.gcp import GcpProvider return GcpProvider(config) - elif config.provider == "redis": - from feast.infra.redis import RedisProvider - - return RedisProvider(config) elif config.provider == "local": from feast.infra.local import LocalProvider diff --git a/sdk/python/feast/infra/redis.py b/sdk/python/feast/infra/redis.py deleted file mode 100644 index f4200918b3..0000000000 --- a/sdk/python/feast/infra/redis.py +++ /dev/null @@ -1,281 +0,0 @@ -import json -import struct -from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union - -import mmh3 -import pandas as pd -from google.protobuf.timestamp_pb2 import Timestamp - -try: - from redis import Redis - from rediscluster import RedisCluster -except ImportError as e: - from feast.errors import FeastExtrasDependencyImportError - - raise FeastExtrasDependencyImportError("redis", str(e)) - -from tqdm import tqdm - -from feast import FeatureTable, utils -from feast.entity import Entity -from feast.feature_view import FeatureView -from feast.infra.offline_stores.helpers import get_offline_store_from_config -from feast.infra.provider import ( - Provider, - RetrievalJob, - _convert_arrow_to_proto, - _get_column_names, - _run_field_mapping, -) -from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.registry import Registry -from feast.repo_config import RedisOnlineStoreConfig, RedisType, RepoConfig - -EX_SECONDS = 253402300799 - - -class RedisProvider(Provider): - _redis_type: Optional[RedisType] - _connection_string: str - - def __init__(self, config: RepoConfig): - assert isinstance(config.online_store, RedisOnlineStoreConfig) - if config.online_store.redis_type: - self._redis_type = config.online_store.redis_type - if config.online_store.connection_string: - self._connection_string = config.online_store.connection_string - self.offline_store = get_offline_store_from_config(config.offline_store) - - def update_infra( - self, - project: str, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, - ): - pass - - def teardown_infra( - self, - project: str, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity], - ) -> None: - # according to the repos_operations.py we can delete the whole project - client = self._get_client() - - tables_join_keys = [[e for e in t.entities] for t in tables] - for table_join_keys in tables_join_keys: - redis_key_bin = _redis_key( - project, EntityKeyProto(join_keys=table_join_keys) - ) - keys = [k for k in client.scan_iter(match=f"{redis_key_bin}*", count=100)] - if keys: - client.unlink(*keys) - - def online_write_batch( - self, - project: str, - table: Union[FeatureTable, FeatureView], - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], - ) -> None: - client = self._get_client() - - entity_hset = {} - feature_view = table.name - - ex = Timestamp() - ex.seconds = EX_SECONDS - ex_str = ex.SerializeToString() - - for entity_key, values, timestamp, created_ts in data: - redis_key_bin = _redis_key(project, entity_key) - ts = Timestamp() - ts.seconds = int(utils.make_tzaware(timestamp).timestamp()) - entity_hset[f"_ts:{feature_view}"] = ts.SerializeToString() - entity_hset[f"_ex:{feature_view}"] = ex_str - - for feature_name, val in values.items(): - f_key = _mmh3(f"{feature_view}:{feature_name}") - entity_hset[f_key] = val.SerializeToString() - - client.hset(redis_key_bin, mapping=entity_hset) - if progress: - progress(1) - - def online_read( - self, - project: str, - table: Union[FeatureTable, FeatureView], - entity_keys: List[EntityKeyProto], - requested_features: List[str] = None, - ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - - client = self._get_client() - feature_view = table.name - - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - - if not requested_features: - requested_features = [f.name for f in table.features] - - for entity_key in entity_keys: - redis_key_bin = _redis_key(project, entity_key) - hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features] - ts_key = f"_ts:{feature_view}" - hset_keys.append(ts_key) - values = client.hmget(redis_key_bin, hset_keys) - requested_features.append(ts_key) - res_val = dict(zip(requested_features, values)) - - res_ts = Timestamp() - ts_val = res_val.pop(ts_key) - if ts_val: - res_ts.ParseFromString(ts_val) - - res = {} - for feature_name, val_bin in res_val.items(): - val = ValueProto() - if val_bin: - val.ParseFromString(val_bin) - res[feature_name] = val - - if not res: - result.append((None, None)) - else: - timestamp = datetime.fromtimestamp(res_ts.seconds) - result.append((timestamp, res)) - return result - - def materialize_single_feature_view( - self, - feature_view: FeatureView, - start_date: datetime, - end_date: datetime, - registry: Registry, - project: str, - tqdm_builder: Callable[[int], tqdm], - ) -> None: - entities = [] - for entity_name in feature_view.entities: - entities.append(registry.get_entity(entity_name, project)) - - ( - join_key_columns, - feature_name_columns, - event_timestamp_column, - created_timestamp_column, - ) = _get_column_names(feature_view, entities) - - start_date = utils.make_tzaware(start_date) - end_date = utils.make_tzaware(end_date) - - table = self.offline_store.pull_latest_from_table_or_query( - data_source=feature_view.input, - join_key_columns=join_key_columns, - feature_name_columns=feature_name_columns, - event_timestamp_column=event_timestamp_column, - created_timestamp_column=created_timestamp_column, - start_date=start_date, - end_date=end_date, - ) - - if feature_view.input.field_mapping is not None: - table = _run_field_mapping(table, feature_view.input.field_mapping) - - join_keys = [entity.join_key for entity in entities] - rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) - - with tqdm_builder(len(rows_to_write)) as pbar: - self.online_write_batch( - project, feature_view, rows_to_write, lambda x: pbar.update(x) - ) - - feature_view.materialization_intervals.append((start_date, end_date)) - registry.apply_feature_view(feature_view, project) - - def _parse_connection_string(self): - """ - Reads Redis connections string using format - for RedisCluster: - redis1:6379,redis2:6379,decode_responses=true,skip_full_coverage_check=true,ssl=true,password=... - for Redis: - redis_master:6379,db=0,ssl=true,password=... - """ - connection_string = self._connection_string - startup_nodes = [ - dict(zip(["host", "port"], c.split(":"))) - for c in connection_string.split(",") - if "=" not in c - ] - params = {} - for c in connection_string.split(","): - if "=" in c: - kv = c.split("=") - try: - kv[1] = json.loads(kv[1]) - except json.JSONDecodeError: - ... - - it = iter(kv) - params.update(dict(zip(it, it))) - - return startup_nodes, params - - def _get_client(self): - """ - Creates the Redis client RedisCluster or Redis depending on configuration - """ - startup_nodes, kwargs = self._parse_connection_string() - if self._redis_type == RedisType.redis_cluster: - kwargs["startup_nodes"] = startup_nodes - return RedisCluster(**kwargs) - else: - kwargs["host"] = startup_nodes[0]["host"] - kwargs["port"] = startup_nodes[0]["port"] - return Redis(**kwargs) - - def get_historical_features( - self, - config: RepoConfig, - feature_views: List[FeatureView], - feature_refs: List[str], - entity_df: Union[pd.DataFrame, str], - registry: Registry, - project: str, - ) -> RetrievalJob: - return self.offline_store.get_historical_features( - config=config, - feature_views=feature_views, - feature_refs=feature_refs, - entity_df=entity_df, - registry=registry, - project=project, - ) - - -def _redis_key(project: str, entity_key: EntityKeyProto): - redis_key = RedisKeyProto( - project=project, - entity_names=entity_key.join_keys, - entity_values=entity_key.entity_values, - ) - return redis_key.SerializeToString() - - -def _mmh3(key: str): - """ - Calculate murmur3_32 hash which is equal to scala version which is using little endian: - https://stackoverflow.com/questions/29932956/murmur3-hash-different-result-between-python-and-java-implementation - https://stackoverflow.com/questions/13141787/convert-decimal-int-to-little-endian-string-x-x - """ - key_hash = mmh3.hash(key, signed=False) - return bytes.fromhex(struct.pack(" RepoConfig: with open(config_path) as f: raw_config = yaml.safe_load(f) try: - return RepoConfig(**raw_config) + c = RepoConfig(**raw_config) + c.repo_path = repo_path + return c except ValidationError as e: raise FeastConfigError(e, config_path) diff --git a/sdk/python/tests/test_cli_redis.py b/sdk/python/tests/test_cli_redis.py index ed33046155..b51ab6878e 100644 --- a/sdk/python/tests/test_cli_redis.py +++ b/sdk/python/tests/test_cli_redis.py @@ -29,7 +29,7 @@ def test_basic() -> None: f""" project: {project_id} registry: {data_path / "registry.db"} - provider: redis + provider: local offline_store: type: bigquery online_store: diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py index b6d2e399e0..b124b3be7e 100644 --- a/sdk/python/tests/test_offline_online_store_consistency.py +++ b/sdk/python/tests/test_offline_online_store_consistency.py @@ -173,7 +173,7 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]: config = RepoConfig( registry=str(Path(repo_dir_name) / "registry.db"), project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}", - provider="redis", + provider="local", online_store=RedisOnlineStoreConfig( redis_type=RedisType.redis, connection_string="localhost:6379,db=0", ), From 2f9e074e92c66ad7544977182327f1d6cd0c08b3 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 09:30:28 -0700 Subject: [PATCH 04/13] tests and lint Signed-off-by: Achal Shah --- sdk/python/feast/infra/online_stores/datastore.py | 1 + sdk/python/feast/infra/online_stores/helpers.py | 8 +++++--- sdk/python/feast/infra/online_stores/online_store.py | 1 + sdk/python/feast/infra/online_stores/redis.py | 12 ++++++------ sdk/python/feast/infra/online_stores/sqlite.py | 3 ++- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 4a6c9c77b8..67e444b40d 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -147,6 +147,7 @@ def online_read( config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: online_config = config.online_store diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py index 564ff5b049..391794d20e 100644 --- a/sdk/python/feast/infra/online_stores/helpers.py +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -1,5 +1,5 @@ import struct -from typing import Dict, Set +from typing import Any, Dict, Set import mmh3 @@ -36,7 +36,7 @@ def get_online_store_from_config( raise ValueError(f"Unsupported offline store config '{online_store_config}'") -SUPPORTED_SOURCES = { +SUPPORTED_SOURCES: Dict[Any, Set[Any]] = { SqliteOnlineStoreConfig: {FileSource}, DatastoreOnlineStoreConfig: {BigQuerySource}, RedisOnlineStoreConfig: {FileSource, BigQuerySource}, @@ -46,7 +46,9 @@ def get_online_store_from_config( def assert_online_store_supports_data_source( online_store_config: OnlineStoreConfig, data_source: DataSource ): - supported_sources = SUPPORTED_SOURCES.get(online_store_config.__class__, {}) + supported_sources: Set[Any] = SUPPORTED_SOURCES.get( + online_store_config.__class__, set() + ) # This is needed because checking for `in` with Union types breaks mypy. # https://github.com/python/mypy/issues/4954 # We can replace this with `data_source.__class__ in SUPPORTED_SOURCES[online_store_config.__class__]` diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 3ef0ef8f85..628f678b19 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -47,5 +47,6 @@ def online_read( config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: pass diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 793496f255..9c73fbb679 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -97,7 +97,7 @@ def online_read( config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: online_store_config = config.online_store assert isinstance(online_store_config, RedisOnlineStoreConfig) @@ -108,17 +108,17 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - if not entity_keys: - entity_keys = [f.name for f in table.features] + if not requested_features: + requested_features = [f.name for f in table.features] for entity_key in entity_keys: redis_key_bin = _redis_key(project, entity_key) - hset_keys = [_mmh3(f"{feature_view}:{k}") for k in entity_keys] + hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features] ts_key = f"_ts:{feature_view}" hset_keys.append(ts_key) values = client.hmget(redis_key_bin, hset_keys) - entity_keys.append(ts_key) - res_val = dict(zip(entity_keys, values)) + requested_features.append(ts_key) + res_val = dict(zip(requested_features, values)) res_ts = Timestamp() ts_val = res_val.pop(ts_key) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index f7393a420b..915f4209fb 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -38,7 +38,7 @@ def _get_conn(config: RepoConfig): assert config.online_store.type == "sqlite" if config.repo_path: - db_path = config.repo_path / config.online_store.path + db_path = str(config.repo_path / config.online_store.path) else: db_path = config.online_store.path @@ -107,6 +107,7 @@ def online_read( config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: pass conn = cls._get_conn(config) From 8f55084c11718b9200eeab8f16fab2a182b6f2ff Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 09:31:01 -0700 Subject: [PATCH 05/13] remove import Signed-off-by: Achal Shah --- sdk/python/feast/infra/online_stores/redis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 9c73fbb679..962e514f0c 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import json -import struct from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Tuple, Union From 56590f16158744fca6c3aaea69edf8093871f8f3 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 14:24:51 -0700 Subject: [PATCH 06/13] add docs, refactor to add a setup and teardown to the online store and invoke it from the providers Signed-off-by: Achal Shah --- sdk/python/feast/infra/gcp.py | 66 ++---------------- sdk/python/feast/infra/local.py | 36 +++------- .../feast/infra/online_stores/datastore.py | 69 ++++++++++++++++++- .../feast/infra/online_stores/online_store.py | 58 ++++++++++++++-- sdk/python/feast/infra/online_stores/redis.py | 20 +++++- .../feast/infra/online_stores/sqlite.py | 48 +++++++++++-- 6 files changed, 201 insertions(+), 96 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 30516292d3..ba130c885d 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -47,18 +47,6 @@ def __init__(self, config: RepoConfig): self.offline_store = get_offline_store_from_config(config.offline_store) self.online_store = get_online_store_from_config(config.online_store) - def _initialize_client(self): - try: - return datastore.Client( - project=self._gcp_project_id, namespace=self._namespace - ) - except DefaultCredentialsError as e: - raise FeastProviderLoginError( - str(e) - + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' - "local Google Cloud account " - ) - def update_infra( self, project: str, @@ -68,25 +56,12 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - - client = self._initialize_client() - - for table in tables_to_keep: - key = client.key("Project", project, "Table", table.name) - entity = datastore.Entity( - key=key, exclude_from_indexes=("created_ts", "event_ts", "values") - ) - entity.update({"created_ts": datetime.utcnow()}) - client.put(entity) - - for table in tables_to_delete: - _delete_all_values( - client, client.key("Project", project, "Table", table.name) - ) - - # Delete the table metadata datastore entity - key = client.key("Project", project, "Table", table.name) - client.delete(key) + self.online_store.setup(config=self.repo_config, + tables_to_delete=tables_to_delete, + tables_to_keep=tables_to_keep, + entities_to_keep=entities_to_keep, + entities_to_delete=entities_to_delete, + partial=partial) def teardown_infra( self, @@ -94,16 +69,7 @@ def teardown_infra( tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], ) -> None: - client = self._initialize_client() - - for table in tables: - _delete_all_values( - client, client.key("Project", project, "Table", table.name) - ) - - # Delete the table metadata datastore entity - key = client.key("Project", project, "Table", table.name) - client.delete(key) + self.online_store.teardown(self.repo_config, tables, entities) def online_write_batch( self, @@ -187,21 +153,3 @@ def get_historical_features( ) return job - -ProtoBatch = Sequence[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] -] - - -def _delete_all_values(client, key) -> None: - """ - Delete all data under the key path in datastore. - """ - while True: - query = client.query(kind="Row", ancestor=key) - entities = list(query.fetch(limit=1000)) - if not entities: - return - - for entity in entities: - client.delete(entity.key) diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index ea2dce4f54..1baf36db42 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -23,31 +23,18 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry -from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig +from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig, RedisOnlineStoreConfig class LocalProvider(Provider): - _db_path: Path def __init__(self, config: RepoConfig, repo_path: Path): assert config is not None self.config = config - assert isinstance(config.online_store, SqliteOnlineStoreConfig) assert config.offline_store is not None - local_path = Path(config.online_store.path) - if local_path.is_absolute(): - self._db_path = local_path - else: - self._db_path = repo_path.joinpath(local_path) self.offline_store = get_offline_store_from_config(config.offline_store) self.online_store = get_online_store_from_config(config.online_store) - def _get_conn(self): - Path(self._db_path).parent.mkdir(exist_ok=True) - return sqlite3.connect( - self._db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES - ) - def update_infra( self, project: str, @@ -57,17 +44,12 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - conn = self._get_conn() - for table in tables_to_keep: - conn.execute( - f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))" - ) - conn.execute( - f"CREATE INDEX IF NOT EXISTS {_table_id(project, table)}_ek ON {_table_id(project, table)} (entity_key);" - ) - - for table in tables_to_delete: - conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") + self.online_store.setup(self.config, + tables_to_delete, + tables_to_keep, + entities_to_delete, + entities_to_keep, + partial) def teardown_infra( self, @@ -75,7 +57,9 @@ def teardown_infra( tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], ) -> None: - os.unlink(self._db_path) + self.online_store.teardown(self.config, + tables, + entities) def online_write_batch( self, diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 67e444b40d..426a38faa2 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -18,7 +18,7 @@ import mmh3 -from feast import FeatureTable, utils +from feast import FeatureTable, utils, Entity from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore @@ -46,6 +46,59 @@ class DatastoreOnlineStore(OnlineStore): features. """ + @classmethod + def setup(cls, + config: RepoConfig, + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], partial: bool): + """ + """ + online_config = config.online_store + assert isinstance(online_config, DatastoreOnlineStoreConfig) + client = cls._initialize_client(online_config) + feast_project = config.project + + for table in tables_to_keep: + key = client.key("Project", feast_project, "Table", table.name) + entity = datastore.Entity( + key=key, exclude_from_indexes=("created_ts", "event_ts", "values") + ) + entity.update({"created_ts": datetime.utcnow()}) + client.put(entity) + + for table in tables_to_delete: + _delete_all_values( + client, client.key("Project", feast_project, "Table", table.name) + ) + + # Delete the table metadata datastore entity + key = client.key("Project", feast_project, "Table", table.name) + client.delete(key) + + @classmethod + def teardown(cls, + config: RepoConfig, + tables: Sequence[Union[FeatureTable, FeatureView]], + entities: Sequence[Entity]): + """ + There's currently no teardown done for Datastore. + """ + online_config = config.online_store + assert isinstance(online_config, DatastoreOnlineStoreConfig) + client = cls._initialize_client(online_config) + feast_project = config.project + + for table in tables: + _delete_all_values( + client, client.key("Project", feast_project, "Table", table.name) + ) + + # Delete the table metadata datastore entity + key = client.key("Project", feast_project, "Table", table.name) + client.delete(key) + @classmethod def _initialize_client(cls, online_config: DatastoreOnlineStoreConfig): @@ -183,3 +236,17 @@ def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str: do with the Entity concept we have in Feast. """ return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() + + +def _delete_all_values(client, key) -> None: + """ + Delete all data under the key path in datastore. + """ + while True: + query = client.query(kind="Row", ancestor=key) + entities = list(query.fetch(limit=1000)) + if not entities: + return + + for entity in entities: + client.delete(entity.key) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 628f678b19..bb1a341e1f 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -14,9 +14,9 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Sequence -from feast import FeatureTable +from feast import FeatureTable, Entity from feast.feature_view import FeatureView from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -39,7 +39,22 @@ def online_write_batch( ], progress: Optional[Callable[[int], Any]], ) -> None: - pass + """ + Write a batch of feature rows to the online store. This is a low level interface, not + expected to be used by the users directly. + + If a tz-naive timestamp is passed to this method, it should be assumed to be UTC by implementors. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureTable or FeatureView + data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key, + a dict containing feature values, an event timestamp for the row, and + the created timestamp for the row if it exists. + progress: Optional function to be called once every mini-batch of rows is written to + the online store. Can be used to display progress. + """ + ... @staticmethod @abstractmethod @@ -49,4 +64,39 @@ def online_read( entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - pass + """ + Read feature values given an Entity Key. This is a low level interface, not + expected to be used by the users directly. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureTable or FeatureView + entity_keys: a list of entity keys that should be read from the FeatureStore. + requested_features: (Optional) A subset of the features that should be read from the FeatureStore. + Returns: + Data is returned as a list, one item per entity key. Each item in the list is a tuple + of event_ts for the row, and the feature data as a dict from feature names to values. + Values are returned as Value proto message. + """ + ... + + @staticmethod + @abstractmethod + def setup( + config: RepoConfig, + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + ... + + @staticmethod + @abstractmethod + def teardown( + config: RepoConfig, + tables: Sequence[Union[FeatureTable, FeatureView]], + entities: Sequence[Entity], + ): + ... diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 962e514f0c..9b3ac46ebd 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -13,11 +13,11 @@ # limitations under the License. import json from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Sequence from google.protobuf.timestamp_pb2 import Timestamp -from feast import FeatureTable, FeatureView, RepoConfig +from feast import FeatureTable, FeatureView, RepoConfig, Entity from feast.infra.online_stores.helpers import _mmh3, _redis_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -34,6 +34,22 @@ class RedisOnlineStore(OnlineStore): + @staticmethod + def setup(config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], partial: bool): + """ + There's currently no setup done for Redis. + """ + pass + + @staticmethod + def teardown(config: RepoConfig, tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity]): + """ + There's currently no teardown done for Redis. + """ + pass + @classmethod def _parse_connection_string(cls, connection_string: str): """ diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 915f4209fb..599554ef31 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -11,14 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import os import sqlite3 from datetime import datetime from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Sequence import pytz -from feast import FeatureTable +from feast import FeatureTable, Entity from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore @@ -34,13 +36,18 @@ class SqliteOnlineStore(OnlineStore): """ @staticmethod - def _get_conn(config: RepoConfig): + def _get_db_path(config: RepoConfig) -> str: assert config.online_store.type == "sqlite" - if config.repo_path: + if config.repo_path and not Path(config.online_store.path).is_absolute(): db_path = str(config.repo_path / config.online_store.path) else: db_path = config.online_store.path + return db_path + + @classmethod + def _get_conn(cls, config: RepoConfig): + db_path = cls._get_db_path(config) Path(db_path).parent.mkdir(exist_ok=True) return sqlite3.connect( @@ -138,6 +145,39 @@ def online_read( result.append((res_ts, res)) return result + @classmethod + def setup( + cls, + config: RepoConfig, + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + conn = cls._get_conn(config) + project = config.project + + for table in tables_to_keep: + conn.execute( + f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))" + ) + conn.execute( + f"CREATE INDEX IF NOT EXISTS {_table_id(project, table)}_ek ON {_table_id(project, table)} (entity_key);" + ) + + for table in tables_to_delete: + conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") + + @classmethod + def teardown( + cls, + config: RepoConfig, + tables: Sequence[Union[FeatureTable, FeatureView]], + entities: Sequence[Entity], + ): + os.unlink(cls._get_db_path(config)) + def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str: return f"{project}_{table.name}" From e35e6fe3c50141765630148df4fa34a337bdc965 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 14:29:38 -0700 Subject: [PATCH 07/13] more simplifications Signed-off-by: Achal Shah --- sdk/python/feast/infra/gcp.py | 9 +-------- sdk/python/feast/infra/local.py | 5 ++--- sdk/python/feast/infra/provider.py | 2 +- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index ba130c885d..b54e571f7f 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -20,7 +20,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry -from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig +from feast.repo_config import RepoConfig try: from google.auth.exceptions import DefaultCredentialsError @@ -36,13 +36,6 @@ class GcpProvider(Provider): _namespace: Optional[str] def __init__(self, config: RepoConfig): - assert isinstance(config.online_store, DatastoreOnlineStoreConfig) - self._gcp_project_id = config.online_store.project_id - self._namespace = config.online_store.namespace - self._write_concurrency = config.online_store.write_concurrency - self._write_batch_size = config.online_store.write_batch_size - - assert config.offline_store is not None self.repo_config = config self.offline_store = get_offline_store_from_config(config.offline_store) self.online_store = get_online_store_from_config(config.online_store) diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 1baf36db42..bf1dd828f8 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -23,15 +23,14 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry -from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig, RedisOnlineStoreConfig +from feast.repo_config import RepoConfig class LocalProvider(Provider): - def __init__(self, config: RepoConfig, repo_path: Path): + def __init__(self, config: RepoConfig): assert config is not None self.config = config - assert config.offline_store is not None self.offline_store = get_offline_store_from_config(config.offline_store) self.online_store = get_online_store_from_config(config.online_store) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index be51208fb3..5d4f8d6cf0 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -148,7 +148,7 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: elif config.provider == "local": from feast.infra.local import LocalProvider - return LocalProvider(config, repo_path) + return LocalProvider(config) else: raise errors.FeastProviderNotImplementedError(config.provider) else: From cd1d2833df65814193a61be7622af0e550bd3822 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 14:30:10 -0700 Subject: [PATCH 08/13] make format Signed-off-by: Achal Shah --- sdk/python/feast/infra/gcp.py | 15 ++++++----- sdk/python/feast/infra/local.py | 19 +++++++------ .../feast/infra/online_stores/datastore.py | 27 +++++++++++-------- .../feast/infra/online_stores/online_store.py | 4 +-- sdk/python/feast/infra/online_stores/redis.py | 21 ++++++++++----- .../feast/infra/online_stores/sqlite.py | 4 +-- 6 files changed, 52 insertions(+), 38 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index b54e571f7f..1ae6165cf9 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -49,12 +49,14 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - self.online_store.setup(config=self.repo_config, - tables_to_delete=tables_to_delete, - tables_to_keep=tables_to_keep, - entities_to_keep=entities_to_keep, - entities_to_delete=entities_to_delete, - partial=partial) + self.online_store.setup( + config=self.repo_config, + tables_to_delete=tables_to_delete, + tables_to_keep=tables_to_keep, + entities_to_keep=entities_to_keep, + entities_to_delete=entities_to_delete, + partial=partial, + ) def teardown_infra( self, @@ -145,4 +147,3 @@ def get_historical_features( project=project, ) return job - diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index bf1dd828f8..8d23b1cfa8 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -27,7 +27,6 @@ class LocalProvider(Provider): - def __init__(self, config: RepoConfig): assert config is not None self.config = config @@ -43,12 +42,14 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - self.online_store.setup(self.config, - tables_to_delete, - tables_to_keep, - entities_to_delete, - entities_to_keep, - partial) + self.online_store.setup( + self.config, + tables_to_delete, + tables_to_keep, + entities_to_delete, + entities_to_keep, + partial, + ) def teardown_infra( self, @@ -56,9 +57,7 @@ def teardown_infra( tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], ) -> None: - self.online_store.teardown(self.config, - tables, - entities) + self.online_store.teardown(self.config, tables, entities) def online_write_batch( self, diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 426a38faa2..d69adff193 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -18,7 +18,7 @@ import mmh3 -from feast import FeatureTable, utils, Entity +from feast import Entity, FeatureTable, utils from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore @@ -47,12 +47,15 @@ class DatastoreOnlineStore(OnlineStore): """ @classmethod - def setup(cls, - config: RepoConfig, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], partial: bool): + def setup( + cls, + config: RepoConfig, + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): """ """ online_config = config.online_store @@ -78,10 +81,12 @@ def setup(cls, client.delete(key) @classmethod - def teardown(cls, - config: RepoConfig, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity]): + def teardown( + cls, + config: RepoConfig, + tables: Sequence[Union[FeatureTable, FeatureView]], + entities: Sequence[Entity], + ): """ There's currently no teardown done for Datastore. """ diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index bb1a341e1f..5bfcf956a2 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -14,9 +14,9 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Sequence +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union -from feast import FeatureTable, Entity +from feast import Entity, FeatureTable from feast.feature_view import FeatureView from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 9b3ac46ebd..209328bcb5 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -13,11 +13,11 @@ # limitations under the License. import json from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Sequence +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union from google.protobuf.timestamp_pb2 import Timestamp -from feast import FeatureTable, FeatureView, RepoConfig, Entity +from feast import Entity, FeatureTable, FeatureView, RepoConfig from feast.infra.online_stores.helpers import _mmh3, _redis_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -35,16 +35,25 @@ class RedisOnlineStore(OnlineStore): @staticmethod - def setup(config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], partial: bool): + def setup( + config: RepoConfig, + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): """ There's currently no setup done for Redis. """ pass @staticmethod - def teardown(config: RepoConfig, tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity]): + def teardown( + config: RepoConfig, + tables: Sequence[Union[FeatureTable, FeatureView]], + entities: Sequence[Entity], + ): """ There's currently no teardown done for Redis. """ diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 599554ef31..b314f88663 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -16,11 +16,11 @@ import sqlite3 from datetime import datetime from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Sequence +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pytz -from feast import FeatureTable, Entity +from feast import Entity, FeatureTable from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore From 3146cac1c47a5c22258890b42da4f1aa01794f19 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 14:42:12 -0700 Subject: [PATCH 09/13] make lint Signed-off-by: Achal Shah --- sdk/python/feast/infra/gcp.py | 9 --------- sdk/python/feast/infra/local.py | 3 --- 2 files changed, 12 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 1ae6165cf9..c1f4ed8b97 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -6,7 +6,6 @@ from feast import FeatureTable from feast.entity import Entity -from feast.errors import FeastProviderLoginError from feast.feature_view import FeatureView from feast.infra.offline_stores.helpers import get_offline_store_from_config from feast.infra.online_stores.helpers import get_online_store_from_config @@ -22,14 +21,6 @@ from feast.registry import Registry from feast.repo_config import RepoConfig -try: - from google.auth.exceptions import DefaultCredentialsError - from google.cloud import datastore -except ImportError as e: - from feast.errors import FeastExtrasDependencyImportError - - raise FeastExtrasDependencyImportError("gcp", str(e)) - class GcpProvider(Provider): _gcp_project_id: Optional[str] diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 8d23b1cfa8..7be5e0fc0e 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -1,7 +1,4 @@ -import os -import sqlite3 from datetime import datetime -from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas as pd From 871e2685478c5f3cfe2c7de98b7e92ccb1bb424a Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 16:48:29 -0700 Subject: [PATCH 10/13] Add redis write path Signed-off-by: Achal Shah --- sdk/python/feast/infra/online_stores/redis.py | 33 +++++++++++++++++-- sdk/python/feast/repo_config.py | 2 -- sdk/python/tests/test_cli_redis.py | 2 +- .../test_offline_online_store_consistency.py | 4 ++- 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 209328bcb5..0cbacefaaf 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -17,7 +17,7 @@ from google.protobuf.timestamp_pb2 import Timestamp -from feast import Entity, FeatureTable, FeatureView, RepoConfig +from feast import Entity, FeatureTable, FeatureView, RepoConfig, utils from feast.infra.online_stores.helpers import _mmh3, _redis_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -32,6 +32,8 @@ raise FeastExtrasDependencyImportError("redis", str(e)) +EX_SECONDS = 253402300799 + class RedisOnlineStore(OnlineStore): @staticmethod @@ -95,6 +97,7 @@ def _get_client(cls, online_store_config: RedisOnlineStoreConfig): startup_nodes, kwargs = cls._parse_connection_string( online_store_config.connection_string ) + print(f"Startup nodes: {startup_nodes}, {kwargs}") if online_store_config.type == RedisType.redis_cluster: kwargs["startup_nodes"] = startup_nodes return RedisCluster(**kwargs) @@ -113,7 +116,33 @@ def online_write_batch( ], progress: Optional[Callable[[int], Any]], ) -> None: - pass + online_store_config = config.online_store + assert isinstance(online_store_config, RedisOnlineStoreConfig) + + client = cls._get_client(online_store_config) + project = config.project + + entity_hset = {} + feature_view = table.name + + ex = Timestamp() + ex.seconds = EX_SECONDS + ex_str = ex.SerializeToString() + + for entity_key, values, timestamp, created_ts in data: + redis_key_bin = _redis_key(project, entity_key) + ts = Timestamp() + ts.seconds = int(utils.make_tzaware(timestamp).timestamp()) + entity_hset[f"_ts:{feature_view}"] = ts.SerializeToString() + entity_hset[f"_ex:{feature_view}"] = ex_str + + for feature_name, val in values.items(): + f_key = _mmh3(f"{feature_view}:{feature_name}") + entity_hset[f_key] = val.SerializeToString() + + client.hset(redis_key_bin, mapping=entity_hset) + if progress: + progress(1) @classmethod def online_read( diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 7fd2e87d01..2bf440977d 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -165,8 +165,6 @@ def _validate_online_store_config(cls, values): values["online_store"]["type"] = "sqlite" elif values["provider"] == "gcp": values["online_store"]["type"] = "datastore" - elif values["provider"] == "redis": - values["online_store"]["type"] = "redis" online_store_type = values["online_store"]["type"] diff --git a/sdk/python/tests/test_cli_redis.py b/sdk/python/tests/test_cli_redis.py index b51ab6878e..e948bffd25 100644 --- a/sdk/python/tests/test_cli_redis.py +++ b/sdk/python/tests/test_cli_redis.py @@ -33,7 +33,7 @@ def test_basic() -> None: offline_store: type: bigquery online_store: - redis_type: redis + type: redis connection_string: localhost:6379,db=0 """ ) diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py index b124b3be7e..02943fd2eb 100644 --- a/sdk/python/tests/test_offline_online_store_consistency.py +++ b/sdk/python/tests/test_offline_online_store_consistency.py @@ -175,7 +175,9 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]: project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}", provider="local", online_store=RedisOnlineStoreConfig( - redis_type=RedisType.redis, connection_string="localhost:6379,db=0", + type="redis", + redis_type=RedisType.redis, + connection_string="localhost:6379,db=0", ), ) fs = FeatureStore(config=config) From 6aba5007a047a42338d73d65247ca9843a6953be Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 17:15:56 -0700 Subject: [PATCH 11/13] Make instance methods instead of classmethods, and cache clients/connections Signed-off-by: Achal Shah --- .../feast/infra/online_stores/datastore.py | 50 +++++++++---------- sdk/python/feast/infra/online_stores/redis.py | 49 +++++++++--------- .../feast/infra/online_stores/sqlite.py | 38 +++++++------- 3 files changed, 67 insertions(+), 70 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index d69adff193..d2d5e1f6ad 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -45,10 +45,10 @@ class DatastoreOnlineStore(OnlineStore): OnlineStore is an object used for all interaction between Feast and the service used for offline storage of features. """ + _client: Optional[datastore.Client] = None - @classmethod def setup( - cls, + self, config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], @@ -60,7 +60,7 @@ def setup( """ online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) - client = cls._initialize_client(online_config) + client = self._get_client(online_config) feast_project = config.project for table in tables_to_keep: @@ -80,9 +80,8 @@ def setup( key = client.key("Project", feast_project, "Table", table.name) client.delete(key) - @classmethod def teardown( - cls, + self, config: RepoConfig, tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], @@ -92,7 +91,7 @@ def teardown( """ online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) - client = cls._initialize_client(online_config) + client = self._get_client(online_config) feast_project = config.project for table in tables: @@ -104,23 +103,23 @@ def teardown( key = client.key("Project", feast_project, "Table", table.name) client.delete(key) - @classmethod - def _initialize_client(cls, online_config: DatastoreOnlineStoreConfig): + def _get_client(self, online_config: DatastoreOnlineStoreConfig): - try: - return datastore.Client( - project=online_config.project_id, namespace=online_config.namespace, - ) - except DefaultCredentialsError as e: - raise FeastProviderLoginError( - str(e) - + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' - "local Google Cloud account " - ) + if not self._client: + try: + self._client = datastore.Client( + project=online_config.project_id, namespace=online_config.namespace, + ) + except DefaultCredentialsError as e: + raise FeastProviderLoginError( + str(e) + + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' + "local Google Cloud account " + ) + return self._client - @classmethod def online_write_batch( - cls, + self, config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ @@ -131,7 +130,7 @@ def online_write_batch( online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) - client = cls._initialize_client(online_config) + client = self._get_client(online_config) write_concurrency = online_config.write_concurrency write_batch_size = online_config.write_batch_size @@ -139,8 +138,8 @@ def online_write_batch( pool = ThreadPool(processes=write_concurrency) pool.map( - lambda b: cls._write_minibatch(client, feast_project, table, b, progress), - cls._to_minibatches(data, batch_size=write_batch_size), + lambda b: self._write_minibatch(client, feast_project, table, b, progress), + self._to_minibatches(data, batch_size=write_batch_size), ) @staticmethod @@ -199,9 +198,8 @@ def _write_minibatch( if progress: progress(len(entities)) - @classmethod def online_read( - cls, + self, config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], @@ -210,7 +208,7 @@ def online_read( online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) - client = cls._initialize_client(online_config) + client = self._get_client(online_config) feast_project = config.project diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 0cbacefaaf..e2e73d0c55 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -36,8 +36,10 @@ class RedisOnlineStore(OnlineStore): - @staticmethod + _client: Optional[Union[Redis, RedisCluster]] = None + def setup( + self, config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], @@ -50,8 +52,8 @@ def setup( """ pass - @staticmethod def teardown( + self, config: RepoConfig, tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], @@ -61,8 +63,8 @@ def teardown( """ pass - @classmethod - def _parse_connection_string(cls, connection_string: str): + @staticmethod + def _parse_connection_string(connection_string: str): """ Reads Redis connections string using format for RedisCluster: @@ -89,26 +91,26 @@ def _parse_connection_string(cls, connection_string: str): return startup_nodes, params - @classmethod - def _get_client(cls, online_store_config: RedisOnlineStoreConfig): + def _get_client(self, online_store_config: RedisOnlineStoreConfig): """ Creates the Redis client RedisCluster or Redis depending on configuration """ - startup_nodes, kwargs = cls._parse_connection_string( - online_store_config.connection_string - ) - print(f"Startup nodes: {startup_nodes}, {kwargs}") - if online_store_config.type == RedisType.redis_cluster: - kwargs["startup_nodes"] = startup_nodes - return RedisCluster(**kwargs) - else: - kwargs["host"] = startup_nodes[0]["host"] - kwargs["port"] = startup_nodes[0]["port"] - return Redis(**kwargs) - - @classmethod + if not self._client: + startup_nodes, kwargs = self._parse_connection_string( + online_store_config.connection_string + ) + print(f"Startup nodes: {startup_nodes}, {kwargs}") + if online_store_config.type == RedisType.redis_cluster: + kwargs["startup_nodes"] = startup_nodes + self._client = RedisCluster(**kwargs) + else: + kwargs["host"] = startup_nodes[0]["host"] + kwargs["port"] = startup_nodes[0]["port"] + self._client = Redis(**kwargs) + return self._client + def online_write_batch( - cls, + self, config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ @@ -119,7 +121,7 @@ def online_write_batch( online_store_config = config.online_store assert isinstance(online_store_config, RedisOnlineStoreConfig) - client = cls._get_client(online_store_config) + client = self._get_client(online_store_config) project = config.project entity_hset = {} @@ -144,9 +146,8 @@ def online_write_batch( if progress: progress(1) - @classmethod def online_read( - cls, + self, config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], @@ -155,7 +156,7 @@ def online_read( online_store_config = config.online_store assert isinstance(online_store_config, RedisOnlineStoreConfig) - client = cls._get_client(online_store_config) + client = self._get_client(online_store_config) feature_view = table.name project = config.project diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index b314f88663..d5905e04f3 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -35,6 +35,8 @@ class SqliteOnlineStore(OnlineStore): features. """ + _conn: Optional[sqlite3.Connection] = None + @staticmethod def _get_db_path(config: RepoConfig) -> str: assert config.online_store.type == "sqlite" @@ -45,18 +47,17 @@ def _get_db_path(config: RepoConfig) -> str: db_path = config.online_store.path return db_path - @classmethod - def _get_conn(cls, config: RepoConfig): - db_path = cls._get_db_path(config) - - Path(db_path).parent.mkdir(exist_ok=True) - return sqlite3.connect( - db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, - ) + def _get_conn(self, config: RepoConfig): + if not self._conn: + db_path = self._get_db_path(config) + Path(db_path).parent.mkdir(exist_ok=True) + self._conn = sqlite3.connect( + db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + ) + return self._conn - @classmethod def online_write_batch( - cls, + self, config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ @@ -64,7 +65,7 @@ def online_write_batch( ], progress: Optional[Callable[[int], Any]], ) -> None: - conn = cls._get_conn(config) + conn = self._get_conn(config) project = config.project @@ -108,16 +109,15 @@ def online_write_batch( if progress: progress(1) - @classmethod def online_read( - cls, + self, config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: pass - conn = cls._get_conn(config) + conn = self._get_conn(config) cur = conn.cursor() result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] @@ -145,9 +145,8 @@ def online_read( result.append((res_ts, res)) return result - @classmethod def setup( - cls, + self, config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], @@ -155,7 +154,7 @@ def setup( entities_to_keep: Sequence[Entity], partial: bool, ): - conn = cls._get_conn(config) + conn = self._get_conn(config) project = config.project for table in tables_to_keep: @@ -169,14 +168,13 @@ def setup( for table in tables_to_delete: conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") - @classmethod def teardown( - cls, + self, config: RepoConfig, tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], ): - os.unlink(cls._get_db_path(config)) + os.unlink(self._get_db_path(config)) def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str: From 6367767b94b2f609348e00a79002f4b8bc455111 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 10 Jun 2021 17:17:17 -0700 Subject: [PATCH 12/13] make format Signed-off-by: Achal Shah --- sdk/python/feast/infra/online_stores/datastore.py | 1 + sdk/python/feast/infra/online_stores/online_store.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index d2d5e1f6ad..65404f5973 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -45,6 +45,7 @@ class DatastoreOnlineStore(OnlineStore): OnlineStore is an object used for all interaction between Feast and the service used for offline storage of features. """ + _client: Optional[datastore.Client] = None def setup( diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 5bfcf956a2..fc2a4e38f4 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -29,9 +29,9 @@ class OnlineStore(ABC): features. """ - @staticmethod @abstractmethod def online_write_batch( + self, config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ @@ -56,9 +56,9 @@ def online_write_batch( """ ... - @staticmethod @abstractmethod def online_read( + self, config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], @@ -80,9 +80,9 @@ def online_read( """ ... - @staticmethod @abstractmethod def setup( + self, config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], @@ -92,9 +92,9 @@ def setup( ): ... - @staticmethod @abstractmethod def teardown( + self, config: RepoConfig, tables: Sequence[Union[FeatureTable, FeatureView]], entities: Sequence[Entity], From 9d9b12b9bd77873097519f35552ced660ca28a1e Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 11 Jun 2021 13:18:33 -0700 Subject: [PATCH 13/13] Use update instead of setup Signed-off-by: Achal Shah --- sdk/python/feast/infra/gcp.py | 2 +- sdk/python/feast/infra/local.py | 2 +- sdk/python/feast/infra/online_stores/datastore.py | 2 +- sdk/python/feast/infra/online_stores/online_store.py | 2 +- sdk/python/feast/infra/online_stores/redis.py | 2 +- sdk/python/feast/infra/online_stores/sqlite.py | 2 +- sdk/python/feast/type_map.py | 4 ++-- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index c1f4ed8b97..f33b501d62 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -40,7 +40,7 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - self.online_store.setup( + self.online_store.update( config=self.repo_config, tables_to_delete=tables_to_delete, tables_to_keep=tables_to_keep, diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 7be5e0fc0e..a76f49b2c4 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -39,7 +39,7 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - self.online_store.setup( + self.online_store.update( self.config, tables_to_delete, tables_to_keep, diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 65404f5973..1f2c5abac4 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -48,7 +48,7 @@ class DatastoreOnlineStore(OnlineStore): _client: Optional[datastore.Client] = None - def setup( + def update( self, config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index fc2a4e38f4..8050d07f00 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -81,7 +81,7 @@ def online_read( ... @abstractmethod - def setup( + def update( self, config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index e2e73d0c55..abab030dce 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -38,7 +38,7 @@ class RedisOnlineStore(OnlineStore): _client: Optional[Union[Redis, RedisCluster]] = None - def setup( + def update( self, config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index d5905e04f3..9c2e5cd251 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -145,7 +145,7 @@ def online_read( result.append((res_ts, res)) return result - def setup( + def update( self, config: RepoConfig, tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 0da7b57b6a..576a0b7f35 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -212,7 +212,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: float_list_val=FloatList( val=[ item - if type(item) in [np.float32, np.float64] + if type(item) in [np.float32, np.float64, float] else _type_err(item, np.float32) for item in value ] @@ -224,7 +224,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: double_list_val=DoubleList( val=[ item - if type(item) in [np.float64, np.float32] + if type(item) in [np.float64, np.float32, float] else _type_err(item, np.float64) for item in value ]