From 8e5490f97c9ccd721ba3423fa7922ec57eaa0604 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 7 Sep 2021 22:10:11 -0700 Subject: [PATCH 01/11] Infer features for on demand feature views Signed-off-by: Achal Shah --- sdk/python/feast/errors.py | 8 +++ sdk/python/feast/feature.py | 2 +- sdk/python/feast/feature_store.py | 3 ++ sdk/python/feast/on_demand_feature_view.py | 52 +++++++++++++++++++ sdk/python/feast/type_map.py | 28 ++++++++-- .../feature_repos/universal/feature_views.py | 10 ++-- .../registration/test_inference.py | 10 ++-- .../test_universal_odfv_feature_inference.py | 48 +++++++++++++++++ 8 files changed, 148 insertions(+), 13 deletions(-) create mode 100644 sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 59736ab001..6cd72def8e 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -129,6 +129,14 @@ def __init__(self, feature_refs_collisions: List[str], full_feature_names: bool) ) +class SpecifiedFeaturesNotPresentError(Exception): + def __init__(self, specified_features: List[str], feature_view_name: str): + features = ", ".join(specified_features) + super().__init__( + f"Explicitly specified features {features} not found in inferred list of features for '{feature_view_name}'" + ) + + class FeastOnlineStoreInvalidName(Exception): def __init__(self, online_store_class_name: str): super().__init__( diff --git a/sdk/python/feast/feature.py b/sdk/python/feast/feature.py index 8e4457d3b6..5048d06ca0 100644 --- a/sdk/python/feast/feature.py +++ b/sdk/python/feast/feature.py @@ -59,7 +59,7 @@ def __lt__(self, other): def __repr__(self): # return string representation of the reference - return self.name + return f"{self.name}-{self.dtype}" def __str__(self): # readable string of the reference diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 861d42d3ae..70e33bb6eb 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -360,6 +360,9 @@ def apply( for view in views_to_update: view.infer_features_from_batch_source(self.config) + for odfv in odfvs_to_update: + odfv.infer_features_from_batch_source(self.config) + if len(views_to_update) + len(entities_to_update) + len( services_to_update ) + len(odfvs_to_update) != len(objects): diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index b5b71c164c..0d04c52ec0 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -5,6 +5,8 @@ import dill import pandas as pd +from feast import errors +from feast.errors import RegistryInferenceFailure from feast.feature import Feature from feast.feature_view import FeatureView from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( @@ -14,6 +16,11 @@ from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( UserDefinedFunction as UserDefinedFunctionProto, ) +from feast.repo_config import RepoConfig +from feast.type_map import ( + feast_value_type_to_pandas_type, + python_type_to_feast_value_type, +) from feast.usage import log_exceptions from feast.value_type import ValueType @@ -125,6 +132,51 @@ def get_transformed_features_df( df_with_features.drop(columns=columns_to_cleanup, inplace=True) return df_with_transformed_features + def infer_features_from_batch_source(self, config: RepoConfig): + """ + Infers the set of features associated to this feature view from the input source. + + Args: + config: Configuration object used to configure the feature store. + + Raises: + RegistryInferenceFailure: The set of features could not be inferred. + """ + df = pd.DataFrame() + for feature_view in self.inputs.values(): + for feature in feature_view.features: + dtype = feast_value_type_to_pandas_type(feature.dtype) + df[f"{feature_view.name}__{feature.name}"] = pd.Series(dtype=dtype) + df[f"{feature.name}"] = pd.Series(dtype=dtype) + output_df: pd.DataFrame = self.udf.__call__(df) + inferred_features = [] + for f, dt in zip(output_df.columns, output_df.dtypes): + inferred_features.append( + Feature( + name=f, dtype=python_type_to_feast_value_type(f, type_name=str(dt)) + ) + ) + + print(f"self.features: {self.features}, inferred_features: {inferred_features}") + + if self.features: + missing_features = [] + for specified_features in self.features: + if specified_features not in inferred_features: + missing_features.append(specified_features) + if missing_features: + raise errors.SpecifiedFeaturesNotPresentError( + [f.name for f in missing_features], self.name + ) + else: + self.features = inferred_features + + if not self.features: + raise RegistryInferenceFailure( + "OnDemandFeatureView", + f"Could not infer Features for the feature view '{self.name}'.", + ) + def on_demand_feature_view(features: List[Feature], inputs: Dict[str, FeatureView]): """ diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index c65ec6e14c..d7ae0cc2d5 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -14,7 +14,7 @@ import re from datetime import datetime -from typing import Any, Dict, Union +from typing import Any, Dict, Optional, Union import numpy as np import pandas as pd @@ -72,8 +72,27 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: ) +def feast_value_type_to_pandas_type(value_type: ValueType) -> Any: + value_type_to_pandas_type: Dict[ValueType, str] = { + ValueType.FLOAT: "float", + ValueType.INT32: "int", + ValueType.INT64: "int", + ValueType.STRING: "str", + ValueType.DOUBLE: "float", + ValueType.BYTES: "bytes", + ValueType.BOOL: "bool", + ValueType.UNIX_TIMESTAMP: "Timestamp", + } + if value_type in value_type_to_pandas_type: + return value_type_to_pandas_type[value_type] + raise TypeError( + f"Casting to pandas type for type {value_type} failed. " + f"Type {value_type} not found" + ) + + def python_type_to_feast_value_type( - name: str, value, recurse: bool = True + name: str, value: Any = None, recurse: bool = True, type_name: Optional[str] = None ) -> ValueType: """ Finds the equivalent Feast Value Type for a Python value. Both native @@ -88,8 +107,7 @@ def python_type_to_feast_value_type( Returns: Feast Value Type """ - - type_name = type(value).__name__ + type_name = type_name or type(value).__name__ type_map = { "int": ValueType.INT64, @@ -151,12 +169,14 @@ def python_type_to_feast_value_type( ) return ValueType[common_item_value_type.name + "_LIST"] else: + assert value raise ValueError( f"Value type for field {name} is {value.dtype.__str__()} but " f"recursion is not allowed. Array types can only be one level " f"deep." ) + assert value return type_map[value.dtype.__str__()] diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index c8029474aa..735f97cd9e 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -1,5 +1,5 @@ from datetime import timedelta -from typing import Dict +from typing import Dict, List, Optional import pandas as pd @@ -29,12 +29,16 @@ def conv_rate_plus_100(driver_hourly_stats: pd.DataFrame) -> pd.DataFrame: def conv_rate_plus_100_feature_view( - inputs: Dict[str, FeatureView] + inputs: Dict[str, FeatureView], + infer_features: bool = False, + features: Optional[List[Feature]] = None, ) -> OnDemandFeatureView: + _features = features or [Feature("conv_rate_plus_100", ValueType.DOUBLE)] + print(f"features: {_features}") return OnDemandFeatureView( name=conv_rate_plus_100.__name__, inputs=inputs, - features=[Feature("conv_rate_plus_100", ValueType.FLOAT)], + features=[] if infer_features else _features, udf=conv_rate_plus_100, ) diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index 65ffdbf051..f45a18ea55 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -1,9 +1,4 @@ import pytest -from utils.data_source_utils import ( - prep_file_source, - simple_bq_source_using_query_arg, - simple_bq_source_using_table_ref_arg, -) from feast import Entity, RepoConfig, ValueType from feast.errors import RegistryInferenceFailure @@ -12,6 +7,11 @@ update_data_sources_with_inferred_event_timestamp_col, update_entities_with_inferred_types_from_feature_views, ) +from tests.utils.data_source_utils import ( + prep_file_source, + simple_bq_source_using_query_arg, + simple_bq_source_using_table_ref_arg, +) def test_update_entities_with_inferred_types_from_feature_views( diff --git a/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py b/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py new file mode 100644 index 0000000000..b59a5f9350 --- /dev/null +++ b/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py @@ -0,0 +1,48 @@ +import pytest + +from feast import Feature, ValueType +from feast.errors import SpecifiedFeaturesNotPresentError +from tests.integration.feature_repos.universal.entities import customer, driver +from tests.integration.feature_repos.universal.feature_views import ( + conv_rate_plus_100_feature_view, + create_driver_hourly_stats_feature_view, +) + + +@pytest.mark.integration +@pytest.mark.parametrize("infer_features", [True, False], ids=lambda v: str(v)) +def test_infer_odfv_features(environment, universal_data_sources, infer_features): + store = environment.feature_store + + (entities, datasets, data_sources) = universal_data_sources + + driver_hourly_stats = create_driver_hourly_stats_feature_view( + data_sources["driver"] + ) + driver_odfv = conv_rate_plus_100_feature_view( + {"driver": driver_hourly_stats}, infer_features=infer_features + ) + + feast_objects = [driver_hourly_stats, driver_odfv, driver(), customer()] + store.apply(feast_objects) + odfv = store.get_on_demand_feature_view("conv_rate_plus_100") + assert len(odfv.features) == 1 + + +@pytest.mark.integration +def test_infer_odfv_features_with_error(environment, universal_data_sources): + store = environment.feature_store + + (entities, datasets, data_sources) = universal_data_sources + + features = [Feature("conv_rate_plus_200", ValueType.DOUBLE)] + driver_hourly_stats = create_driver_hourly_stats_feature_view( + data_sources["driver"] + ) + driver_odfv = conv_rate_plus_100_feature_view( + {"driver": driver_hourly_stats}, features=features + ) + + feast_objects = [driver_hourly_stats, driver_odfv, driver(), customer()] + with pytest.raises(SpecifiedFeaturesNotPresentError): + store.apply(feast_objects) From a01e8bb5f772fe35930545ce268a879fba05ab84 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 09:11:24 -0700 Subject: [PATCH 02/11] Update all feature_refs Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 90 ++++++++++++------- sdk/python/feast/infra/offline_stores/file.py | 4 +- .../infra/offline_stores/offline_utils.py | 4 +- sdk/python/feast/infra/provider.py | 26 ++++-- .../test_universal_historical_retrieval.py | 4 +- .../online_store/test_universal_online.py | 16 ++-- 6 files changed, 92 insertions(+), 52 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 049e734ae7..1e58db6d7e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -489,8 +489,11 @@ def get_historical_features( _feature_refs = self._get_features(features, feature_refs) all_feature_views = self._registry.list_feature_views(project=self.project) + all_on_demand_feature_views = self._registry.list_on_demand_feature_views(project=self.project) + + fvs = _group_feature_refs(_feature_refs, all_feature_views, all_on_demand_feature_views) feature_views = list( - view for view, _ in _group_feature_refs(_feature_refs, all_feature_views) + view for view, _ in fvs if isinstance(view, FeatureView) ) _validate_feature_refs(_feature_refs, full_feature_names) @@ -754,9 +757,15 @@ def get_online_features( all_feature_views = self._registry.list_feature_views( project=self.project, allow_cache=True ) + all_on_demand_feature_views = self._registry.list_on_demand_feature_views( + project=self.project, allow_cache=True + ) + _validate_feature_refs(_feature_refs, full_feature_names) - grouped_refs = _group_feature_refs(_feature_refs, all_feature_views) + grouped_refs = _group_feature_refs(_feature_refs, all_feature_views, all_on_demand_feature_views) for table, requested_features in grouped_refs: + if not isinstance(table, FeatureView): + continue entity_keys = _get_table_entity_keys( table, union_of_entity_keys, entity_name_to_join_key_map ) @@ -809,32 +818,43 @@ def _augment_response_with_on_demand_transforms( initial_response: OnlineResponse, result_rows: List[GetOnlineFeaturesResponse.FieldValues], ) -> OnlineResponse: - all_on_demand_feature_views = self._registry.list_on_demand_feature_views( - project=self.project, allow_cache=True - ) + all_on_demand_feature_views = { + view.name: view for view in self._registry.list_on_demand_feature_views( + project=self.project, allow_cache=True + ) + } + all_odfv_feature_names = all_on_demand_feature_views.keys() + if len(all_on_demand_feature_views) == 0: return initial_response initial_response_df = initial_response.to_df() + + odfv_feature_refs = defaultdict(list) + for feature_ref in feature_refs: + view_name, feature_name = feature_ref.split(":")[0], feature_ref.split(":")[1] + if view_name in all_odfv_feature_names: + odfv_feature_refs[view_name].append(feature_name) + # Apply on demand transformations - for odfv in all_on_demand_feature_views: - feature_ref = odfv.name - if feature_ref in feature_refs: - transformed_features_df = odfv.get_transformed_features_df( - full_feature_names, initial_response_df - ) - for row_idx in range(len(result_rows)): - result_row = result_rows[row_idx] - # TODO(adchia): support multiple output features in an ODFV, which requires different naming - # conventions - result_row.fields[odfv.name].CopyFrom( - python_value_to_proto_value( - transformed_features_df[odfv.features[0].name].values[ - row_idx - ] - ) - ) + for odfv_name, _feature_refs in odfv_feature_refs.items(): + odfv = all_on_demand_feature_views[odfv_name] + transformed_features_df = odfv.get_transformed_features_df( + full_feature_names, initial_response_df + ) + for row_idx in range(len(result_rows)): + result_row = result_rows[row_idx] + + selected_subset = [f for f in transformed_features_df.columns if f in _feature_refs] + + for transformed_feature in selected_subset: + transformed_feature_name = f"{odfv.name}__{transformed_feature}" if full_feature_names else transformed_feature + print(f"transformed_feature: {transformed_feature}, transformed_feature_name: {transformed_feature_name}") + proto_value = python_value_to_proto_value(transformed_features_df[transformed_feature].values[row_idx]) + print(f"transformed_feature: {proto_value}, `{type(proto_value)}` ") + print(f"result_row: {result_row.fields[transformed_feature_name]}") + result_row.fields[transformed_feature_name].CopyFrom(proto_value) result_row.statuses[ - feature_ref + transformed_feature_name ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows)) @@ -887,25 +907,33 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F def _group_feature_refs( - features: Union[List[str], FeatureService], all_feature_views: List[FeatureView] -) -> List[Tuple[FeatureView, List[str]]]: + features: Union[List[str], FeatureService], + all_feature_views: List[FeatureView], + all_on_demand_feature_views: List[OnDemandFeatureView] +) -> List[Union[Tuple[FeatureView, List[str]], Tuple[OnDemandFeatureView, List[str]]]]: """ Get list of feature views and corresponding feature names based on feature references""" # view name to view proto view_index = {view.name: view for view in all_feature_views} + # on demand view to on demand view proto + on_demand_view_index = {view.name: view for view in all_on_demand_feature_views} + # view name to feature names views_features = defaultdict(list) + # on demand view name to feature names + on_demand_view_features = defaultdict(list) + if isinstance(features, list) and isinstance(features[0], str): for ref in features: - if ":" not in ref: - # This is an on demand feature view ref - continue view_name, feat_name = ref.split(":") - if view_name not in view_index: + if view_name in view_index: + views_features[view_name].append(feat_name) + elif view_name in on_demand_view_index: + on_demand_view_features[view_name].append(feat_name) + else: raise FeatureViewNotFoundException(view_name) - views_features[view_name].append(feat_name) elif isinstance(features, FeatureService): for feature_projection in features.features: projected_features = feature_projection.features @@ -916,6 +944,8 @@ def _group_feature_refs( result = [] for view_name, feature_names in views_features.items(): result.append((view_index[view_name], feature_names)) + for view_name, feature_names in on_demand_view_features.items(): + result.append((on_demand_view_features[view_name], feature_names)) return result diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 1afa5df08d..b1c6f1c88a 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -91,8 +91,8 @@ def get_historical_features( raise ValueError( f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events." ) - feature_views_to_features = _get_requested_feature_views_to_features_dict( - feature_refs, feature_views + feature_views_to_features, on_demand_feature_views_to_features = _get_requested_feature_views_to_features_dict( + feature_refs, feature_views + registry.list_on_demand_feature_views(config.project) ) # Create lazy function that is only called from the RetrievalJob object diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index f9125ab156..ecd56326d6 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -96,8 +96,8 @@ def get_feature_view_query_context( ) -> List[FeatureViewQueryContext]: """Build a query context containing all information required to template a BigQuery and Redshift point-in-time SQL query""" - feature_views_to_feature_map = _get_requested_feature_views_to_features_dict( - feature_refs, feature_views + feature_views_to_feature_map, on_demand_feature_views_to_features, = _get_requested_feature_views_to_features_dict( + feature_refs, feature_views + registry.list_on_demand_feature_views(project) ) query_context = [] diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 4c78a5d109..c723d9f4ae 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -11,6 +11,7 @@ from feast.entity import Entity from feast.feature_table import FeatureTable from feast.feature_view import FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView from feast.infra.offline_stores.offline_store import RetrievalJob from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -169,17 +170,15 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: def _get_requested_feature_views_to_features_dict( - feature_refs: List[str], feature_views: List[FeatureView] -) -> Dict[FeatureView, List[str]]: + feature_refs: List[str], feature_views: List[Union[FeatureView, OnDemandFeatureView]] +) -> Tuple[Dict[FeatureView, List[str]], Dict[OnDemandFeatureView, List[str]]]: """Create a dict of FeatureView -> List[Feature] for all requested features. Set full_feature_names to True to have feature names prefixed by their feature view name.""" feature_views_to_feature_map: Dict[FeatureView, List[str]] = {} + on_demand_feature_views_to_feature_map: Dict[OnDemandFeatureView, List[str]] = {} for ref in feature_refs: - if ":" not in ref: - # ODFV - continue ref_parts = ref.split(":") feature_view_from_ref = ref_parts[0] feature_from_ref = ref_parts[1] @@ -192,15 +191,24 @@ def _get_requested_feature_views_to_features_dict( feature_views_to_feature_map[feature_view_from_registry].append( feature_from_ref ) - else: - feature_views_to_feature_map[feature_view_from_registry] = [ + elif feature_view_from_registry in on_demand_feature_views_to_feature_map: + on_demand_feature_views_to_feature_map[feature_view_from_registry].append( feature_from_ref - ] + ) + else: + if isinstance(feature_view_from_registry, OnDemandFeatureView): + on_demand_feature_views_to_feature_map[feature_view_from_registry] = [ + feature_from_ref + ] + else: + feature_views_to_feature_map[feature_view_from_registry] = [ + feature_from_ref + ] if not found: raise ValueError(f"Could not find feature view from reference {ref}") - return feature_views_to_feature_map + return feature_views_to_feature_map, on_demand_feature_views_to_feature_map def _get_column_names( diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 1d3ae7edb5..269c993e44 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -192,7 +192,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "customer_profile:current_balance", "customer_profile:avg_passenger_count", "customer_profile:lifetime_trip_count", - "conv_rate_plus_100", + "conv_rate_plus_100:conv_rate_plus_100", ], full_feature_names=full_feature_names, ) @@ -250,7 +250,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "customer_profile:current_balance", "customer_profile:avg_passenger_count", "customer_profile:lifetime_trip_count", - "conv_rate_plus_100", + "conv_rate_plus_100:conv_rate_plus_100", ], full_feature_names=full_feature_names, ) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index fc1404298b..7621efd0bf 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -44,9 +44,11 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name "customer_profile:current_balance", "customer_profile:avg_passenger_count", "customer_profile:lifetime_trip_count", - "conv_rate_plus_100", + "conv_rate_plus_100:conv_rate_plus_100", ] unprefixed_feature_refs = [f.rsplit(":", 1)[-1] for f in feature_refs if ":" in f] + # Remove the on demand feature view, since it's not present in the source dataframe + unprefixed_feature_refs.remove("conv_rate_plus_100") online_features = fs.get_online_features( features=feature_refs, @@ -55,21 +57,18 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name ) assert online_features is not None - keys = online_features.to_dict().keys() + online_features_dict = online_features.to_dict() + keys = online_features_dict.keys() assert ( len(keys) == len(feature_refs) + 2 ) # Add two for the driver id and the customer id entity keys. for feature in feature_refs: - if ":" in feature: - # This is the ODFV - continue if full_feature_names: assert feature.replace(":", "__") in keys else: assert feature.rsplit(":", 1)[-1] in keys assert "driver_stats" not in keys and "customer_profile" not in keys - online_features_dict = online_features.to_dict() tc = unittest.TestCase() for i, entity_row in enumerate(entity_rows): df_features = get_latest_feature_values_from_dataframes( @@ -79,7 +78,7 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name assert df_features["customer_id"] == online_features_dict["customer_id"][i] assert df_features["driver_id"] == online_features_dict["driver_id"][i] assert ( - online_features_dict["conv_rate_plus_100"][i] + online_features_dict[response_feature_name("conv_rate_plus_100", full_feature_names)][i] == df_features["conv_rate"] + 100 ) for unprefixed_feature_ref in unprefixed_feature_refs: @@ -115,6 +114,9 @@ def response_feature_name(feature: str, full_feature_names: bool) -> str: if feature in {"conv_rate", "avg_daily_trips"} and full_feature_names: return f"driver_stats__{feature}" + if feature in {"conv_rate_plus_100"} and full_feature_names: + return f"conv_rate_plus_100__{feature}" + return feature From 8c3c6257f0a8d440271a28bbd659fb525e64f928 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 09:11:59 -0700 Subject: [PATCH 03/11] make format Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 42 +++++++++++++------ sdk/python/feast/infra/offline_stores/file.py | 8 +++- .../infra/offline_stores/offline_utils.py | 5 ++- sdk/python/feast/infra/provider.py | 21 ++++++---- .../online_store/test_universal_online.py | 4 +- 5 files changed, 55 insertions(+), 25 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 1e58db6d7e..28196e20b5 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -489,12 +489,14 @@ def get_historical_features( _feature_refs = self._get_features(features, feature_refs) all_feature_views = self._registry.list_feature_views(project=self.project) - all_on_demand_feature_views = self._registry.list_on_demand_feature_views(project=self.project) + all_on_demand_feature_views = self._registry.list_on_demand_feature_views( + project=self.project + ) - fvs = _group_feature_refs(_feature_refs, all_feature_views, all_on_demand_feature_views) - feature_views = list( - view for view, _ in fvs if isinstance(view, FeatureView) + fvs = _group_feature_refs( + _feature_refs, all_feature_views, all_on_demand_feature_views ) + feature_views = list(view for view, _ in fvs if isinstance(view, FeatureView)) _validate_feature_refs(_feature_refs, full_feature_names) @@ -762,7 +764,9 @@ def get_online_features( ) _validate_feature_refs(_feature_refs, full_feature_names) - grouped_refs = _group_feature_refs(_feature_refs, all_feature_views, all_on_demand_feature_views) + grouped_refs = _group_feature_refs( + _feature_refs, all_feature_views, all_on_demand_feature_views + ) for table, requested_features in grouped_refs: if not isinstance(table, FeatureView): continue @@ -819,7 +823,8 @@ def _augment_response_with_on_demand_transforms( result_rows: List[GetOnlineFeaturesResponse.FieldValues], ) -> OnlineResponse: all_on_demand_feature_views = { - view.name: view for view in self._registry.list_on_demand_feature_views( + view.name: view + for view in self._registry.list_on_demand_feature_views( project=self.project, allow_cache=True ) } @@ -831,7 +836,10 @@ def _augment_response_with_on_demand_transforms( odfv_feature_refs = defaultdict(list) for feature_ref in feature_refs: - view_name, feature_name = feature_ref.split(":")[0], feature_ref.split(":")[1] + view_name, feature_name = ( + feature_ref.split(":")[0], + feature_ref.split(":")[1], + ) if view_name in all_odfv_feature_names: odfv_feature_refs[view_name].append(feature_name) @@ -844,12 +852,22 @@ def _augment_response_with_on_demand_transforms( for row_idx in range(len(result_rows)): result_row = result_rows[row_idx] - selected_subset = [f for f in transformed_features_df.columns if f in _feature_refs] + selected_subset = [ + f for f in transformed_features_df.columns if f in _feature_refs + ] for transformed_feature in selected_subset: - transformed_feature_name = f"{odfv.name}__{transformed_feature}" if full_feature_names else transformed_feature - print(f"transformed_feature: {transformed_feature}, transformed_feature_name: {transformed_feature_name}") - proto_value = python_value_to_proto_value(transformed_features_df[transformed_feature].values[row_idx]) + transformed_feature_name = ( + f"{odfv.name}__{transformed_feature}" + if full_feature_names + else transformed_feature + ) + print( + f"transformed_feature: {transformed_feature}, transformed_feature_name: {transformed_feature_name}" + ) + proto_value = python_value_to_proto_value( + transformed_features_df[transformed_feature].values[row_idx] + ) print(f"transformed_feature: {proto_value}, `{type(proto_value)}` ") print(f"result_row: {result_row.fields[transformed_feature_name]}") result_row.fields[transformed_feature_name].CopyFrom(proto_value) @@ -909,7 +927,7 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F def _group_feature_refs( features: Union[List[str], FeatureService], all_feature_views: List[FeatureView], - all_on_demand_feature_views: List[OnDemandFeatureView] + all_on_demand_feature_views: List[OnDemandFeatureView], ) -> List[Union[Tuple[FeatureView, List[str]], Tuple[OnDemandFeatureView, List[str]]]]: """ Get list of feature views and corresponding feature names based on feature references""" diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index b1c6f1c88a..8cce16560b 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -91,8 +91,12 @@ def get_historical_features( raise ValueError( f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events." ) - feature_views_to_features, on_demand_feature_views_to_features = _get_requested_feature_views_to_features_dict( - feature_refs, feature_views + registry.list_on_demand_feature_views(config.project) + ( + feature_views_to_features, + on_demand_feature_views_to_features, + ) = _get_requested_feature_views_to_features_dict( + feature_refs, + feature_views + registry.list_on_demand_feature_views(config.project), ) # Create lazy function that is only called from the RetrievalJob object diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index ecd56326d6..1c9cd62e4f 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -96,7 +96,10 @@ def get_feature_view_query_context( ) -> List[FeatureViewQueryContext]: """Build a query context containing all information required to template a BigQuery and Redshift point-in-time SQL query""" - feature_views_to_feature_map, on_demand_feature_views_to_features, = _get_requested_feature_views_to_features_dict( + ( + feature_views_to_feature_map, + on_demand_feature_views_to_features, + ) = _get_requested_feature_views_to_features_dict( feature_refs, feature_views + registry.list_on_demand_feature_views(project) ) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index c723d9f4ae..3cb7ddbc0f 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -11,8 +11,8 @@ from feast.entity import Entity from feast.feature_table import FeatureTable from feast.feature_view import FeatureView -from feast.on_demand_feature_view import OnDemandFeatureView from feast.infra.offline_stores.offline_store import RetrievalJob +from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry @@ -170,7 +170,8 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: def _get_requested_feature_views_to_features_dict( - feature_refs: List[str], feature_views: List[Union[FeatureView, OnDemandFeatureView]] + feature_refs: List[str], + feature_views: List[Union[FeatureView, OnDemandFeatureView]], ) -> Tuple[Dict[FeatureView, List[str]], Dict[OnDemandFeatureView, List[str]]]: """Create a dict of FeatureView -> List[Feature] for all requested features. Set full_feature_names to True to have feature names prefixed by their feature view name.""" @@ -191,15 +192,17 @@ def _get_requested_feature_views_to_features_dict( feature_views_to_feature_map[feature_view_from_registry].append( feature_from_ref ) - elif feature_view_from_registry in on_demand_feature_views_to_feature_map: - on_demand_feature_views_to_feature_map[feature_view_from_registry].append( - feature_from_ref - ) + elif ( + feature_view_from_registry in on_demand_feature_views_to_feature_map + ): + on_demand_feature_views_to_feature_map[ + feature_view_from_registry + ].append(feature_from_ref) else: if isinstance(feature_view_from_registry, OnDemandFeatureView): - on_demand_feature_views_to_feature_map[feature_view_from_registry] = [ - feature_from_ref - ] + on_demand_feature_views_to_feature_map[ + feature_view_from_registry + ] = [feature_from_ref] else: feature_views_to_feature_map[feature_view_from_registry] = [ feature_from_ref diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 7621efd0bf..0b69b4910c 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -78,7 +78,9 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name assert df_features["customer_id"] == online_features_dict["customer_id"][i] assert df_features["driver_id"] == online_features_dict["driver_id"][i] assert ( - online_features_dict[response_feature_name("conv_rate_plus_100", full_feature_names)][i] + online_features_dict[ + response_feature_name("conv_rate_plus_100", full_feature_names) + ][i] == df_features["conv_rate"] + 100 ) for unprefixed_feature_ref in unprefixed_feature_refs: From 6e9f476cc21c765880a184ef99ac52a78a2b311d Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 09:24:36 -0700 Subject: [PATCH 04/11] Refactor Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 6 ++- sdk/python/feast/infra/offline_stores/file.py | 3 +- .../infra/offline_stores/offline_utils.py | 8 ++-- sdk/python/feast/infra/provider.py | 38 ++++++++----------- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 28196e20b5..eda77afed6 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -959,11 +959,13 @@ def _group_feature_refs( [f.name for f in projected_features] ) - result = [] + result: List[ + Union[Tuple[FeatureView, List[str]], Tuple[OnDemandFeatureView, List[str]]] + ] = [] for view_name, feature_names in views_features.items(): result.append((view_index[view_name], feature_names)) for view_name, feature_names in on_demand_view_features.items(): - result.append((on_demand_view_features[view_name], feature_names)) + result.append((on_demand_view_index[view_name], feature_names)) return result diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 8cce16560b..8067682edc 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -96,7 +96,8 @@ def get_historical_features( on_demand_feature_views_to_features, ) = _get_requested_feature_views_to_features_dict( feature_refs, - feature_views + registry.list_on_demand_feature_views(config.project), + feature_views, + registry.list_on_demand_feature_views(config.project), ) # Create lazy function that is only called from the RetrievalJob object diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 1c9cd62e4f..82e1661d51 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -2,7 +2,7 @@ import uuid from dataclasses import asdict, dataclass from datetime import timedelta -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple, Union import numpy as np import pandas as pd @@ -16,8 +16,10 @@ FeastEntityDFMissingColumnsError, FeastModuleImportError, ) +from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.provider import _get_requested_feature_views_to_features_dict +from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp" @@ -90,7 +92,7 @@ class FeatureViewQueryContext: def get_feature_view_query_context( feature_refs: List[str], - feature_views: List["feast.FeatureView"], + feature_views: List[FeatureView], registry: Registry, project: str, ) -> List[FeatureViewQueryContext]: @@ -100,7 +102,7 @@ def get_feature_view_query_context( feature_views_to_feature_map, on_demand_feature_views_to_features, ) = _get_requested_feature_views_to_features_dict( - feature_refs, feature_views + registry.list_on_demand_feature_views(project) + feature_refs, feature_views, registry.list_on_demand_feature_views(project) ) query_context = [] diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 3cb7ddbc0f..ada4ddefdb 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -1,4 +1,5 @@ import abc +from collections import defaultdict from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union @@ -171,13 +172,16 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: def _get_requested_feature_views_to_features_dict( feature_refs: List[str], - feature_views: List[Union[FeatureView, OnDemandFeatureView]], + feature_views: List[FeatureView], + on_demand_feature_views: List[OnDemandFeatureView], ) -> Tuple[Dict[FeatureView, List[str]], Dict[OnDemandFeatureView, List[str]]]: """Create a dict of FeatureView -> List[Feature] for all requested features. Set full_feature_names to True to have feature names prefixed by their feature view name.""" - feature_views_to_feature_map: Dict[FeatureView, List[str]] = {} - on_demand_feature_views_to_feature_map: Dict[OnDemandFeatureView, List[str]] = {} + feature_views_to_feature_map: Dict[FeatureView, List[str]] = defaultdict(list) + on_demand_feature_views_to_feature_map: Dict[ + OnDemandFeatureView, List[str] + ] = defaultdict(list) for ref in feature_refs: ref_parts = ref.split(":") @@ -188,25 +192,15 @@ def _get_requested_feature_views_to_features_dict( for feature_view_from_registry in feature_views: if feature_view_from_registry.name == feature_view_from_ref: found = True - if feature_view_from_registry in feature_views_to_feature_map: - feature_views_to_feature_map[feature_view_from_registry].append( - feature_from_ref - ) - elif ( - feature_view_from_registry in on_demand_feature_views_to_feature_map - ): - on_demand_feature_views_to_feature_map[ - feature_view_from_registry - ].append(feature_from_ref) - else: - if isinstance(feature_view_from_registry, OnDemandFeatureView): - on_demand_feature_views_to_feature_map[ - feature_view_from_registry - ] = [feature_from_ref] - else: - feature_views_to_feature_map[feature_view_from_registry] = [ - feature_from_ref - ] + feature_views_to_feature_map[feature_view_from_registry].append( + feature_from_ref + ) + for odfv_from_registry in on_demand_feature_views: + if odfv_from_registry.name == feature_view_from_ref: + found = True + on_demand_feature_views_to_feature_map[odfv_from_registry].append( + feature_from_ref + ) if not found: raise ValueError(f"Could not find feature view from reference {ref}") From 3fd5c47ffd2ba0d7578d1568602ec9bec40e0064 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 09:28:48 -0700 Subject: [PATCH 05/11] remove imports Signed-off-by: Achal Shah --- sdk/python/feast/infra/offline_stores/offline_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 82e1661d51..635ce69e8c 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -2,7 +2,7 @@ import uuid from dataclasses import asdict, dataclass from datetime import timedelta -from typing import Any, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Dict, List, Optional, Set, Tuple import numpy as np import pandas as pd @@ -19,7 +19,6 @@ from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.provider import _get_requested_feature_views_to_features_dict -from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp" From 6a6fa08db03face7378dae7ee3b462aaeacc3175 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 09:36:12 -0700 Subject: [PATCH 06/11] remove a wierd union in the return type for _group_refs Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index eda77afed6..cd3b3663e7 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -493,7 +493,10 @@ def get_historical_features( project=self.project ) - fvs = _group_feature_refs( + # TODO(achal): _group_feature_refs returns the on demand feature views, but it's no passed into the provider. + # This is a weird interface quirk - we should revisit the `get_historical_features` to + # pass in the on demand feature views as well. + fvs, _ = _group_feature_refs( _feature_refs, all_feature_views, all_on_demand_feature_views ) feature_views = list(view for view, _ in fvs if isinstance(view, FeatureView)) @@ -764,7 +767,7 @@ def get_online_features( ) _validate_feature_refs(_feature_refs, full_feature_names) - grouped_refs = _group_feature_refs( + grouped_refs, _ = _group_feature_refs( _feature_refs, all_feature_views, all_on_demand_feature_views ) for table, requested_features in grouped_refs: @@ -928,7 +931,9 @@ def _group_feature_refs( features: Union[List[str], FeatureService], all_feature_views: List[FeatureView], all_on_demand_feature_views: List[OnDemandFeatureView], -) -> List[Union[Tuple[FeatureView, List[str]], Tuple[OnDemandFeatureView, List[str]]]]: +) -> Tuple[ + List[Tuple[FeatureView, List[str]]], List[Tuple[OnDemandFeatureView, List[str]]] +]: """ Get list of feature views and corresponding feature names based on feature references""" # view name to view proto @@ -959,14 +964,14 @@ def _group_feature_refs( [f.name for f in projected_features] ) - result: List[ - Union[Tuple[FeatureView, List[str]], Tuple[OnDemandFeatureView, List[str]]] - ] = [] + fvs_result: List[Tuple[FeatureView, List[str]]] = [] + odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = [] + for view_name, feature_names in views_features.items(): - result.append((view_index[view_name], feature_names)) + fvs_result.append((view_index[view_name], feature_names)) for view_name, feature_names in on_demand_view_features.items(): - result.append((on_demand_view_index[view_name], feature_names)) - return result + odfvs_result.append((on_demand_view_index[view_name], feature_names)) + return fvs_result, odfvs_result def _get_feature_refs_from_feature_services( From 445e984d729915dcf3570f1b2b4f1a7002efbe55 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 10:31:25 -0700 Subject: [PATCH 07/11] use self._list Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e734cdf330..ee29450fe1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -779,8 +779,8 @@ def get_online_features( >>> online_response_dict = online_response.to_dict() """ _feature_refs = self._get_features(features, feature_refs) - all_feature_views = self._registry.list_feature_views( - project=self.project, allow_cache=True + all_feature_views = self._list_feature_views( + allow_cache=True, hide_dummy_entity=False ) all_on_demand_feature_views = self._registry.list_on_demand_feature_views( project=self.project, allow_cache=True From 955b92b217035a073899d8dc006daef3b789c2ce Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 10:52:59 -0700 Subject: [PATCH 08/11] build template Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ee29450fe1..456b182919 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -546,7 +546,7 @@ def get_historical_features( fvs, _ = _group_feature_refs( _feature_refs, all_feature_views, all_on_demand_feature_views ) - feature_views = list(view for view, _ in fvs if isinstance(view, FeatureView)) + feature_views = list(view for view, _ in fvs) _validate_feature_refs(_feature_refs, full_feature_names) @@ -826,8 +826,6 @@ def get_online_features( result_rows.append(_entity_row_to_field_values(entity_row_proto)) for table, requested_features in grouped_refs: - if not isinstance(table, FeatureView): - continue entity_keys = _get_table_entity_keys( table, union_of_entity_keys, entity_name_to_join_key_map ) From 3e909a2fd1fc713b79892ab18c4f89566bd9ca41 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 11:13:11 -0700 Subject: [PATCH 09/11] fix bug and remove prints Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 2 +- sdk/python/feast/on_demand_feature_view.py | 2 -- .../tests/integration/feature_repos/universal/feature_views.py | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 456b182919..422c876b25 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -535,7 +535,7 @@ def get_historical_features( _feature_refs = self._get_features(features, feature_refs) - all_feature_views = self._registry.list_feature_views(project=self.project) + all_feature_views = self.list_feature_views() all_on_demand_feature_views = self._registry.list_on_demand_feature_views( project=self.project ) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 0d04c52ec0..e6fc70b0e0 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -157,8 +157,6 @@ def infer_features_from_batch_source(self, config: RepoConfig): ) ) - print(f"self.features: {self.features}, inferred_features: {inferred_features}") - if self.features: missing_features = [] for specified_features in self.features: diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 6aa247a1e0..d8afdba484 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -49,7 +49,6 @@ def conv_rate_plus_100_feature_view( features: Optional[List[Feature]] = None, ) -> OnDemandFeatureView: _features = features or [Feature("conv_rate_plus_100", ValueType.DOUBLE)] - print(f"features: {_features}") return OnDemandFeatureView( name=conv_rate_plus_100.__name__, inputs=inputs, From 729e77d390c807a1c1b3e0f91b45c9b973df598d Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 11:14:48 -0700 Subject: [PATCH 10/11] Remove more prints Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 422c876b25..528d22b948 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -918,14 +918,9 @@ def _augment_response_with_on_demand_transforms( if full_feature_names else transformed_feature ) - print( - f"transformed_feature: {transformed_feature}, transformed_feature_name: {transformed_feature_name}" - ) proto_value = python_value_to_proto_value( transformed_features_df[transformed_feature].values[row_idx] ) - print(f"transformed_feature: {proto_value}, `{type(proto_value)}` ") - print(f"result_row: {result_row.fields[transformed_feature_name]}") result_row.fields[transformed_feature_name].CopyFrom(proto_value) result_row.statuses[ transformed_feature_name From bb914b643fef8b1446d201dece2100322f2e8ae7 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Sep 2021 14:02:31 -0700 Subject: [PATCH 11/11] CR updates Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 5 +---- sdk/python/feast/type_map.py | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 528d22b948..8987255743 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -892,10 +892,7 @@ def _augment_response_with_on_demand_transforms( odfv_feature_refs = defaultdict(list) for feature_ref in feature_refs: - view_name, feature_name = ( - feature_ref.split(":")[0], - feature_ref.split(":")[1], - ) + view_name, feature_name = feature_ref.split(":") if view_name in all_odfv_feature_names: odfv_feature_refs[view_name].append(feature_name) diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 647975008e..7646ad0c90 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -81,7 +81,7 @@ def feast_value_type_to_pandas_type(value_type: ValueType) -> Any: ValueType.DOUBLE: "float", ValueType.BYTES: "bytes", ValueType.BOOL: "bool", - ValueType.UNIX_TIMESTAMP: "Timestamp", + ValueType.UNIX_TIMESTAMP: "datetime", } if value_type in value_type_to_pandas_type: return value_type_to_pandas_type[value_type]