Skip to content

Commit

Permalink
feat: Make arrow primary interchange for online ODFV execution (#4143)
Browse files Browse the repository at this point in the history
* rewrite online flow to use transform_arrow

Signed-off-by: tokoko <[email protected]>

* fix transformation server

Signed-off-by: tokoko <[email protected]>

---------

Signed-off-by: tokoko <[email protected]>
  • Loading branch information
tokoko authored Apr 25, 2024
1 parent 6ef7852 commit 3fdb716
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 112 deletions.
28 changes: 14 additions & 14 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2138,7 +2138,7 @@ def _augment_response_with_on_demand_transforms(
)

initial_response = OnlineResponse(online_features_response)
initial_response_df: Optional[pd.DataFrame] = None
initial_response_arrow: Optional[pa.Table] = None
initial_response_dict: Optional[Dict[str, List[Any]]] = None

# Apply on demand transformations and augment the result rows
Expand All @@ -2148,18 +2148,14 @@ def _augment_response_with_on_demand_transforms(
if odfv.mode == "python":
if initial_response_dict is None:
initial_response_dict = initial_response.to_dict()
transformed_features_dict: Dict[str, List[Any]] = (
odfv.get_transformed_features(
initial_response_dict,
full_feature_names,
)
transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict(
initial_response_dict
)
elif odfv.mode in {"pandas", "substrait"}:
if initial_response_df is None:
initial_response_df = initial_response.to_df()
transformed_features_df: pd.DataFrame = odfv.get_transformed_features(
initial_response_df,
full_feature_names,
if initial_response_arrow is None:
initial_response_arrow = initial_response.to_arrow()
transformed_features_arrow = odfv.transform_arrow(
initial_response_arrow, full_feature_names
)
else:
raise Exception(
Expand All @@ -2169,11 +2165,11 @@ def _augment_response_with_on_demand_transforms(
transformed_features = (
transformed_features_dict
if odfv.mode == "python"
else transformed_features_df
else transformed_features_arrow
)
transformed_columns = (
transformed_features.columns
if isinstance(transformed_features, pd.DataFrame)
transformed_features.column_names
if isinstance(transformed_features, pa.Table)
else transformed_features
)
selected_subset = [f for f in transformed_columns if f in _feature_refs]
Expand All @@ -2183,6 +2179,10 @@ def _augment_response_with_on_demand_transforms(
feature_vector = transformed_features[selected_feature]
proto_values.append(
python_values_to_proto_values(feature_vector, ValueType.UNKNOWN)
if odfv.mode == "python"
else python_values_to_proto_values(
feature_vector.to_numpy(), ValueType.UNKNOWN
)
)

odfv_result_names |= set(selected_subset)
Expand Down
89 changes: 1 addition & 88 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from typeguard import typechecked

from feast.base_feature_view import BaseFeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import RequestSource
from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -493,53 +492,7 @@ def transform_arrow(
]
)

def get_transformed_features_df(
self,
df_with_features: pd.DataFrame,
full_feature_names: bool = False,
) -> pd.DataFrame:
# Apply on demand transformations
if not isinstance(df_with_features, pd.DataFrame):
raise TypeError("get_transformed_features_df only accepts pd.DataFrame")
columns_to_cleanup = []
for source_fv_projection in self.source_feature_view_projections.values():
for feature in source_fv_projection.features:
full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
if full_feature_ref in df_with_features.keys():
# Make sure the partial feature name is always present
df_with_features[feature.name] = df_with_features[full_feature_ref]
columns_to_cleanup.append(feature.name)
elif feature.name in df_with_features.keys():
# Make sure the full feature name is always present
df_with_features[full_feature_ref] = df_with_features[feature.name]
columns_to_cleanup.append(full_feature_ref)

# Compute transformed values and apply to each result row
df_with_transformed_features: pd.DataFrame = (
self.feature_transformation.transform(df_with_features)
)

# Work out whether the correct columns names are used.
rename_columns: Dict[str, str] = {}
for feature in self.features:
short_name = feature.name
long_name = self._get_projected_feature_name(feature.name)
if (
short_name in df_with_transformed_features.columns
and full_feature_names
):
rename_columns[short_name] = long_name
elif not full_feature_names:
# Long name must be in dataframe.
rename_columns[long_name] = short_name

# Cleanup extra columns used for transformation
df_with_transformed_features = df_with_transformed_features[
[f.name for f in self.features]
]
return df_with_transformed_features.rename(columns=rename_columns)

def get_transformed_features_dict(
def transform_dict(
self,
feature_dict: Dict[str, Any], # type: ignore
) -> Dict[str, Any]:
Expand All @@ -566,29 +519,6 @@ def get_transformed_features_dict(
del output_dict[feature_name]
return output_dict

def get_transformed_features(
self,
features: Union[Dict[str, Any], pd.DataFrame],
full_feature_names: bool = False,
) -> Union[Dict[str, Any], pd.DataFrame]:
# TODO: classic inheritance pattern....maybe fix this
if self.mode == "python" and isinstance(features, Dict):
# note full_feature_names is not needed for the dictionary
return self.get_transformed_features_dict(
feature_dict=features,
)
elif self.mode in {"pandas", "substrait"} and isinstance(
features, pd.DataFrame
):
return self.get_transformed_features_df(
df_with_features=features,
full_feature_names=full_feature_names,
)
else:
raise Exception(
f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".'
)

def infer_features(self) -> None:
inferred_features = self.feature_transformation.infer_features(
self._construct_random_input()
Expand Down Expand Up @@ -745,23 +675,6 @@ def decorator(user_function):
return decorator


def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView:
bfv = BatchFeatureView(
name=fv.name,
entities=fv.entities,
ttl=fv.ttl,
tags=fv.tags,
online=fv.online,
owner=fv.owner,
schema=fv.schema,
source=fv.batch_source,
)

bfv.features = copy.copy(fv.features)
bfv.entities = copy.copy(fv.entities)
return bfv


def _empty_odfv_udf_fn(x: Any) -> Any:
# just an identity mapping, otherwise we risk tripping some downstream tests
return x
11 changes: 11 additions & 0 deletions sdk/python/feast/online_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import Any, Dict, List

import pandas as pd
import pyarrow as pa

from feast.feature_view import DUMMY_ENTITY_ID
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse
Expand Down Expand Up @@ -77,3 +78,13 @@ def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame:
"""

return pd.DataFrame(self.to_dict(include_event_timestamps))

def to_arrow(self, include_event_timestamps: bool = False) -> pa.Table:
"""
Converts GetOnlineFeaturesResponse features into pyarrow Table.
Args:
is_with_event_timestamps: bool Optionally include feature timestamps in the table
"""

return pa.Table.from_pydict(self.to_dict(include_event_timestamps))
3 changes: 0 additions & 3 deletions sdk/python/feast/transformation/python_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ def __eq__(self, other):
"Comparisons should only involve PythonTransformation class objects."
)

if not super().__eq__(other):
return False

if (
self.udf_string != other.udf_string
or self.udf.__code__.co_code != other.udf.__code__.co_code
Expand Down
3 changes: 0 additions & 3 deletions sdk/python/feast/transformation/substrait_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ def __eq__(self, other):
"Comparisons should only involve SubstraitTransformation class objects."
)

if not super().__eq__(other):
return False

return (
self.substrait_plan == other.substrait_plan
and self.ibis_function.__code__.co_code
Expand Down
5 changes: 2 additions & 3 deletions sdk/python/feast/transformation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@ def TransformFeatures(self, request, context):
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
raise

df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas()
df = pa.ipc.open_file(request.transformation_input.arrow_value).read_all()

if odfv.mode != "pandas":
raise Exception(
f'OnDemandFeatureView mode "{odfv.mode}" not supported by TransformationServer.'
)

result_df = odfv.get_transformed_features_df(df, True)
result_arrow = pa.Table.from_pandas(result_df)
result_arrow = odfv.transform_arrow(df, True)
sink = pa.BufferOutputStream()
writer = pa.ipc.new_file(sink, result_arrow.schema)
writer.write_table(result_arrow)
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/unit/test_on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def test_python_native_transformation_mode():
}
)

assert on_demand_feature_view_python_native.get_transformed_features(
assert on_demand_feature_view_python_native.transform_dict(
{
"feature1": 0,
"feature2": 1,
Expand Down

0 comments on commit 3fdb716

Please sign in to comment.