From cfd49cf74911f2fa7d6458d399cb99e4d9d7b9ef Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Wed, 1 Sep 2021 22:55:54 -0400 Subject: [PATCH 1/2] On demand transforms for historical retrieval Signed-off-by: Danny Chiao --- .../feast/infra/offline_stores/bigquery.py | 49 ++++++++++++++++--- sdk/python/feast/infra/offline_stores/file.py | 36 ++++++++++++-- .../infra/offline_stores/offline_store.py | 24 +++++++++ .../infra/offline_stores/offline_utils.py | 2 +- .../feast/infra/offline_stores/redshift.py | 32 ++++++++++-- sdk/python/feast/infra/provider.py | 3 ++ sdk/python/feast/on_demand_feature_view.py | 2 +- 7 files changed, 130 insertions(+), 18 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 2bfd863991..edd6957b8c 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -3,7 +3,7 @@ from typing import Dict, List, Optional, Union import numpy as np -import pandas +import pandas as pd import pyarrow from pydantic import StrictStr from pydantic.typing import Literal @@ -19,6 +19,7 @@ from feast.feature_view import FeatureView from feast.infra.offline_stores import offline_utils from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob +from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -87,14 +88,21 @@ def pull_latest_from_table_or_query( WHERE _feast_row = 1 """ - return BigQueryRetrievalJob(query=query, client=client, config=config) + # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized + return BigQueryRetrievalJob( + query=query, + client=client, + config=config, + full_feature_names=False, + on_demand_feature_views=None, + ) @staticmethod def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], - entity_df: Union[pandas.DataFrame, str], + entity_df: Union[pd.DataFrame, str], registry: Registry, project: str, full_feature_names: bool = False, @@ -140,16 +148,41 @@ def get_historical_features( full_feature_names=full_feature_names, ) - return BigQueryRetrievalJob(query=query, client=client, config=config) + return BigQueryRetrievalJob( + query=query, + client=client, + config=config, + full_feature_names=full_feature_names, + on_demand_feature_views=registry.list_on_demand_feature_views( + project, allow_cache=True + ), + ) class BigQueryRetrievalJob(RetrievalJob): - def __init__(self, query, client, config): + def __init__( + self, + query: str, + client: bigquery.Client, + config: RepoConfig, + full_feature_names: bool, + on_demand_feature_views: Optional[List[OnDemandFeatureView]], + ): self.query = query self.client = client self.config = config + self._full_feature_names = full_feature_names + self._on_demand_feature_views = on_demand_feature_views + + @property + def full_feature_names(self) -> bool: + return self._full_feature_names + + @property + def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: + return self._on_demand_feature_views - def to_df(self): + def to_df_internal(self) -> pd.DataFrame: # TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df() df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True) return df @@ -266,7 +299,7 @@ def _get_table_reference_for_new_entity( def 
_upload_entity_df_and_get_entity_schema( - client: Client, table_name: str, entity_df: Union[pandas.DataFrame, str], + client: Client, table_name: str, entity_df: Union[pd.DataFrame, str], ) -> Dict[str, np.dtype]: """Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table""" @@ -278,7 +311,7 @@ def _upload_entity_df_and_get_entity_schema( client.query(f"SELECT * FROM {table_name} LIMIT 1").result().to_dataframe() ) entity_schema = dict(zip(limited_entity_df.columns, limited_entity_df.dtypes)) - elif isinstance(entity_df, pandas.DataFrame): + elif isinstance(entity_df, pd.DataFrame): # Drop the index so that we dont have unnecessary columns entity_df.reset_index(drop=True, inplace=True) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 5c6f96df57..1afa5df08d 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -6,7 +6,7 @@ import pytz from pydantic.typing import Literal -from feast import FileSource +from feast import FileSource, OnDemandFeatureView from feast.data_source import DataSource from feast.errors import FeastJoinKeysDuringMaterialization from feast.feature_view import FeatureView @@ -30,13 +30,28 @@ class FileOfflineStoreConfig(FeastConfigBaseModel): class FileRetrievalJob(RetrievalJob): - def __init__(self, evaluation_function: Callable): + def __init__( + self, + evaluation_function: Callable, + full_feature_names: bool, + on_demand_feature_views: Optional[List[OnDemandFeatureView]], + ): """Initialize a lazy historical retrieval job""" # The evaluation function executes a stored procedure to compute a historical retrieval. self.evaluation_function = evaluation_function + self._full_feature_names = full_feature_names + self._on_demand_feature_views = on_demand_feature_views - def to_df(self): + @property + def full_feature_names(self) -> bool: + return self._full_feature_names + + @property + def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: + return self._on_demand_feature_views + + def to_df_internal(self) -> pd.DataFrame: # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. df = self.evaluation_function() return df @@ -224,7 +239,13 @@ def evaluate_historical_retrieval(): return entity_df_with_features - job = FileRetrievalJob(evaluation_function=evaluate_historical_retrieval) + job = FileRetrievalJob( + evaluation_function=evaluate_historical_retrieval, + full_feature_names=full_feature_names, + on_demand_feature_views=registry.list_on_demand_feature_views( + project, allow_cache=True + ), + ) return job @staticmethod @@ -284,4 +305,9 @@ def evaluate_offline_job(): ) return last_values_df[columns_to_extract] - return FileRetrievalJob(evaluation_function=evaluate_offline_job) + # When materializing a single feature view, we don't need full feature names. 
On demand transforms aren't materialized + return FileRetrievalJob( + evaluation_function=evaluate_offline_job, + full_feature_names=False, + on_demand_feature_views=None, + ) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index e8d32cd384..16fc3b1fa0 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -20,6 +20,7 @@ from feast.data_source import DataSource from feast.feature_view import FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry from feast.repo_config import RepoConfig @@ -27,11 +28,34 @@ class RetrievalJob(ABC): """RetrievalJob is used to manage the execution of a historical feature retrieval""" + @property @abstractmethod + def full_feature_names(self) -> bool: + pass + + @property + @abstractmethod + def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: + pass + def to_df(self) -> pd.DataFrame: + """Return dataset as Pandas DataFrame synchronously including on demand transforms""" + features_df = self.to_df_internal() + if self.on_demand_feature_views is None: + return features_df + + for odfv in self.on_demand_feature_views: + features_df = features_df.join( + odfv.get_transformed_features_df(self.full_feature_names, features_df) + ) + return features_df + + @abstractmethod + def to_df_internal(self) -> pd.DataFrame: """Return dataset as Pandas DataFrame synchronously""" pass + # TODO(adchia): implement ODFV for to_arrow method @abstractmethod def to_arrow(self) -> pyarrow.Table: """Return dataset as pyarrow Table synchronously""" diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 304bdc8e91..f9125ab156 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -148,7 +148,7 @@ def build_point_in_time_query( entity_df_event_timestamp_col: str, query_template: str, full_feature_names: bool = False, -): +) -> str: """Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift""" template = Environment(loader=BaseLoader()).from_string(source=query_template) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 94d6a2877b..4a71d89752 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -9,7 +9,7 @@ from pydantic import StrictStr from pydantic.typing import Literal -from feast import RedshiftSource +from feast import OnDemandFeatureView, RedshiftSource from feast.data_source import DataSource from feast.errors import InvalidEntityType from feast.feature_view import FeatureView @@ -90,11 +90,14 @@ def pull_latest_from_table_or_query( ) WHERE _feast_row = 1 """ + # When materializing a single feature view, we don't need full feature names. 
On demand transforms aren't materialized return RedshiftRetrievalJob( query=query, redshift_client=redshift_client, s3_resource=s3_resource, config=config, + full_feature_names=False, + on_demand_feature_views=None, ) @staticmethod @@ -164,6 +167,10 @@ def query_generator() -> Iterator[str]: redshift_client=redshift_client, s3_resource=s3_resource, config=config, + full_feature_names=full_feature_names, + on_demand_feature_views=registry.list_on_demand_feature_views( + project=project, allow_cache=True + ), drop_columns=["entity_timestamp"] + [ f"{feature_view.name}__entity_row_unique_id" @@ -179,6 +186,8 @@ def __init__( redshift_client, s3_resource, config: RepoConfig, + full_feature_names: bool, + on_demand_feature_views: Optional[List[OnDemandFeatureView]], drop_columns: Optional[List[str]] = None, ): """Initialize RedshiftRetrievalJob object. @@ -188,6 +197,8 @@ def __init__( redshift_client: boto3 redshift-data client s3_resource: boto3 s3 resource object config: Feast repo config + full_feature_names: Whether to add the feature view prefixes to the feature names + on_demand_feature_views: A list of on demand transforms to apply at retrieval time drop_columns: Optionally a list of columns to drop before unloading to S3. This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift. """ @@ -209,9 +220,19 @@ def query_generator() -> Iterator[str]: + "/unload/" + str(uuid.uuid4()) ) + self._full_feature_names = full_feature_names + self._on_demand_feature_views = on_demand_feature_views self._drop_columns = drop_columns - def to_df(self) -> pd.DataFrame: + @property + def full_feature_names(self) -> bool: + return self._full_feature_names + + @property + def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: + return self._on_demand_feature_views + + def to_df_internal(self) -> pd.DataFrame: with self._query_generator() as query: return aws_utils.unload_redshift_query_to_df( self._redshift_client, @@ -304,7 +325,12 @@ def _upload_entity_df_and_get_entity_schema( f"CREATE TABLE {table_name} AS ({entity_df})", ) limited_entity_df = RedshiftRetrievalJob( - f"SELECT * FROM {table_name} LIMIT 1", redshift_client, s3_resource, config + f"SELECT * FROM {table_name} LIMIT 1", + redshift_client, + s3_resource, + config, + full_feature_names=False, + on_demand_feature_views=None, ).to_df() return dict(zip(limited_entity_df.columns, limited_entity_df.dtypes)) else: diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index e5210b566f..4c78a5d109 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -177,6 +177,9 @@ def _get_requested_feature_views_to_features_dict( feature_views_to_feature_map: Dict[FeatureView, List[str]] = {} for ref in feature_refs: + if ":" not in ref: + # ODFV + continue ref_parts = ref.split(":") feature_view_from_ref = ref_parts[0] feature_from_ref = ref_parts[1] diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index b5b71c164c..cac41ff44f 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -103,7 +103,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): def get_transformed_features_df( self, full_feature_names: bool, df_with_features: pd.DataFrame - ) -> pd.DataFrame: + ): # Apply on demand transformations # TODO(adchia): Include only the feature values from the specified input FVs in the ODFV. 
        # Copy over un-prefixed features even if not requested since transform may need it

From e2a7830b911c0177bb8fe7b88ffaa9c2a6d1d8f9 Mon Sep 17 00:00:00 2001
From: Danny Chiao
Date: Wed, 1 Sep 2021 22:58:36 -0400
Subject: [PATCH 2/2] Fix merge error in on_demand_feature_view

Restore the pd.DataFrame return type annotation on
get_transformed_features_df, which was dropped in a merge in patch 1.

Signed-off-by: Danny Chiao
---
 sdk/python/feast/on_demand_feature_view.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py
index cac41ff44f..b5b71c164c 100644
--- a/sdk/python/feast/on_demand_feature_view.py
+++ b/sdk/python/feast/on_demand_feature_view.py
@@ -103,7 +103,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
 
     def get_transformed_features_df(
         self, full_feature_names: bool, df_with_features: pd.DataFrame
-    ):
+    ) -> pd.DataFrame:
         # Apply on demand transformations
         # TODO(adchia): Include only the feature values from the specified input FVs in the ODFV.
         # Copy over un-prefixed features even if not requested since transform may need it
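
For context on the new RetrievalJob contract introduced above: to_df() is now a
template method that first materializes the plain features via the abstract
to_df_internal(), then joins on the output of each on demand transform. Below
is a minimal, self-contained pandas sketch of that flow; the feature names and
the transform itself are hypothetical stand-ins for whatever udf an
OnDemandFeatureView was registered with.

import pandas as pd

# Hypothetical on demand transform, standing in for an OnDemandFeatureView
# udf: it receives the already-retrieved feature frame and returns only the
# new columns it derives.
def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame:
    out = pd.DataFrame()
    out["conv_rate_plus_100"] = features_df["conv_rate"] + 100
    return out

# Step 1: what to_df_internal() would hand back from the offline store.
features_df = pd.DataFrame({"driver_id": [1001, 1002], "conv_rate": [0.5, 0.7]})

# Step 2: to_df() joins each transform's output onto the retrieved frame.
for udf in [transformed_conv_rate]:
    features_df = features_df.join(udf(features_df))

print(features_df)
#    driver_id  conv_rate  conv_rate_plus_100
# 0       1001        0.5               100.5
# 1       1002        0.7               100.7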
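End to end, the provider change above means a feature reference without a ":"
separator is treated as an on demand feature view, so one retrieval call can
mix both kinds of references. A usage sketch follows, assuming
quickstart-style names for the repo, entity dataframe, and features;
feature_refs is the keyword argument the SDK uses around the time of this
change.

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": pd.to_datetime(["2021-08-01", "2021-08-02"]),
    }
)

job = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=[
        "driver_hourly_stats:conv_rate",  # regular feature view reference
        "transformed_conv_rate",  # on demand feature view: no ":" separator
    ],
)

# to_df() runs the offline-store query via to_df_internal(), then applies
# the registered on demand transforms client-side before returning.
training_df = job.to_df()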