diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index fafec32c5d..e83a24b664 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2138,7 +2138,7 @@ def _augment_response_with_on_demand_transforms( ) initial_response = OnlineResponse(online_features_response) - initial_response_df: Optional[pd.DataFrame] = None + initial_response_arrow: Optional[pa.Table] = None initial_response_dict: Optional[Dict[str, List[Any]]] = None # Apply on demand transformations and augment the result rows @@ -2148,18 +2148,14 @@ def _augment_response_with_on_demand_transforms( if odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() - transformed_features_dict: Dict[str, List[Any]] = ( - odfv.get_transformed_features( - initial_response_dict, - full_feature_names, - ) + transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict( + initial_response_dict ) elif odfv.mode in {"pandas", "substrait"}: - if initial_response_df is None: - initial_response_df = initial_response.to_df() - transformed_features_df: pd.DataFrame = odfv.get_transformed_features( - initial_response_df, - full_feature_names, + if initial_response_arrow is None: + initial_response_arrow = initial_response.to_arrow() + transformed_features_arrow = odfv.transform_arrow( + initial_response_arrow, full_feature_names ) else: raise Exception( @@ -2169,11 +2165,11 @@ def _augment_response_with_on_demand_transforms( transformed_features = ( transformed_features_dict if odfv.mode == "python" - else transformed_features_df + else transformed_features_arrow ) transformed_columns = ( - transformed_features.columns - if isinstance(transformed_features, pd.DataFrame) + transformed_features.column_names + if isinstance(transformed_features, pa.Table) else transformed_features ) selected_subset = [f for f in transformed_columns if f in _feature_refs] @@ -2183,6 +2179,10 @@ def _augment_response_with_on_demand_transforms( feature_vector = transformed_features[selected_feature] proto_values.append( python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) + if odfv.mode == "python" + else python_values_to_proto_values( + feature_vector.to_numpy(), ValueType.UNKNOWN + ) ) odfv_result_names |= set(selected_subset) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index b532fa651a..e5de1f20a6 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -12,7 +12,6 @@ from typeguard import typechecked from feast.base_feature_view import BaseFeatureView -from feast.batch_feature_view import BatchFeatureView from feast.data_source import RequestSource from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError from feast.feature_view import FeatureView @@ -493,53 +492,7 @@ def transform_arrow( ] ) - def get_transformed_features_df( - self, - df_with_features: pd.DataFrame, - full_feature_names: bool = False, - ) -> pd.DataFrame: - # Apply on demand transformations - if not isinstance(df_with_features, pd.DataFrame): - raise TypeError("get_transformed_features_df only accepts pd.DataFrame") - columns_to_cleanup = [] - for source_fv_projection in self.source_feature_view_projections.values(): - for feature in source_fv_projection.features: - full_feature_ref = f"{source_fv_projection.name}__{feature.name}" - if full_feature_ref in df_with_features.keys(): - # Make sure the partial feature name is always present - df_with_features[feature.name] = df_with_features[full_feature_ref] - columns_to_cleanup.append(feature.name) - elif feature.name in df_with_features.keys(): - # Make sure the full feature name is always present - df_with_features[full_feature_ref] = df_with_features[feature.name] - columns_to_cleanup.append(full_feature_ref) - - # Compute transformed values and apply to each result row - df_with_transformed_features: pd.DataFrame = ( - self.feature_transformation.transform(df_with_features) - ) - - # Work out whether the correct columns names are used. - rename_columns: Dict[str, str] = {} - for feature in self.features: - short_name = feature.name - long_name = self._get_projected_feature_name(feature.name) - if ( - short_name in df_with_transformed_features.columns - and full_feature_names - ): - rename_columns[short_name] = long_name - elif not full_feature_names: - # Long name must be in dataframe. - rename_columns[long_name] = short_name - - # Cleanup extra columns used for transformation - df_with_transformed_features = df_with_transformed_features[ - [f.name for f in self.features] - ] - return df_with_transformed_features.rename(columns=rename_columns) - - def get_transformed_features_dict( + def transform_dict( self, feature_dict: Dict[str, Any], # type: ignore ) -> Dict[str, Any]: @@ -566,29 +519,6 @@ def get_transformed_features_dict( del output_dict[feature_name] return output_dict - def get_transformed_features( - self, - features: Union[Dict[str, Any], pd.DataFrame], - full_feature_names: bool = False, - ) -> Union[Dict[str, Any], pd.DataFrame]: - # TODO: classic inheritance pattern....maybe fix this - if self.mode == "python" and isinstance(features, Dict): - # note full_feature_names is not needed for the dictionary - return self.get_transformed_features_dict( - feature_dict=features, - ) - elif self.mode in {"pandas", "substrait"} and isinstance( - features, pd.DataFrame - ): - return self.get_transformed_features_df( - df_with_features=features, - full_feature_names=full_feature_names, - ) - else: - raise Exception( - f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".' - ) - def infer_features(self) -> None: inferred_features = self.feature_transformation.infer_features( self._construct_random_input() @@ -745,23 +675,6 @@ def decorator(user_function): return decorator -def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: - bfv = BatchFeatureView( - name=fv.name, - entities=fv.entities, - ttl=fv.ttl, - tags=fv.tags, - online=fv.online, - owner=fv.owner, - schema=fv.schema, - source=fv.batch_source, - ) - - bfv.features = copy.copy(fv.features) - bfv.entities = copy.copy(fv.entities) - return bfv - - def _empty_odfv_udf_fn(x: Any) -> Any: # just an identity mapping, otherwise we risk tripping some downstream tests return x diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 48524359bf..050b374340 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -15,6 +15,7 @@ from typing import Any, Dict, List import pandas as pd +import pyarrow as pa from feast.feature_view import DUMMY_ENTITY_ID from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse @@ -77,3 +78,13 @@ def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame: """ return pd.DataFrame(self.to_dict(include_event_timestamps)) + + def to_arrow(self, include_event_timestamps: bool = False) -> pa.Table: + """ + Converts GetOnlineFeaturesResponse features into pyarrow Table. + + Args: + is_with_event_timestamps: bool Optionally include feature timestamps in the table + """ + + return pa.Table.from_pydict(self.to_dict(include_event_timestamps)) diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index ec950a24f3..88cde7cc72 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -64,9 +64,6 @@ def __eq__(self, other): "Comparisons should only involve PythonTransformation class objects." ) - if not super().__eq__(other): - return False - if ( self.udf_string != other.udf_string or self.udf.__code__.co_code != other.udf.__code__.co_code diff --git a/sdk/python/feast/transformation/substrait_transformation.py b/sdk/python/feast/transformation/substrait_transformation.py index 48a87b6207..02b94d8572 100644 --- a/sdk/python/feast/transformation/substrait_transformation.py +++ b/sdk/python/feast/transformation/substrait_transformation.py @@ -77,9 +77,6 @@ def __eq__(self, other): "Comparisons should only involve SubstraitTransformation class objects." ) - if not super().__eq__(other): - return False - return ( self.substrait_plan == other.substrait_plan and self.ibis_function.__code__.co_code diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index 34fe3eac76..db8b0d942e 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -45,15 +45,14 @@ def TransformFeatures(self, request, context): context.set_code(grpc.StatusCode.INVALID_ARGUMENT) raise - df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas() + df = pa.ipc.open_file(request.transformation_input.arrow_value).read_all() if odfv.mode != "pandas": raise Exception( f'OnDemandFeatureView mode "{odfv.mode}" not supported by TransformationServer.' ) - result_df = odfv.get_transformed_features_df(df, True) - result_arrow = pa.Table.from_pandas(result_df) + result_arrow = odfv.transform_arrow(df, True) sink = pa.BufferOutputStream() writer = pa.ipc.new_file(sink, result_arrow.schema) writer.write_table(result_arrow) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index cf4afa9422..402aa4e0e3 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -204,7 +204,7 @@ def test_python_native_transformation_mode(): } ) - assert on_demand_feature_view_python_native.get_transformed_features( + assert on_demand_feature_view_python_native.transform_dict( { "feature1": 0, "feature2": 1,