diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 15de4a5d29..c75c22b6d2 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -46,6 +46,7 @@ from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, update_entities_with_inferred_types_from_feature_views, + update_feature_views_with_inferred_features, ) from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.on_demand_feature_view import OnDemandFeatureView @@ -479,8 +480,9 @@ def apply( [view.batch_source for view in views_to_update], self.config ) - for view in views_to_update: - view.infer_features_from_batch_source(self.config) + update_feature_views_with_inferred_features( + views_to_update, entities_to_update, self.config + ) for odfv in odfvs_to_update: odfv.infer_features() diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index a8186991cd..ac8abefeb0 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import copy -import re import warnings from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Type, Union @@ -22,7 +21,6 @@ from feast import utils from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource -from feast.errors import RegistryInferenceFailure from feast.feature import Feature from feast.feature_view_projection import FeatureViewProjection from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto @@ -35,7 +33,6 @@ from feast.protos.feast.core.FeatureView_pb2 import ( MaterializationInterval as MaterializationIntervalProto, ) -from feast.repo_config import RepoConfig from feast.usage import log_exceptions from feast.value_type import ValueType @@ -406,69 +403,3 @@ def most_recent_end_time(self) -> Optional[datetime]: if len(self.materialization_intervals) == 0: return None return max([interval[1] for interval in self.materialization_intervals]) - - def infer_features_from_batch_source(self, config: RepoConfig): - """ - Infers the set of features associated to this feature view from the input source. - - Args: - config: Configuration object used to configure the feature store. - - Raises: - RegistryInferenceFailure: The set of features could not be inferred. 
- """ - if not self.features: - columns_to_exclude = { - self.batch_source.event_timestamp_column, - self.batch_source.created_timestamp_column, - } | set(self.entities) - - if ( - self.batch_source.event_timestamp_column - in self.batch_source.field_mapping - ): - columns_to_exclude.add( - self.batch_source.field_mapping[ - self.batch_source.event_timestamp_column - ] - ) - if ( - self.batch_source.created_timestamp_column - in self.batch_source.field_mapping - ): - columns_to_exclude.add( - self.batch_source.field_mapping[ - self.batch_source.created_timestamp_column - ] - ) - for e in self.entities: - if e in self.batch_source.field_mapping: - columns_to_exclude.add(self.batch_source.field_mapping[e]) - - for ( - col_name, - col_datatype, - ) in self.batch_source.get_table_column_names_and_types(config): - if col_name not in columns_to_exclude and not re.match( - "^__|__$", - col_name, # double underscores often signal an internal-use column - ): - feature_name = ( - self.batch_source.field_mapping[col_name] - if col_name in self.batch_source.field_mapping - else col_name - ) - self.features.append( - Feature( - feature_name, - self.batch_source.source_datatype_to_feast_value_type()( - col_datatype - ), - ) - ) - - if not self.features: - raise RegistryInferenceFailure( - "FeatureView", - f"Could not infer Features for the FeatureView named {self.name}.", - ) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 0ee2a437a6..39a77264bc 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -1,7 +1,7 @@ import re from typing import List -from feast import BigQuerySource, Entity, FileSource, RedshiftSource +from feast import BigQuerySource, Entity, Feature, FileSource, RedshiftSource from feast.data_source import DataSource from feast.errors import RegistryInferenceFailure from feast.feature_view import FeatureView @@ -118,3 +118,67 @@ def update_data_sources_with_inferred_event_timestamp_col( {ERROR_MSG_PREFIX} 
def update_feature_views_with_inferred_features(
    fvs: List[FeatureView], entities: List[Entity], config: RepoConfig
) -> None:
    """
    Infers the features of every FeatureView that declares none and appends them in place.

    Each column reported by the FeatureView's batch source becomes a feature, except:
      * the batch source's event timestamp and created timestamp columns,
      * the join keys of the entities the FeatureView references,
      * the names any of the above are mapped to in the batch source's field mapping,
      * columns whose name starts or ends with a double underscore.

    A column that appears as a key in the field mapping is registered under its mapped name.
    FeatureViews that already list features are left untouched.

    Args:
        fvs: FeatureViews to update; inferred features are appended to each ``fv.features``.
        entities: Entities used to translate the entity names in ``fv.entities`` into join
            keys. An entity name with no matching Entity in this list is excluded under its
            own name instead of raising a KeyError.
        config: Configuration object handed to the batch source when listing its columns.

    Raises:
        RegistryInferenceFailure: No feature could be inferred for one of the FeatureViews.
    """
    join_key_by_entity_name = {entity.name: entity.join_key for entity in entities}

    for fv in fvs:
        if fv.features:
            continue  # features were declared explicitly; nothing to infer

        source = fv.batch_source
        field_mapping = source.field_mapping

        reserved_columns = {
            source.event_timestamp_column,
            source.created_timestamp_column,
        }
        # The caller may pass only a subset of the entities a FeatureView references.
        # Falling back to the entity name keeps inference working in that case.
        reserved_columns.update(
            join_key_by_entity_name.get(entity_name, entity_name)
            for entity_name in fv.entities
        )

        # A reserved column that is renamed through the field mapping must be excluded
        # under its mapped name too; join keys get the same treatment as the timestamps.
        columns_to_exclude = reserved_columns | {
            field_mapping[name] for name in reserved_columns if name in field_mapping
        }

        for col_name, col_datatype in source.get_table_column_names_and_types(config):
            if col_name in columns_to_exclude:
                continue
            # Double underscores often signal an internal-use column. Both ends are
            # checked explicitly: re.match("^__|__$", ...) only ever tested the prefix.
            if col_name.startswith("__") or col_name.endswith("__"):
                continue

            feature_name = (
                field_mapping[col_name] if col_name in field_mapping else col_name
            )
            fv.features.append(
                Feature(
                    feature_name,
                    source.source_datatype_to_feast_value_type()(col_datatype),
                )
            )

        if not fv.features:
            raise RegistryInferenceFailure(
                "FeatureView",
                f"Could not infer Features for the FeatureView named {fv.name}.",
            )
100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -99,7 +99,7 @@ def simple_dataset_1() -> pd.DataFrame: now = datetime.utcnow() ts = pd.Timestamp(now).round("ms") data = { - "id": [1, 2, 1, 3, 3], + "id_join_key": [1, 2, 1, 3, 3], "float_col": [0.1, 0.2, 0.3, 4, 5], "int64_col": [1, 2, 3, 4, 5], "string_col": ["a", "b", "c", "d", "e"], @@ -119,7 +119,7 @@ def simple_dataset_2() -> pd.DataFrame: now = datetime.utcnow() ts = pd.Timestamp(now).round("ms") data = { - "id": ["a", "b", "c", "d", "e"], + "id_join_key": ["a", "b", "c", "d", "e"], "float_col": [0.1, 0.2, 0.3, 4, 5], "int64_col": [1, 2, 3, 4, 5], "string_col": ["a", "b", "c", "d", "e"], diff --git a/sdk/python/tests/integration/registration/test_feature_store.py b/sdk/python/tests/integration/registration/test_feature_store.py index b0cedfd642..d605865960 100644 --- a/sdk/python/tests/integration/registration/test_feature_store.py +++ b/sdk/python/tests/integration/registration/test_feature_store.py @@ -218,6 +218,8 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): with prep_file_source( df=dataframe_source, event_timestamp_column="ts_1" ) as file_source: + entity = Entity(name="id", join_key="id_join_key", value_type=ValueType.INT64) + fv1 = FeatureView( name="fv1", entities=["id"], @@ -245,7 +247,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): tags={}, ) - test_feature_store.apply([fv1, fv2, fv3]) # Register Feature Views + test_feature_store.apply([entity, fv1, fv2, fv3]) # Register Feature Views feature_view_1 = test_feature_store.list_feature_views()[0] feature_view_2 = test_feature_store.list_feature_views()[1] feature_view_3 = test_feature_store.list_feature_views()[2] @@ -433,7 +435,7 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): df=dataframe_source, event_timestamp_column="ts_1" ) as file_source: - e = Entity(name="id", value_type=ValueType.STRING) + e = 
Entity(name="id", join_key="id_join_key", value_type=ValueType.STRING) # Create Feature View fv1 = FeatureView( diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index f45a18ea55..ca0a640849 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -30,8 +30,8 @@ def test_update_entities_with_inferred_types_from_feature_views( name="fv2", entities=["id"], batch_source=file_source_2, ttl=None, ) - actual_1 = Entity(name="id") - actual_2 = Entity(name="id") + actual_1 = Entity(name="id", join_key="id_join_key") + actual_2 = Entity(name="id", join_key="id_join_key") update_entities_with_inferred_types_from_feature_views( [actual_1], [fv1], RepoConfig(provider="local", project="test") @@ -39,13 +39,17 @@ def test_update_entities_with_inferred_types_from_feature_views( update_entities_with_inferred_types_from_feature_views( [actual_2], [fv2], RepoConfig(provider="local", project="test") ) - assert actual_1 == Entity(name="id", value_type=ValueType.INT64) - assert actual_2 == Entity(name="id", value_type=ValueType.STRING) + assert actual_1 == Entity( + name="id", join_key="id_join_key", value_type=ValueType.INT64 + ) + assert actual_2 == Entity( + name="id", join_key="id_join_key", value_type=ValueType.STRING + ) with pytest.raises(RegistryInferenceFailure): # two viable data types update_entities_with_inferred_types_from_feature_views( - [Entity(name="id")], + [Entity(name="id", join_key="id_join_key")], [fv1, fv2], RepoConfig(provider="local", project="test"), )