Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve exception handling, logging, and validation #1477

Merged
merged 11 commits into from
Apr 20, 2021
18 changes: 6 additions & 12 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import pkg_resources
import yaml

from feast.errors import FeastObjectNotFoundException
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
from feast.feature_store import FeatureStore
from feast.repo_config import load_repo_config
from feast.repo_operations import (
Expand Down Expand Up @@ -156,8 +156,10 @@ def apply_total_command():
"""
cli_check_repo(Path.cwd())
repo_config = load_repo_config(Path.cwd())

apply_total(repo_config, Path.cwd())
try:
apply_total(repo_config, Path.cwd())
except FeastProviderLoginError as e:
print(str(e))


@cli.command("teardown")
Expand All @@ -179,7 +181,7 @@ def registry_dump_command():
cli_check_repo(Path.cwd())
repo_config = load_repo_config(Path.cwd())

registry_dump(repo_config)
registry_dump(repo_config, repo_path=Path.cwd())


@cli.command("materialize")
Expand Down Expand Up @@ -244,14 +246,6 @@ def init_command(project_directory, minimal: bool, template: str):
"""Create a new Feast repository"""
if not project_directory:
project_directory = generate_project_name()
if template and minimal:
from colorama import Fore, Style

click.echo(
f"Please select either a {Style.BRIGHT + Fore.GREEN}template{Style.RESET_ALL} or "
f"{Style.BRIGHT + Fore.GREEN}minimal{Style.RESET_ALL}, not both"
)
exit(1)

if minimal:
template = "minimal"
Expand Down
27 changes: 21 additions & 6 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,30 @@ class FeastObjectNotFoundException(Exception):


class EntityNotFoundException(FeastObjectNotFoundException):
def __init__(self, project, name):
super().__init__(f"Entity {name} does not exist in project {project}")
def __init__(self, name, project=None):
if project:
super().__init__(f"Entity {name} does not exist in project {project}")
else:
super().__init__(f"Entity {name} does not exist")


class FeatureViewNotFoundException(FeastObjectNotFoundException):
def __init__(self, project, name):
super().__init__(f"Feature view {name} does not exist in project {project}")
def __init__(self, name, project=None):
if project:
super().__init__(f"Feature view {name} does not exist in project {project}")
else:
super().__init__(f"Feature view {name} does not exist")


class FeatureTableNotFoundException(FeastObjectNotFoundException):
def __init__(self, project, name):
super().__init__(f"Feature table {name} does not exist in project {project}")
def __init__(self, name, project=None):
if project:
super().__init__(
f"Feature table {name} does not exist in project {project}"
)
else:
super().__init__(f"Feature table {name} does not exist")


class FeastProviderLoginError(Exception):
"""Error class that indicates a user has not authenticated with their provider."""
43 changes: 30 additions & 13 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from collections import OrderedDict, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
Expand All @@ -21,6 +23,7 @@

from feast import utils
from feast.entity import Entity
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
from feast.feature_view import FeatureView
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.online_response import OnlineResponse, _infer_online_entity_rows
Expand All @@ -41,7 +44,7 @@ class FeatureStore:
"""

config: RepoConfig
repo_path: Optional[str]
repo_path: Path
_registry: Registry

def __init__(
Expand All @@ -53,19 +56,21 @@ def __init__(
repo_path: Path to a `feature_store.yaml` used to configure the feature store
config (RepoConfig): Configuration object used to configure the feature store
"""
self.repo_path = repo_path
if repo_path is not None and config is not None:
raise ValueError("You cannot specify both repo_path and config")
if config is not None:
self.repo_path = Path(os.getcwd())
self.config = config
elif repo_path is not None:
self.repo_path = Path(repo_path)
self.config = load_repo_config(Path(repo_path))
else:
raise ValueError("Please specify one of repo_path or config")

registry_config = self.config.get_registry_config()
self._registry = Registry(
registry_path=registry_config.path,
repo_path=self.repo_path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
self._tele = Telemetry()
Expand All @@ -80,7 +85,8 @@ def project(self) -> str:
return self.config.project

def _get_provider(self) -> Provider:
return get_provider(self.config)
# TODO: Bake self.repo_path into self.config so that we only have one interface to paths
return get_provider(self.config, self.repo_path)

def refresh_registry(self):
"""Fetches and caches a copy of the feature registry in memory.
Expand All @@ -101,6 +107,7 @@ def refresh_registry(self):
registry_config = self.config.get_registry_config()
self._registry = Registry(
registry_path=registry_config.path,
repo_path=self.repo_path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
self._registry.refresh()
Expand Down Expand Up @@ -271,16 +278,26 @@ def get_historical_features(
all_feature_views = self._registry.list_feature_views(
project=self.config.project
)
feature_views = _get_requested_feature_views(feature_refs, all_feature_views)
try:
feature_views = _get_requested_feature_views(
feature_refs, all_feature_views
)
except FeatureViewNotFoundException as e:
sys.exit(e)

provider = self._get_provider()
job = provider.get_historical_features(
self.config,
feature_views,
feature_refs,
entity_df,
self._registry,
self.project,
)
try:
job = provider.get_historical_features(
self.config,
feature_views,
feature_refs,
entity_df,
self._registry,
self.project,
)
except FeastProviderLoginError as e:
sys.exit(e)

return job

def materialize_incremental(
Expand Down Expand Up @@ -529,7 +546,7 @@ def _group_refs(
for ref in feature_refs:
view_name, feat_name = ref.split(":")
if view_name not in view_index:
raise ValueError(f"Could not find feature view from reference {ref}")
raise FeatureViewNotFoundException(view_name)
views_features[view_name].append(feat_name)

result = []
Expand Down
16 changes: 12 additions & 4 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import mmh3
import pandas
import pyarrow
from google.auth.exceptions import DefaultCredentialsError

from feast import FeatureTable, utils
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
Expand Down Expand Up @@ -37,10 +39,16 @@ def __init__(self, config: RepoConfig):
def _initialize_client(self):
from google.cloud import datastore

if self._gcp_project_id is not None:
return datastore.Client(self._gcp_project_id)
else:
return datastore.Client()
try:
if self._gcp_project_id is not None:
return datastore.Client(self._gcp_project_id)
else:
return datastore.Client()
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your local Google Cloud account'
)

def update_infra(
self,
Expand Down
10 changes: 7 additions & 3 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@


class LocalProvider(Provider):
_db_path: str
_db_path: Path

def __init__(self, config: RepoConfig):
def __init__(self, config: RepoConfig, repo_path: Path):

assert config is not None
assert config.online_store is not None
local_online_store_config = config.online_store
assert isinstance(local_online_store_config, SqliteOnlineStoreConfig)
self._db_path = local_online_store_config.path
local_path = Path(local_online_store_config.path)
if local_path.is_absolute():
self._db_path = local_path
else:
self._db_path = repo_path.joinpath(local_path)

def _get_conn(self):
Path(self._db_path).parent.mkdir(exist_ok=True)
Expand Down
46 changes: 36 additions & 10 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

import pandas
import pyarrow
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import bigquery
from jinja2 import BaseLoader, Environment

from feast.data_source import BigQuerySource, DataSource
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.provider import (
Expand Down Expand Up @@ -59,9 +61,7 @@ def pull_latest_from_table_or_query(

@staticmethod
def _pull_query(query: str) -> pyarrow.Table:
from google.cloud import bigquery

client = bigquery.Client()
client = _get_bigquery_client()
query_job = client.query(query)
return query_job.to_arrow()

Expand All @@ -76,14 +76,18 @@ def get_historical_features(
) -> RetrievalJob:
# TODO: Add entity_df validation in order to fail before interacting with BigQuery

client = _get_bigquery_client()

if type(entity_df) is str:
entity_df_sql_table = f"({entity_df})"
elif isinstance(entity_df, pandas.DataFrame):
if "event_timestamp" not in entity_df.columns:
raise ValueError(
"Please provide an entity_df with a column named event_timestamp representing the time of events."
)
table_id = _upload_entity_df_into_bigquery(config.project, entity_df)
table_id = _upload_entity_df_into_bigquery(
config.project, entity_df, client
)
entity_df_sql_table = f"`{table_id}`"
else:
raise ValueError(
Expand All @@ -104,18 +108,19 @@ def get_historical_features(
max_timestamp=datetime.now() + timedelta(days=1),
left_table_query_string=entity_df_sql_table,
)
job = BigQueryRetrievalJob(query=query)

job = BigQueryRetrievalJob(query=query, client=client)
return job


class BigQueryRetrievalJob(RetrievalJob):
def __init__(self, query):
def __init__(self, query, client):
self.query = query
self.client = client

def to_df(self):
# TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
client = bigquery.Client()
df = client.query(self.query).to_dataframe(create_bqstorage_client=True)
df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
return df


Expand All @@ -135,9 +140,8 @@ class FeatureViewQueryContext:
entity_selections: List[str]


def _upload_entity_df_into_bigquery(project, entity_df) -> str:
def _upload_entity_df_into_bigquery(project, entity_df, client) -> str:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table"""
client = bigquery.Client()

# First create the BigQuery dataset if it doesn't exist
dataset = bigquery.Dataset(f"{client.project}.feast_{project}")
Expand Down Expand Up @@ -244,6 +248,28 @@ def build_point_in_time_query(
return query


def _get_bigquery_client():
    """Create a BigQuery client, converting login/setup failures into Feast errors.

    Returns:
        The ``bigquery.Client`` instance built with default arguments.

    Raises:
        FeastProviderLoginError: If constructing the client raises
            ``DefaultCredentialsError`` or ``EnvironmentError``. The message
            carries the original error text plus a hint about the ``gcloud``
            command that may resolve it, and the original exception is
            attached as ``__cause__``.
    """
    # Imported outside the ``try`` so that only client construction is guarded.
    from google.cloud import bigquery

    try:
        client = bigquery.Client()
    except DefaultCredentialsError as e:
        # Chain with ``from e`` so the underlying Google error stays visible
        # in tracebacks instead of being reported as incidental context.
        raise FeastProviderLoginError(
            str(e)
            + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
            "local Google Cloud account"
        ) from e
    except EnvironmentError as e:
        # ``EnvironmentError`` is an alias of ``OSError`` in Python 3.
        raise FeastProviderLoginError(
            "GCP error: "
            + str(e)
            + "\nIt may be necessary to set a default GCP project by running "
            '"gcloud config set project your-project"'
        ) from e

    return client


# TODO: Optimizations
# * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
# * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
Expand Down Expand Up @@ -123,15 +124,15 @@ def online_read(
...


def get_provider(config: RepoConfig) -> Provider:
def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
if config.provider == "gcp":
from feast.infra.gcp import GcpProvider

return GcpProvider(config)
elif config.provider == "local":
from feast.infra.local import LocalProvider

return LocalProvider(config)
return LocalProvider(config, repo_path)
else:
raise ValueError(config)

Expand Down
Loading