Skip to content

Commit

Permalink
Add offline_store config (#1552)
Browse files Browse the repository at this point in the history
* Add offline_store config

Signed-off-by: Tsotne Tabidze <[email protected]>

* Enforce offline_store during feast apply, rename entity_dataset_name to dataset

Signed-off-by: Tsotne Tabidze <[email protected]>

* Remove ugly getattr since it's unnecessary anymore

Signed-off-by: Tsotne Tabidze <[email protected]>

* Rename Bigquery to BigQuery

Signed-off-by: Tsotne Tabidze <[email protected]>
  • Loading branch information
Tsotne Tabidze authored and woop committed May 19, 2021
1 parent 883f52b commit b359bee
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 84 deletions.
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,10 @@ def __init__(self, extras_type: str, nested_error: str):
+ f"You may need run {Style.BRIGHT + Fore.GREEN}pip install 'feast[{extras_type}]'{Style.RESET_ALL}"
)
super().__init__(message)


class FeastOfflineStoreUnsupportedDataSource(Exception):
    """Raised when an offline store is paired with a data source type it cannot read."""

    def __init__(self, offline_store_name: str, data_source_name: str):
        # Build the message first, then delegate to Exception so str(err)
        # yields the human-readable description.
        message = (
            f"Offline Store '{offline_store_name}' does not support "
            f"data source '{data_source_name}'"
        )
        super().__init__(message)
24 changes: 8 additions & 16 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@

import mmh3
import pandas
import pyarrow
from tqdm import tqdm

from feast import FeatureTable, utils
from feast.entity import Entity
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
from feast.infra.offline_stores.helpers import get_offline_store_from_config
from feast.infra.provider import (
Provider,
RetrievalJob,
Expand All @@ -28,7 +27,7 @@

try:
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import bigquery, datastore
from google.cloud import datastore
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

Expand All @@ -40,11 +39,14 @@ class GcpProvider(Provider):

def __init__(self, config: RepoConfig):
assert isinstance(config.online_store, DatastoreOnlineStoreConfig)
assert config.offline_store is not None
if config and config.online_store and config.online_store.project_id:
self._gcp_project_id = config.online_store.project_id
else:
self._gcp_project_id = None

self.offline_store = get_offline_store_from_config(config.offline_store)

def _initialize_client(self):
try:
if self._gcp_project_id is not None:
Expand Down Expand Up @@ -168,8 +170,7 @@ def materialize_single_feature_view(
start_date = utils.make_tzaware(start_date)
end_date = utils.make_tzaware(end_date)

offline_store = get_offline_store_from_sources([feature_view.input])
table = offline_store.pull_latest_from_table_or_query(
table = self.offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
Expand All @@ -193,25 +194,16 @@ def materialize_single_feature_view(
feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)

@staticmethod
def _pull_query(query: str) -> pyarrow.Table:
client = bigquery.Client()
query_job = client.query(query)
return query_job.to_arrow()

@staticmethod
def get_historical_features(
self,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
) -> RetrievalJob:
offline_store = get_offline_store_from_sources(
[feature_view.input for feature_view in feature_views]
)
job = offline_store.get_historical_features(
job = self.offline_store.get_historical_features(
config=config,
feature_views=feature_views,
feature_refs=feature_refs,
Expand Down
21 changes: 8 additions & 13 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
from feast.infra.offline_stores.helpers import get_offline_store_from_config
from feast.infra.provider import (
Provider,
RetrievalJob,
Expand All @@ -30,16 +30,15 @@ class LocalProvider(Provider):
_db_path: Path

def __init__(self, config: RepoConfig, repo_path: Path):

assert config is not None
assert config.online_store is not None
local_online_store_config = config.online_store
assert isinstance(local_online_store_config, SqliteOnlineStoreConfig)
local_path = Path(local_online_store_config.path)
assert isinstance(config.online_store, SqliteOnlineStoreConfig)
assert config.offline_store is not None
local_path = Path(config.online_store.path)
if local_path.is_absolute():
self._db_path = local_path
else:
self._db_path = repo_path.joinpath(local_path)
self.offline_store = get_offline_store_from_config(config.offline_store)

def _get_conn(self):
Path(self._db_path).parent.mkdir(exist_ok=True)
Expand Down Expand Up @@ -184,8 +183,7 @@ def materialize_single_feature_view(
start_date = utils.make_tzaware(start_date)
end_date = utils.make_tzaware(end_date)

offline_store = get_offline_store_from_sources([feature_view.input])
table = offline_store.pull_latest_from_table_or_query(
table = self.offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
Expand All @@ -209,19 +207,16 @@ def materialize_single_feature_view(
feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)

@staticmethod
def get_historical_features(
self,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
) -> RetrievalJob:
offline_store = get_offline_store_from_sources(
[feature_view.input for feature_view in feature_views]
)
return offline_store.get_historical_features(
return self.offline_store.get_historical_features(
config=config,
feature_views=feature_views,
feature_refs=feature_refs,
Expand Down
12 changes: 7 additions & 5 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
_get_requested_feature_views_to_features_dict,
)
from feast.registry import Registry
from feast.repo_config import RepoConfig
from feast.repo_config import BigQueryOfflineStoreConfig, RepoConfig

try:
from google.auth.exceptions import DefaultCredentialsError
Expand Down Expand Up @@ -100,8 +100,10 @@ def get_historical_features(
entity_df
)

assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)

table_id = _upload_entity_df_into_bigquery(
config.project, entity_df, client
config.project, config.offline_store.dataset, entity_df, client,
)
entity_df_sql_table = f"`{table_id}`"
else:
Expand Down Expand Up @@ -198,11 +200,11 @@ class FeatureViewQueryContext:
entity_selections: List[str]


def _upload_entity_df_into_bigquery(project, entity_df, client) -> str:
def _upload_entity_df_into_bigquery(project, dataset_name, entity_df, client) -> str:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table"""

# First create the BigQuery dataset if it doesn't exist
dataset = bigquery.Dataset(f"{client.project}.feast_{project}")
dataset = bigquery.Dataset(f"{client.project}.{dataset_name}")
dataset.location = "US"
client.create_dataset(
dataset, exists_ok=True
Expand All @@ -213,7 +215,7 @@ def _upload_entity_df_into_bigquery(project, entity_df, client) -> str:

# Upload the dataframe into BigQuery, creating a temporary table
job_config = bigquery.LoadJobConfig()
table_id = f"{client.project}.feast_{project}.entity_df_{int(time.time())}"
table_id = f"{client.project}.{dataset_name}.entity_df_{project}_{int(time.time())}"
job = client.load_table_from_dataframe(entity_df, table_id, job_config=job_config,)
job.result()

Expand Down
43 changes: 28 additions & 15 deletions sdk/python/feast/infra/offline_stores/helpers.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@
from typing import List

from feast.data_source import BigQuerySource, DataSource, FileSource
from feast.errors import FeastOfflineStoreUnsupportedDataSource
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.repo_config import (
BigQueryOfflineStoreConfig,
FileOfflineStoreConfig,
OfflineStoreConfig,
)


def get_offline_store_from_sources(sources: List[DataSource]) -> OfflineStore:
"""Detect which offline store should be used for retrieving historical features"""

source_types = [type(source) for source in sources]
def get_offline_store_from_config(
offline_store_config: OfflineStoreConfig,
) -> OfflineStore:
"""Get the offline store from offline store config"""

# Retrieve features from ParquetOfflineStore
if all(source == FileSource for source in source_types):
if isinstance(offline_store_config, FileOfflineStoreConfig):
from feast.infra.offline_stores.file import FileOfflineStore

return FileOfflineStore()

# Retrieve features from BigQueryOfflineStore
if all(source == BigQuerySource for source in source_types):
elif isinstance(offline_store_config, BigQueryOfflineStoreConfig):
from feast.infra.offline_stores.bigquery import BigQueryOfflineStore

return BigQueryOfflineStore()

# Could not map inputs to an OfflineStore implementation
raise NotImplementedError(
"Unsupported combination of feature view input source types. Please ensure that all source types are "
"consistent and available in the same offline store."
raise ValueError(f"Unsupported offline store config '{offline_store_config}'")


def assert_offline_store_supports_data_source(
    offline_store_config: OfflineStoreConfig, data_source: DataSource
):
    """Validate that the configured offline store can read the given data source.

    Returns silently for a supported (store config, source) pairing; raises
    FeastOfflineStoreUnsupportedDataSource for any other combination.
    """
    # Each supported pairing is checked independently and named for clarity.
    file_pairing = isinstance(
        offline_store_config, FileOfflineStoreConfig
    ) and isinstance(data_source, FileSource)
    bigquery_pairing = isinstance(
        offline_store_config, BigQueryOfflineStoreConfig
    ) and isinstance(data_source, BigQuerySource)
    if not (file_pairing or bigquery_pairing):
        raise FeastOfflineStoreUnsupportedDataSource(
            offline_store_config.type, data_source.__class__.__name__
        )
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ def materialize_single_feature_view(
) -> None:
pass

@staticmethod
@abc.abstractmethod
def get_historical_features(
self,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
Expand Down
Loading

0 comments on commit b359bee

Please sign in to comment.