From 25eabd0ce2f661e56411bd9d39e82e9bfc64c78a Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sun, 18 Apr 2021 14:08:31 -0700 Subject: [PATCH 01/11] Add logging to apply method Signed-off-by: Willem Pienaar --- sdk/python/feast/repo_operations.py | 32 ++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index ee1d4a9255..d463e547d3 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -7,6 +7,8 @@ from pathlib import Path from typing import List, NamedTuple, Union +import click + from feast import Entity, FeatureTable from feast.feature_view import FeatureView from feast.infra.provider import get_provider @@ -39,8 +41,6 @@ def parse_repo(repo_root: Path) -> ParsedRepo: for repo_file in repo_files: module_path = py_path_to_module(repo_file, repo_root) - - print(f"Processing {repo_file} as {module_path}") module = importlib.import_module(module_path) for attr_name in dir(module): @@ -55,6 +55,8 @@ def parse_repo(repo_root: Path) -> ParsedRepo: def apply_total(repo_config: RepoConfig, repo_path: Path): + from colorama import Fore, Style + os.chdir(repo_path) sys.path.append("") registry_config = repo_config.get_registry_config() @@ -66,9 +68,11 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): sys.dont_write_bytecode = True repo = parse_repo(repo_path) sys.dont_write_bytecode = False - for entity in repo.entities: registry.apply_entity(entity, project=project) + click.echo( + f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" + ) repo_table_names = set(t.name for t in repo.feature_tables) @@ -88,18 +92,30 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): # Delete tables that should not exist for registry_table in tables_to_delete: registry.delete_feature_table(registry_table.name, project=project) + click.echo( + f"Deleted feature table {Style.BRIGHT + Fore.GREEN}{registry_table.name}{Style.RESET_ALL} from registry" + ) # Create tables that should for table in repo.feature_tables: registry.apply_feature_table(table, project) + click.echo( + f"Registered feature table {Style.BRIGHT + Fore.GREEN}{registry_table.name}{Style.RESET_ALL}" + ) # Delete views that should not exist for registry_view in views_to_delete: registry.delete_feature_view(registry_view.name, project=project) + click.echo( + f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{registry_view.name}{Style.RESET_ALL} from registry" + ) # Create views that should for view in repo.feature_views: registry.apply_feature_view(view, project) + click.echo( + f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" + ) infra_provider = get_provider(repo_config) @@ -111,6 +127,14 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): all_to_keep.extend(repo.feature_tables) all_to_keep.extend(repo.feature_views) + for name in [view.name for view in repo.feature_tables] + [table.name for table in repo.feature_views]: + click.echo( + f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" + ) + for name in [view.name for view in views_to_delete] + [table.name for table in tables_to_delete]: + click.echo( + f"Removing infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" + ) infra_provider.update_infra( project, tables_to_delete=all_to_delete, @@ -118,8 +142,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): partial=False, ) - print("Done!") - def teardown(repo_config: RepoConfig, repo_path: Path): registry_config = repo_config.get_registry_config() From 5f70cff5918196dba37bd8a2e45d0df413031337 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sun, 18 Apr 2021 14:21:57 -0700 Subject: [PATCH 02/11] Linting changes Signed-off-by: Willem Pienaar --- sdk/python/feast/repo_operations.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index d463e547d3..74ee038814 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -127,11 +127,15 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): all_to_keep.extend(repo.feature_tables) all_to_keep.extend(repo.feature_views) - for name in [view.name for view in repo.feature_tables] + [table.name for table in repo.feature_views]: + for name in [view.name for view in repo.feature_tables] + [ + table.name for table in repo.feature_views + ]: click.echo( f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" ) - for name in [view.name for view in views_to_delete] + [table.name for table in tables_to_delete]: + for name in [view.name for view in views_to_delete] + [ + table.name for table in tables_to_delete + ]: click.echo( f"Removing infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" ) From 1204f6e0e8582fccdc6cbda7c05d42e64020ed68 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sun, 18 Apr 2021 14:40:39 -0700 Subject: [PATCH 03/11] Add error message when missing authentication for GCP provider Signed-off-by: Willem Pienaar --- sdk/python/feast/cli.py | 8 +++++--- sdk/python/feast/errors.py | 4 ++++ sdk/python/feast/infra/gcp.py | 17 +++++++++++++---- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index c22fd7da2d..e5c988006d 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -21,7 +21,7 @@ import pkg_resources import yaml -from feast.errors import FeastObjectNotFoundException +from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError from feast.feature_store import FeatureStore from feast.repo_config import load_repo_config from feast.repo_operations import ( @@ -156,8 +156,10 @@ def apply_total_command(): """ cli_check_repo(Path.cwd()) repo_config = load_repo_config(Path.cwd()) - - apply_total(repo_config, Path.cwd()) + try: + apply_total(repo_config, Path.cwd()) + except FeastProviderLoginError as e: + print(str(e)) @cli.command("teardown") diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 0f659bb3d2..8e7b1c042e 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -15,3 +15,7 @@ def __init__(self, project, name): class FeatureTableNotFoundException(FeastObjectNotFoundException): def __init__(self, project, name): super().__init__(f"Feature table {name} does not exist in project {project}") + + +class FeastProviderLoginError(Exception): + """Error class that indicates a user has not authenticated with their provider.""" diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 781de297e5..a88dad2a29 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -6,8 +6,10 @@ import mmh3 import pandas import pyarrow +from google.auth.exceptions import DefaultCredentialsError from feast import FeatureTable, utils +from feast.errors import FeastProviderLoginError from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.offline_stores.helpers import get_offline_store_from_sources @@ -37,10 +39,17 @@ def __init__(self, config: RepoConfig): def _initialize_client(self): from google.cloud import datastore - if self._gcp_project_id is not None: - return datastore.Client(self._gcp_project_id) - else: - return datastore.Client() + try: + if self._gcp_project_id is not None: + return datastore.Client(self._gcp_project_id) + else: + return datastore.Client() + except DefaultCredentialsError as e: + assert isinstance(e, DefaultCredentialsError) + raise FeastProviderLoginError( + str(e) + + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your local Google Cloud account' + ) def update_infra( self, From 2996fb1c111c2d543a3a9ba5bf96e074100d2e8d Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sun, 18 Apr 2021 15:15:41 -0700 Subject: [PATCH 04/11] Add proper error message for missing feature views in historical retrieval Signed-off-by: Willem Pienaar --- sdk/python/feast/errors.py | 23 +++++++++++++++++------ sdk/python/feast/feature_store.py | 12 ++++++++++-- sdk/python/feast/registry.py | 4 ++-- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 8e7b1c042e..a09a6899c1 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -3,18 +3,29 @@ class FeastObjectNotFoundException(Exception): class EntityNotFoundException(FeastObjectNotFoundException): - def __init__(self, project, name): - super().__init__(f"Entity {name} does not exist in project {project}") + def __init__(self, name, project=None): + if project: + super().__init__(f"Entity {name} does not exist in project {project}") + else: + super().__init__(f"Entity {name} does not exist") class FeatureViewNotFoundException(FeastObjectNotFoundException): - def __init__(self, project, name): - super().__init__(f"Feature view {name} does not exist in project {project}") + def __init__(self, name, project=None): + if project: + super().__init__(f"Feature view {name} does not exist in project {project}") + else: + super().__init__(f"Feature view {name} does not exist") class FeatureTableNotFoundException(FeastObjectNotFoundException): - def __init__(self, project, name): - super().__init__(f"Feature table {name} does not exist in project {project}") + def __init__(self, name, project=None): + if project: + super().__init__( + f"Feature table {name} does not exist in project {project}" + ) + else: + super().__init__(f"Feature table {name} does not exist") class FeastProviderLoginError(Exception): diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 5db0d71a19..9e3ed53052 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import sys from collections import OrderedDict, defaultdict from datetime import datetime, timedelta from pathlib import Path @@ -21,6 +22,7 @@ from feast import utils from feast.entity import Entity +from feast.errors import FeatureViewNotFoundException from feast.feature_view import FeatureView from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.online_response import OnlineResponse, _infer_online_entity_rows @@ -271,7 +273,13 @@ def get_historical_features( all_feature_views = self._registry.list_feature_views( project=self.config.project ) - feature_views = _get_requested_feature_views(feature_refs, all_feature_views) + try: + feature_views = _get_requested_feature_views( + feature_refs, all_feature_views + ) + except FeatureViewNotFoundException as e: + sys.exit(e) + provider = self._get_provider() job = provider.get_historical_features( self.config, @@ -529,7 +537,7 @@ def _group_refs( for ref in feature_refs: view_name, feat_name = ref.split(":") if view_name not in view_index: - raise ValueError(f"Could not find feature view from reference {ref}") + raise FeatureViewNotFoundException(view_name) views_features[view_name].append(feat_name) result = [] diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 61259b2407..e1b151da1d 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -264,7 +264,7 @@ def get_feature_view(self, name: str, project: str) -> FeatureView: and feature_view_proto.spec.project == project ): return FeatureView.from_proto(feature_view_proto) - raise FeatureViewNotFoundException(project, name) + raise FeatureViewNotFoundException(name, project) def delete_feature_table(self, name: str, project: str): """ @@ -309,7 +309,7 @@ def updater(registry_proto: RegistryProto): ): del registry_proto.feature_views[idx] return registry_proto - raise FeatureViewNotFoundException(project, name) + raise FeatureViewNotFoundException(name, project) self._registry_store.update_registry_proto(updater) From 6d44edce9aa2b9775d90ee61537969bb8c29d7e3 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sun, 18 Apr 2021 15:27:37 -0700 Subject: [PATCH 05/11] Add proper error message for credential error for bigquery client Signed-off-by: Willem Pienaar --- sdk/python/feast/feature_store.py | 22 ++++++----- .../feast/infra/offline_stores/bigquery.py | 39 ++++++++++++++----- sdk/python/tests/test_online_retrieval.py | 3 +- 3 files changed, 44 insertions(+), 20 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 9e3ed53052..a4c7773bd6 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -22,7 +22,7 @@ from feast import utils from feast.entity import Entity -from feast.errors import FeatureViewNotFoundException +from feast.errors import FeatureViewNotFoundException, FeastProviderLoginError from feast.feature_view import FeatureView from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.online_response import OnlineResponse, _infer_online_entity_rows @@ -281,14 +281,18 @@ def get_historical_features( sys.exit(e) provider = self._get_provider() - job = provider.get_historical_features( - self.config, - feature_views, - feature_refs, - entity_df, - self._registry, - self.project, - ) + try: + job = provider.get_historical_features( + self.config, + feature_views, + feature_refs, + entity_df, + self._registry, + self.project, + ) + except FeastProviderLoginError as e: + sys.exit(e) + return job def materialize_incremental( diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 53dbe03368..6c3ec279b5 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -5,10 +5,12 @@ import pandas import pyarrow +from google.auth.exceptions import DefaultCredentialsError from google.cloud import bigquery from jinja2 import BaseLoader, Environment from feast.data_source import BigQuerySource, DataSource +from feast.errors import FeastProviderLoginError from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.provider import ( @@ -59,12 +61,25 @@ def pull_latest_from_table_or_query( @staticmethod def _pull_query(query: str) -> pyarrow.Table: - from google.cloud import bigquery - - client = bigquery.Client() + client = BigQueryOfflineStore._get_bigquery_client() query_job = client.query(query) return query_job.to_arrow() + @staticmethod + def _get_bigquery_client(): + try: + from google.cloud import bigquery + + client = bigquery.Client() + except DefaultCredentialsError as e: + raise FeastProviderLoginError( + str(e) + + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' + "local Google Cloud account" + ) + + return client + @staticmethod def get_historical_features( config: RepoConfig, @@ -76,6 +91,8 @@ def get_historical_features( ) -> RetrievalJob: # TODO: Add entity_df validation in order to fail before interacting with BigQuery + client = BigQueryOfflineStore._get_bigquery_client() + if type(entity_df) is str: entity_df_sql_table = f"({entity_df})" elif isinstance(entity_df, pandas.DataFrame): @@ -83,7 +100,9 @@ def get_historical_features( raise ValueError( "Please provide an entity_df with a column named event_timestamp representing the time of events." ) - table_id = _upload_entity_df_into_bigquery(config.project, entity_df) + table_id = _upload_entity_df_into_bigquery( + config.project, entity_df, client + ) entity_df_sql_table = f"`{table_id}`" else: raise ValueError( @@ -104,18 +123,19 @@ def get_historical_features( max_timestamp=datetime.now() + timedelta(days=1), left_table_query_string=entity_df_sql_table, ) - job = BigQueryRetrievalJob(query=query) + + job = BigQueryRetrievalJob(query=query, client=client) return job class BigQueryRetrievalJob(RetrievalJob): - def __init__(self, query): + def __init__(self, query, client): self.query = query + self.client = client def to_df(self): # TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df() - client = bigquery.Client() - df = client.query(self.query).to_dataframe(create_bqstorage_client=True) + df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True) return df @@ -135,9 +155,8 @@ class FeatureViewQueryContext: entity_selections: List[str] -def _upload_entity_df_into_bigquery(project, entity_df) -> str: +def _upload_entity_df_into_bigquery(project, entity_df, client) -> str: """Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table""" - client = bigquery.Client() # First create the BigQuery dataset if it doesn't exist dataset = bigquery.Dataset(f"{client.project}.feast_{project}") diff --git a/sdk/python/tests/test_online_retrieval.py b/sdk/python/tests/test_online_retrieval.py index 013897b7c8..856595188d 100644 --- a/sdk/python/tests/test_online_retrieval.py +++ b/sdk/python/tests/test_online_retrieval.py @@ -5,6 +5,7 @@ import pytest from feast import FeatureStore, RepoConfig +from feast.errors import FeatureViewNotFoundException from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RegistryConfig @@ -116,7 +117,7 @@ def test_online() -> None: assert "customer_driver_combined__trips" in result # invalid table reference - with pytest.raises(ValueError): + with pytest.raises(FeatureViewNotFoundException): store.get_online_features( feature_refs=["driver_locations_bad:lon"], entity_rows=[{"driver": 1}], ) From 56fed8390cc2cb6b45febc5e02871ca2b96ff4e7 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sun, 18 Apr 2021 18:55:58 -0700 Subject: [PATCH 06/11] Add proper gcp project missing error for bigquery client Signed-off-by: Willem Pienaar --- sdk/python/feast/feature_store.py | 2 +- sdk/python/feast/infra/offline_stores/bigquery.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index a4c7773bd6..e36a40836e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -22,7 +22,7 @@ from feast import utils from feast.entity import Entity -from feast.errors import FeatureViewNotFoundException, FeastProviderLoginError +from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException from feast.feature_view import FeatureView from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.online_response import OnlineResponse, _infer_online_entity_rows diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 6c3ec279b5..9131f8433a 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -77,6 +77,13 @@ def _get_bigquery_client(): + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' "local Google Cloud account" ) + except EnvironmentError as e: + raise FeastProviderLoginError( + "GCP error: " + + str(e) + + "\nIt may be necessary to set a default GCP project by running " + '"gcloud config set project your-project"' + ) return client From 7d23ee14193fe7fcc73872e859ff26dbaad0d5d0 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 19 Apr 2021 12:56:23 -0700 Subject: [PATCH 07/11] Ensure all apply commands initialize a feature repository Signed-off-by: Willem Pienaar --- sdk/python/feast/cli.py | 8 -------- sdk/python/feast/registry.py | 8 ++++++++ sdk/python/feast/repo_operations.py | 1 + 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index e5c988006d..6466d358ab 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -246,14 +246,6 @@ def init_command(project_directory, minimal: bool, template: str): """Create a new Feast repository""" if not project_directory: project_directory = generate_project_name() - if template and minimal: - from colorama import Fore, Style - - click.echo( - f"Please select either a {Style.BRIGHT + Fore.GREEN}template{Style.RESET_ALL} or " - f"{Style.BRIGHT + Fore.GREEN}minimal{Style.RESET_ALL}, not both" - ) - exit(1) if minimal: template = "minimal" diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index e1b151da1d..cdce694811 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -65,6 +65,14 @@ def __init__(self, registry_path: str, cache_ttl: timedelta): self.cached_registry_proto_ttl = cache_ttl return + def _initialize_registry(self): + """Explicitly forces the initialization of a registry""" + + def updater(registry_proto: RegistryProto): + return registry_proto # no-op + + self._registry_store.update_registry_proto(updater) + def apply_entity(self, entity: Entity, project: str): """ Registers a single entity with Feast diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 74ee038814..1a5a47d27b 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -65,6 +65,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): registry_path=registry_config.path, cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds), ) + registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) sys.dont_write_bytecode = False From 32c630a91675eea32b873ebb085102058c2e266e Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 19 Apr 2021 15:46:27 -0700 Subject: [PATCH 08/11] Ensure FeatureStore object can be run from outside a repository Signed-off-by: Willem Pienaar --- sdk/python/feast/cli.py | 2 +- sdk/python/feast/feature_store.py | 8 ++++-- sdk/python/feast/registry.py | 17 +++++++---- sdk/python/feast/repo_operations.py | 5 +++- sdk/python/tests/test_cli_local.py | 39 ++++++++++++++++++++++++++ sdk/python/tests/test_feature_store.py | 12 ++++++++ 6 files changed, 74 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 6466d358ab..8520e2715f 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -181,7 +181,7 @@ def registry_dump_command(): cli_check_repo(Path.cwd()) repo_config = load_repo_config(Path.cwd()) - registry_dump(repo_config) + registry_dump(repo_config, repo_path=Path.cwd()) @cli.command("materialize") diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e36a40836e..a72adcb21b 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os import sys from collections import OrderedDict, defaultdict from datetime import datetime, timedelta @@ -43,7 +44,7 @@ class FeatureStore: """ config: RepoConfig - repo_path: Optional[str] + repo_path: Path _registry: Registry def __init__( @@ -55,12 +56,13 @@ def __init__( repo_path: Path to a `feature_store.yaml` used to configure the feature store config (RepoConfig): Configuration object used to configure the feature store """ - self.repo_path = repo_path if repo_path is not None and config is not None: raise ValueError("You cannot specify both repo_path and config") if config is not None: + self.repo_path = Path(os.getcwd()) self.config = config elif repo_path is not None: + self.repo_path = Path(repo_path) self.config = load_repo_config(Path(repo_path)) else: raise ValueError("Please specify one of repo_path or config") @@ -68,6 +70,7 @@ def __init__( registry_config = self.config.get_registry_config() self._registry = Registry( registry_path=registry_config.path, + repo_path=self.repo_path, cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds), ) self._tele = Telemetry() @@ -103,6 +106,7 @@ def refresh_registry(self): registry_config = self.config.get_registry_config() self._registry = Registry( registry_path=registry_config.path, + repo_path=self.repo_path, cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds), ) self._registry.refresh() diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index cdce694811..ea14ceb0ae 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -45,11 +45,13 @@ class Registry: cached_registry_proto_ttl: timedelta cache_being_updated: bool = False - def __init__(self, registry_path: str, cache_ttl: timedelta): + def __init__(self, registry_path: str, repo_path: Path, cache_ttl: timedelta): """ Create the Registry object. Args: + repo_path: Path to the base of the Feast repository + cache_ttl: The amount of time that cached registry state stays valid registry_path: filepath or GCS URI that is the location of the object store registry, or where it will be created if it does not exist yet. """ @@ -57,7 +59,9 @@ def __init__(self, registry_path: str, cache_ttl: timedelta): if uri.scheme == "gs": self._registry_store: RegistryStore = GCSRegistryStore(registry_path) elif uri.scheme == "file" or uri.scheme == "": - self._registry_store = LocalRegistryStore(registry_path) + self._registry_store = LocalRegistryStore( + repo_path=repo_path, registry_path_string=registry_path + ) else: raise Exception( f"Registry path {registry_path} has unsupported scheme {uri.scheme}. Supported schemes are file and gs." @@ -389,9 +393,12 @@ def update_registry_proto(self, updater: Callable[[RegistryProto], RegistryProto class LocalRegistryStore(RegistryStore): - def __init__(self, filepath: str): - self._filepath = Path(filepath) - return + def __init__(self, repo_path: Path, registry_path_string: str): + registry_path = Path(registry_path_string) + if registry_path.is_absolute(): + self._filepath = registry_path + else: + self._filepath = repo_path.joinpath(registry_path) def get_registry_proto(self): registry_proto = RegistryProto() diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 1a5a47d27b..6828bd3c37 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -63,6 +63,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): project = repo_config.project registry = Registry( registry_path=registry_config.path, + repo_path=repo_path, cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds), ) registry._initialize_registry() @@ -152,6 +153,7 @@ def teardown(repo_config: RepoConfig, repo_path: Path): registry_config = repo_config.get_registry_config() registry = Registry( registry_path=registry_config.path, + repo_path=repo_path, cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds), ) project = repo_config.project @@ -162,12 +164,13 @@ def teardown(repo_config: RepoConfig, repo_path: Path): infra_provider.teardown_infra(project, tables=registry_tables) -def registry_dump(repo_config: RepoConfig): +def registry_dump(repo_config: RepoConfig, repo_path: Path): """ For debugging only: output contents of the metadata registry """ registry_config = repo_config.get_registry_config() project = repo_config.project registry = Registry( registry_path=registry_config.path, + repo_path=repo_path, cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds), ) diff --git a/sdk/python/tests/test_cli_local.py b/sdk/python/tests/test_cli_local.py index edbc04fec3..c32197d944 100644 --- a/sdk/python/tests/test_cli_local.py +++ b/sdk/python/tests/test_cli_local.py @@ -71,3 +71,42 @@ def test_workflow() -> None: result = runner.run(["teardown"], cwd=repo_path) assert result.returncode == 0 + +def test_non_local_feature_repo() -> None: + """ + Test running apply on a sample repo, and make sure the infra gets created. + """ + runner = CliRunner() + with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + + # Construct an example repo in a temporary dir + repo_path = Path(repo_dir_name) + data_path = Path(data_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text( + dedent( + f""" + project: foo + registry: data/registry.db + provider: local + online_store: + path: data/online_store.db + """ + ) + ) + + repo_example = repo_path / "example.py" + repo_example.write_text( + (Path(__file__).parent / "example_feature_repo_1.py").read_text() + ) + + result = runner.run(["apply"], cwd=repo_path) + assert result.returncode == 0 + + fs = FeatureStore(repo_path=str(repo_path)) + assert len(fs.list_feature_views()) == 3 + + result = runner.run(["teardown"], cwd=repo_path) + assert result.returncode == 0 \ No newline at end of file diff --git a/sdk/python/tests/test_feature_store.py b/sdk/python/tests/test_feature_store.py index f0e25b6d62..1fe90a5bb2 100644 --- a/sdk/python/tests/test_feature_store.py +++ b/sdk/python/tests/test_feature_store.py @@ -304,3 +304,15 @@ def test_apply_object_and_read(test_feature_store): assert e1 == e1_actual assert fv2 != fv1_actual assert e2 != e1_actual + +def test_apply_remote_repo(): + fd, registry_path = mkstemp() + fd, online_store_path = mkstemp() + return FeatureStore( + config=RepoConfig( + registry=registry_path, + project="default", + provider="local", + online_store=SqliteOnlineStoreConfig(path=online_store_path), + ) + ) \ No newline at end of file From 0af71ad68fa368df74b65ad977cc631e36fec8d5 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 19 Apr 2021 15:50:04 -0700 Subject: [PATCH 09/11] Fix linting Signed-off-by: Willem Pienaar --- sdk/python/tests/test_cli_local.py | 8 ++++---- sdk/python/tests/test_feature_store.py | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/python/tests/test_cli_local.py b/sdk/python/tests/test_cli_local.py index c32197d944..6444492ad2 100644 --- a/sdk/python/tests/test_cli_local.py +++ b/sdk/python/tests/test_cli_local.py @@ -72,22 +72,22 @@ def test_workflow() -> None: result = runner.run(["teardown"], cwd=repo_path) assert result.returncode == 0 + def test_non_local_feature_repo() -> None: """ Test running apply on a sample repo, and make sure the infra gets created. """ runner = CliRunner() - with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + with tempfile.TemporaryDirectory() as repo_dir_name: # Construct an example repo in a temporary dir repo_path = Path(repo_dir_name) - data_path = Path(data_dir_name) repo_config = repo_path / "feature_store.yaml" repo_config.write_text( dedent( - f""" + """ project: foo registry: data/registry.db provider: local @@ -109,4 +109,4 @@ def test_non_local_feature_repo() -> None: assert len(fs.list_feature_views()) == 3 result = runner.run(["teardown"], cwd=repo_path) - assert result.returncode == 0 \ No newline at end of file + assert result.returncode == 0 diff --git a/sdk/python/tests/test_feature_store.py b/sdk/python/tests/test_feature_store.py index 1fe90a5bb2..015f0ab2ad 100644 --- a/sdk/python/tests/test_feature_store.py +++ b/sdk/python/tests/test_feature_store.py @@ -305,6 +305,7 @@ def test_apply_object_and_read(test_feature_store): assert fv2 != fv1_actual assert e2 != e1_actual + def test_apply_remote_repo(): fd, registry_path = mkstemp() fd, online_store_path = mkstemp() @@ -315,4 +316,4 @@ def test_apply_remote_repo(): provider="local", online_store=SqliteOnlineStoreConfig(path=online_store_path), ) - ) \ No newline at end of file + ) From b3f9021c0130626777703411b232068e6a5a7f12 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 19 Apr 2021 18:31:02 -0700 Subject: [PATCH 10/11] Fix missing sqlite database when using relative paths Signed-off-by: Willem Pienaar --- sdk/python/feast/feature_store.py | 3 ++- sdk/python/feast/infra/local.py | 10 +++++++--- sdk/python/feast/infra/provider.py | 5 +++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index a72adcb21b..8810749f62 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -85,7 +85,8 @@ def project(self) -> str: return self.config.project def _get_provider(self) -> Provider: - return get_provider(self.config) + # TODO: Bake self.repo_path into self.config so that we dont only have one interface to paths + return get_provider(self.config, self.repo_path) def refresh_registry(self): """Fetches and caches a copy of the feature registry in memory. diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index cb848f7b43..658210d1f0 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -25,15 +25,19 @@ class LocalProvider(Provider): - _db_path: str + _db_path: Path - def __init__(self, config: RepoConfig): + def __init__(self, config: RepoConfig, repo_path: Path): assert config is not None assert config.online_store is not None local_online_store_config = config.online_store assert isinstance(local_online_store_config, SqliteOnlineStoreConfig) - self._db_path = local_online_store_config.path + local_path = Path(local_online_store_config.path) + if local_path.is_absolute(): + self._db_path = local_path + else: + self._db_path = repo_path.joinpath(local_path) def _get_conn(self): Path(self._db_path).parent.mkdir(exist_ok=True) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 8d1d6639f7..aa27cdc143 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -1,5 +1,6 @@ import abc from datetime import datetime +from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas @@ -123,7 +124,7 @@ def online_read( ... -def get_provider(config: RepoConfig) -> Provider: +def get_provider(config: RepoConfig, repo_path: Path) -> Provider: if config.provider == "gcp": from feast.infra.gcp import GcpProvider @@ -131,7 +132,7 @@ def get_provider(config: RepoConfig) -> Provider: elif config.provider == "local": from feast.infra.local import LocalProvider - return LocalProvider(config) + return LocalProvider(config, repo_path) else: raise ValueError(config) From 5c9a57ca06cbdd77273bc0a0f829fc6c64ba0401 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 19 Apr 2021 18:39:26 -0700 Subject: [PATCH 11/11] Small tweaks based on PR feedback Signed-off-by: Willem Pienaar --- sdk/python/feast/infra/gcp.py | 1 - .../feast/infra/offline_stores/bigquery.py | 48 +++++++++---------- sdk/python/feast/registry.py | 13 +++-- sdk/python/feast/repo_operations.py | 4 +- 4 files changed, 32 insertions(+), 34 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index a88dad2a29..3d19081a00 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -45,7 +45,6 @@ def _initialize_client(self): else: return datastore.Client() except DefaultCredentialsError as e: - assert isinstance(e, DefaultCredentialsError) raise FeastProviderLoginError( str(e) + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your local Google Cloud account' diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 9131f8433a..258afeb86d 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -61,32 +61,10 @@ def pull_latest_from_table_or_query( @staticmethod def _pull_query(query: str) -> pyarrow.Table: - client = BigQueryOfflineStore._get_bigquery_client() + client = _get_bigquery_client() query_job = client.query(query) return query_job.to_arrow() - @staticmethod - def _get_bigquery_client(): - try: - from google.cloud import bigquery - - client = bigquery.Client() - except DefaultCredentialsError as e: - raise FeastProviderLoginError( - str(e) - + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' - "local Google Cloud account" - ) - except EnvironmentError as e: - raise FeastProviderLoginError( - "GCP error: " - + str(e) - + "\nIt may be necessary to set a default GCP project by running " - '"gcloud config set project your-project"' - ) - - return client - @staticmethod def get_historical_features( config: RepoConfig, @@ -98,7 +76,7 @@ def get_historical_features( ) -> RetrievalJob: # TODO: Add entity_df validation in order to fail before interacting with BigQuery - client = BigQueryOfflineStore._get_bigquery_client() + client = _get_bigquery_client() if type(entity_df) is str: entity_df_sql_table = f"({entity_df})" @@ -270,6 +248,28 @@ def build_point_in_time_query( return query +def _get_bigquery_client(): + try: + from google.cloud import bigquery + + client = bigquery.Client() + except DefaultCredentialsError as e: + raise FeastProviderLoginError( + str(e) + + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' + "local Google Cloud account" + ) + except EnvironmentError as e: + raise FeastProviderLoginError( + "GCP error: " + + str(e) + + "\nIt may be necessary to set a default GCP project by running " + '"gcloud config set project your-project"' + ) + + return client + + # TODO: Optimizations # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index ea14ceb0ae..473de1b49c 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -71,11 +71,7 @@ def __init__(self, registry_path: str, repo_path: Path, cache_ttl: timedelta): def _initialize_registry(self): """Explicitly forces the initialization of a registry""" - - def updater(registry_proto: RegistryProto): - return registry_proto # no-op - - self._registry_store.update_registry_proto(updater) + self._registry_store.update_registry_proto() def apply_entity(self, entity: Entity, project: str): """ @@ -409,14 +405,17 @@ def get_registry_proto(self): f'Registry not found at path "{self._filepath}". Have you run "feast apply"?' ) - def update_registry_proto(self, updater: Callable[[RegistryProto], RegistryProto]): + def update_registry_proto( + self, updater: Callable[[RegistryProto], RegistryProto] = None + ): try: registry_proto = self.get_registry_proto() except FileNotFoundError: registry_proto = RegistryProto() registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION - registry_proto = updater(registry_proto) + if updater: + registry_proto = updater(registry_proto) self._write_registry(registry_proto) return diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 6828bd3c37..341629f364 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -119,7 +119,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" ) - infra_provider = get_provider(repo_config) + infra_provider = get_provider(repo_config, repo_path) all_to_delete: List[Union[FeatureTable, FeatureView]] = [] all_to_delete.extend(tables_to_delete) @@ -160,7 +160,7 @@ def teardown(repo_config: RepoConfig, repo_path: Path): registry_tables: List[Union[FeatureTable, FeatureView]] = [] registry_tables.extend(registry.list_feature_tables(project=project)) registry_tables.extend(registry.list_feature_views(project=project)) - infra_provider = get_provider(repo_config) + infra_provider = get_provider(repo_config, repo_path) infra_provider.teardown_infra(project, tables=registry_tables)