From 8f97cf1d1b9c033a4b9695d93b6305f72190ce1b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 17 Mar 2024 14:25:16 -0400 Subject: [PATCH 01/18] feat: updating protos to separate transformation Signed-off-by: Francisco Javier Arceo --- protos/feast/core/OnDemandFeatureView.proto | 24 +++------------- protos/feast/core/StreamFeatureView.proto | 1 + protos/feast/core/Transformation.proto | 31 +++++++++++++++++++++ 3 files changed, 36 insertions(+), 20 deletions(-) create mode 100644 protos/feast/core/Transformation.proto diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index c43b33c1d2..d57465ef96 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -27,6 +27,7 @@ import "feast/core/FeatureView.proto"; import "feast/core/FeatureViewProjection.proto"; import "feast/core/Feature.proto"; import "feast/core/DataSource.proto"; +import "feast/core/Transformation.proto"; message OnDemandFeatureView { // User-specified specifications of this feature view. @@ -48,10 +49,8 @@ message OnDemandFeatureViewSpec { // Map of sources for this feature view. map sources = 4; - oneof transformation { - UserDefinedFunction user_defined_function = 5; - OnDemandSubstraitTransformation on_demand_substrait_transformation = 9; - } + // Oneof with {user_defined_function, on_demand_substrait_transformation} + FeatureTransformation transformation = 5; // Description of the on demand feature view. string description = 6; @@ -61,6 +60,7 @@ message OnDemandFeatureViewSpec { // Owner of the on demand feature view. string owner = 8; + string mode = 9; } message OnDemandFeatureViewMeta { @@ -78,19 +78,3 @@ message OnDemandSource { DataSource request_data_source = 2; } } - -// Serialized representation of python function. -message UserDefinedFunction { - // The function name - string name = 1; - - // The python-syntax function body (serialized by dill) - bytes body = 2; - - // The string representation of the udf - string body_text = 3; -} - -message OnDemandSubstraitTransformation { - bytes substrait_plan = 1; -} \ No newline at end of file diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index 3181bdf360..bac2b9abcd 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -29,6 +29,7 @@ import "feast/core/FeatureView.proto"; import "feast/core/Feature.proto"; import "feast/core/DataSource.proto"; import "feast/core/Aggregation.proto"; +import "feast/core/Transformation.proto"; message StreamFeatureView { // User-specified specifications of this feature view. diff --git a/protos/feast/core/Transformation.proto b/protos/feast/core/Transformation.proto new file mode 100644 index 0000000000..2116ec055d --- /dev/null +++ b/protos/feast/core/Transformation.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; +package feast.core; + +option go_package = "github.com/feast-dev/feast/go/protos/feast/core"; +option java_outer_classname = "FeatureTransformationProto"; +option java_package = "feast.proto.core"; + +import "google/protobuf/duration.proto"; + +// Serialized representation of python function. +message UserDefinedFunction { + // The function name + string name = 1; + + // The python-syntax function body (serialized by dill) + bytes body = 2; + + // The string representation of the udf + string body_text = 3; +} + +message FeatureTransformation { + oneof transformation { + UserDefinedFunction user_defined_function = 1; + OnDemandSubstraitTransformation on_demand_substrait_transformation = 2; + } +} + +message OnDemandSubstraitTransformation { + bytes substrait_plan = 1; +} From bc981c7cb1f00ca29a7bfd161f35b420f0807778 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 17 Mar 2024 14:44:21 -0400 Subject: [PATCH 02/18] fixed stuff...i think Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/diff/registry_diff.py | 4 ++-- sdk/python/feast/on_demand_feature_view.py | 22 ++++++++++++------- .../feast/on_demand_pandas_transformation.py | 2 +- .../on_demand_substrait_transformation.py | 2 +- sdk/python/feast/stream_feature_view.py | 6 ++--- 5 files changed, 21 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 15f880e392..04167c4a73 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -147,8 +147,8 @@ def diff_registry_objects( if _field.name == "user_defined_function": current_spec = cast(OnDemandFeatureViewSpec, current_spec) new_spec = cast(OnDemandFeatureViewSpec, new_spec) - current_udf = current_spec.user_defined_function - new_udf = new_spec.user_defined_function + current_udf = current_spec.transformation.user_defined_function + new_udf = new_spec.transformation.user_defined_function for _udf_field in current_udf.DESCRIPTOR.fields: if _udf_field.name == "body": continue diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 586286a3d4..5b5393ef7e 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -27,6 +27,9 @@ OnDemandFeatureViewSpec, OnDemandSource, ) +from feast.protos.feast.core.Transformation_pb2 import ( + FeatureTransformation as FeatureTransformationProto, +) from feast.type_map import ( feast_value_type_to_pandas_type, python_type_to_feast_value_type, @@ -205,16 +208,19 @@ def to_proto(self) -> OnDemandFeatureViewProto: request_data_source=request_sources.to_proto() ) - spec = OnDemandFeatureViewSpec( - name=self.name, - features=[feature.to_proto() for feature in self.features], - sources=sources, + feature_transformation = FeatureTransformationProto( user_defined_function=self.transformation.to_proto() if type(self.transformation) == OnDemandPandasTransformation else None, - on_demand_substrait_transformation=self.transformation.to_proto() # type: ignore + on_demand_substrait_transformation=self.transformation.to_proto() if type(self.transformation) == OnDemandSubstraitTransformation - else None, + else None, # type: ignore + ) + spec = OnDemandFeatureViewSpec( + name=self.name, + features=[feature.to_proto() for feature in self.features], + sources=sources, + transformation=feature_transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -258,14 +264,14 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): == "user_defined_function" ): transformation = OnDemandPandasTransformation.from_proto( - on_demand_feature_view_proto.spec.user_defined_function + on_demand_feature_view_proto.spec.transformation.user_defined_function ) elif ( on_demand_feature_view_proto.spec.WhichOneof("transformation") == "on_demand_substrait_transformation" ): transformation = OnDemandSubstraitTransformation.from_proto( - on_demand_feature_view_proto.spec.on_demand_substrait_transformation + on_demand_feature_view_proto.spec.transformation.on_demand_substrait_transformation ) else: raise Exception("At least one transformation type needs to be provided") diff --git a/sdk/python/feast/on_demand_pandas_transformation.py b/sdk/python/feast/on_demand_pandas_transformation.py index 32cb44b429..ac995241d1 100644 --- a/sdk/python/feast/on_demand_pandas_transformation.py +++ b/sdk/python/feast/on_demand_pandas_transformation.py @@ -3,7 +3,7 @@ import dill import pandas as pd -from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( +from feast.protos.feast.core.Transformation_pb2 import ( UserDefinedFunction as UserDefinedFunctionProto, ) diff --git a/sdk/python/feast/on_demand_substrait_transformation.py b/sdk/python/feast/on_demand_substrait_transformation.py index 4e92e77dc8..5df24dfac7 100644 --- a/sdk/python/feast/on_demand_substrait_transformation.py +++ b/sdk/python/feast/on_demand_substrait_transformation.py @@ -2,7 +2,7 @@ import pyarrow import pyarrow.substrait as substrait # type: ignore # noqa -from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( +from feast.protos.feast.core.Transformation_pb2 import ( OnDemandSubstraitTransformation as OnDemandSubstraitTransformationProto, ) diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 13abbc5e28..13e209a03a 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -16,15 +16,15 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto -from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( - UserDefinedFunction as UserDefinedFunctionProto, -) from feast.protos.feast.core.StreamFeatureView_pb2 import ( StreamFeatureView as StreamFeatureViewProto, ) from feast.protos.feast.core.StreamFeatureView_pb2 import ( StreamFeatureViewSpec as StreamFeatureViewSpecProto, ) +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunction as UserDefinedFunctionProto, +) warnings.simplefilter("once", RuntimeWarning) From 326ca4a27832ddd0beb60b224da12b9ccb50f130 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 17 Mar 2024 22:27:45 -0400 Subject: [PATCH 03/18] updated tests and registry diff function Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/diff/registry_diff.py | 2 +- sdk/python/feast/on_demand_feature_view.py | 4 ++-- sdk/python/tests/unit/diff/test_registry_diff.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 04167c4a73..b6784cd40b 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -144,7 +144,7 @@ def diff_registry_objects( if _field.name in FIELDS_TO_IGNORE: continue elif getattr(current_spec, _field.name) != getattr(new_spec, _field.name): - if _field.name == "user_defined_function": + if _field.name == "transformation": current_spec = cast(OnDemandFeatureViewSpec, current_spec) new_spec = cast(OnDemandFeatureViewSpec, new_spec) current_udf = current_spec.transformation.user_defined_function diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 5b5393ef7e..0c16ce33f0 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -260,14 +260,14 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ) if ( - on_demand_feature_view_proto.spec.WhichOneof("transformation") + on_demand_feature_view_proto.spec.transformation.WhichOneof("transformation") == "user_defined_function" ): transformation = OnDemandPandasTransformation.from_proto( on_demand_feature_view_proto.spec.transformation.user_defined_function ) elif ( - on_demand_feature_view_proto.spec.WhichOneof("transformation") + on_demand_feature_view_proto.spec.transformation.WhichOneof("transformation") == "on_demand_substrait_transformation" ): transformation = OnDemandSubstraitTransformation.from_proto( diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py index ce40295f8b..1dd0abce2c 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -139,11 +139,11 @@ def post_changed(inputs: pd.DataFrame) -> pd.DataFrame: assert feast_object_diffs.feast_object_property_diffs[0].property_name == "name" assert ( feast_object_diffs.feast_object_property_diffs[1].property_name - == "user_defined_function.name" + == "transformation.name" ) assert ( feast_object_diffs.feast_object_property_diffs[2].property_name - == "user_defined_function.body_text" + == "transformation.body_text" ) From 0ea16a3f5b918c97cf6c488acab5682b715a9675 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 17 Mar 2024 22:48:40 -0400 Subject: [PATCH 04/18] updated base registry Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/registry/base_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 9ee3bbbabc..64f3225071 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -663,7 +663,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: ): odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto()) - odfv_dict["spec"]["userDefinedFunction"][ + odfv_dict["spec"]["transformation"]["userDefinedFunction"][ "body" ] = on_demand_feature_view.transformation.udf_string registry_dict["onDemandFeatureViews"].append(odfv_dict) From 5121d8dcec53b9f6cf0fa50d745cfab0a924a0ab Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 17 Mar 2024 22:53:30 -0400 Subject: [PATCH 05/18] updated react component Signed-off-by: Francisco Javier Arceo --- ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx index ee8e41bbf6..4ac23de030 100644 --- a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx +++ b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx @@ -57,7 +57,7 @@ const OnDemandFeatureViewOverviewTab = ({ - {data?.spec?.userDefinedFunction?.bodyText} + {data?.spec?.transformation?.userDefinedFunction?.bodyText} From cbbfcf892da6bd407f037141f3e49db16a5c43ca Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 17 Mar 2024 22:54:18 -0400 Subject: [PATCH 06/18] formatted Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 0c16ce33f0..fa7c0c806d 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -260,14 +260,18 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ) if ( - on_demand_feature_view_proto.spec.transformation.WhichOneof("transformation") + on_demand_feature_view_proto.spec.transformation.WhichOneof( + "transformation" + ) == "user_defined_function" ): transformation = OnDemandPandasTransformation.from_proto( on_demand_feature_view_proto.spec.transformation.user_defined_function ) elif ( - on_demand_feature_view_proto.spec.transformation.WhichOneof("transformation") + on_demand_feature_view_proto.spec.transformation.WhichOneof( + "transformation" + ) == "on_demand_substrait_transformation" ): transformation = OnDemandSubstraitTransformation.from_proto( From 01b82c89c8c0fd94b0aa211f5915644c0cf0642d Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 17 Mar 2024 23:30:15 -0400 Subject: [PATCH 07/18] updated stream feature view proto Signed-off-by: Francisco Javier Arceo --- protos/feast/core/StreamFeatureView.proto | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index bac2b9abcd..f7aa14e2fa 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -80,6 +80,7 @@ message StreamFeatureViewSpec { // Serialized function that is encoded in the streamfeatureview UserDefinedFunction user_defined_function = 13; + // Mode of execution string mode = 14; @@ -88,5 +89,8 @@ message StreamFeatureViewSpec { // Timestamp field for aggregation string timestamp_field = 16; + + // Oneof with {user_defined_function, on_demand_substrait_transformation} + FeatureTransformation transformation = 17; } From 8712b2e3eb77ad023b0f8449b8afd7665412ea27 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 18 Mar 2024 06:33:55 -0400 Subject: [PATCH 08/18] making the proto changes backwards compatable Signed-off-by: Francisco Javier Arceo --- protos/feast/core/Transformation.proto | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/protos/feast/core/Transformation.proto b/protos/feast/core/Transformation.proto index 2116ec055d..2a893b0d22 100644 --- a/protos/feast/core/Transformation.proto +++ b/protos/feast/core/Transformation.proto @@ -19,10 +19,12 @@ message UserDefinedFunction { string body_text = 3; } +// A feature transformation executed as a user-defined function message FeatureTransformation { + // Note this Transformation starts at 5 for backwards compatibility oneof transformation { - UserDefinedFunction user_defined_function = 1; - OnDemandSubstraitTransformation on_demand_substrait_transformation = 2; + UserDefinedFunction user_defined_function = 5; + OnDemandSubstraitTransformation on_demand_substrait_transformation = 6; } } From f1e3764eafccb6b8c50b0408ee1f1e714d0d0df1 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 19 Mar 2024 20:41:33 -0400 Subject: [PATCH 09/18] trying to make this backwards compatible Signed-off-by: Francisco Javier Arceo --- protos/feast/core/OnDemandFeatureView.proto | 28 +++++++++++++++++-- protos/feast/core/StreamFeatureView.proto | 2 +- protos/feast/core/Transformation.proto | 10 +++---- sdk/python/feast/diff/registry_diff.py | 21 ++++++++++++++ .../feast/infra/registry/base_registry.py | 3 +- sdk/python/feast/on_demand_feature_view.py | 12 ++++---- .../feast/on_demand_pandas_transformation.py | 2 +- .../on_demand_substrait_transformation.py | 2 +- sdk/python/feast/stream_feature_view.py | 21 ++++++++++++-- .../tests/unit/diff/test_registry_diff.py | 4 +-- sdk/python/tests/unit/test_sql_registry.py | 4 +-- .../OnDemandFeatureViewOverviewTab.tsx | 2 +- 12 files changed, 86 insertions(+), 25 deletions(-) diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index d57465ef96..92c953c16d 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -49,8 +49,12 @@ message OnDemandFeatureViewSpec { // Map of sources for this feature view. map sources = 4; + oneof transformation { + UserDefinedFunction user_defined_function = 5; + OnDemandSubstraitTransformation on_demand_substrait_transformation = 9; + } // Oneof with {user_defined_function, on_demand_substrait_transformation} - FeatureTransformation transformation = 5; + FeatureTransformationV2 feature_transformation = 10; // Description of the on demand feature view. string description = 6; @@ -60,7 +64,7 @@ message OnDemandFeatureViewSpec { // Owner of the on demand feature view. string owner = 8; - string mode = 9; + string mode = 11; } message OnDemandFeatureViewMeta { @@ -78,3 +82,23 @@ message OnDemandSource { DataSource request_data_source = 2; } } + +// Serialized representation of python function. +message UserDefinedFunction { + option deprecated = true; + + // The function name + string name = 1; + + // The python-syntax function body (serialized by dill) + bytes body = 2; + + // The string representation of the udf + string body_text = 3; +} + +message OnDemandSubstraitTransformation { + option deprecated = true; + + bytes substrait_plan = 1; +} diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index f7aa14e2fa..a54024af4b 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -91,6 +91,6 @@ message StreamFeatureViewSpec { string timestamp_field = 16; // Oneof with {user_defined_function, on_demand_substrait_transformation} - FeatureTransformation transformation = 17; + FeatureTransformationV2 feature_transformation = 17; } diff --git a/protos/feast/core/Transformation.proto b/protos/feast/core/Transformation.proto index 2a893b0d22..89bbf3daae 100644 --- a/protos/feast/core/Transformation.proto +++ b/protos/feast/core/Transformation.proto @@ -8,7 +8,7 @@ option java_package = "feast.proto.core"; import "google/protobuf/duration.proto"; // Serialized representation of python function. -message UserDefinedFunction { +message UserDefinedFunctionV2 { // The function name string name = 1; @@ -20,14 +20,14 @@ message UserDefinedFunction { } // A feature transformation executed as a user-defined function -message FeatureTransformation { +message FeatureTransformationV2 { // Note this Transformation starts at 5 for backwards compatibility oneof transformation { - UserDefinedFunction user_defined_function = 5; - OnDemandSubstraitTransformation on_demand_substrait_transformation = 6; + UserDefinedFunctionV2 user_defined_function = 5; + OnDemandSubstraitTransformationV2 on_demand_substrait_transformation = 6; } } -message OnDemandSubstraitTransformation { +message OnDemandSubstraitTransformationV2 { bytes substrait_plan = 1; } diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index b6784cd40b..6f0421b203 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -163,6 +163,27 @@ def diff_registry_objects( getattr(new_udf, _udf_field.name), ) ) + elif _field.name == "feature_transformation": + current_spec = cast(OnDemandFeatureViewSpec, current_spec) + new_spec = cast(OnDemandFeatureViewSpec, new_spec) + current_udf = ( + current_spec.feature_transformation.user_defined_function + ) + new_udf = new_spec.feature_transformation.user_defined_function + for _udf_field in current_udf.DESCRIPTOR.fields: + if _udf_field.name == "body": + continue + if getattr(current_udf, _udf_field.name) != getattr( + new_udf, _udf_field.name + ): + transition = TransitionType.UPDATE + property_diffs.append( + PropertyDiff( + _field.name + "." + _udf_field.name, + getattr(current_udf, _udf_field.name), + getattr(new_udf, _udf_field.name), + ) + ) else: transition = TransitionType.UPDATE property_diffs.append( diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 64f3225071..80c1c9e07c 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -662,8 +662,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: key=lambda on_demand_feature_view: on_demand_feature_view.name, ): odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto()) - - odfv_dict["spec"]["transformation"]["userDefinedFunction"][ + odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][ "body" ] = on_demand_feature_view.transformation.udf_string registry_dict["onDemandFeatureViews"].append(odfv_dict) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index fa7c0c806d..ead66ab5fb 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -28,7 +28,7 @@ OnDemandSource, ) from feast.protos.feast.core.Transformation_pb2 import ( - FeatureTransformation as FeatureTransformationProto, + FeatureTransformationV2 as FeatureTransformationProto, ) from feast.type_map import ( feast_value_type_to_pandas_type, @@ -220,7 +220,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: name=self.name, features=[feature.to_proto() for feature in self.features], sources=sources, - transformation=feature_transformation, + feature_transformation=feature_transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -260,22 +260,22 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ) if ( - on_demand_feature_view_proto.spec.transformation.WhichOneof( + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( "transformation" ) == "user_defined_function" ): transformation = OnDemandPandasTransformation.from_proto( - on_demand_feature_view_proto.spec.transformation.user_defined_function + on_demand_feature_view_proto.spec.feature_transformation.user_defined_function ) elif ( - on_demand_feature_view_proto.spec.transformation.WhichOneof( + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( "transformation" ) == "on_demand_substrait_transformation" ): transformation = OnDemandSubstraitTransformation.from_proto( - on_demand_feature_view_proto.spec.transformation.on_demand_substrait_transformation + on_demand_feature_view_proto.spec.feature_transformation.on_demand_substrait_transformation ) else: raise Exception("At least one transformation type needs to be provided") diff --git a/sdk/python/feast/on_demand_pandas_transformation.py b/sdk/python/feast/on_demand_pandas_transformation.py index ac995241d1..48f5263051 100644 --- a/sdk/python/feast/on_demand_pandas_transformation.py +++ b/sdk/python/feast/on_demand_pandas_transformation.py @@ -4,7 +4,7 @@ import pandas as pd from feast.protos.feast.core.Transformation_pb2 import ( - UserDefinedFunction as UserDefinedFunctionProto, + UserDefinedFunctionV2 as UserDefinedFunctionProto, ) diff --git a/sdk/python/feast/on_demand_substrait_transformation.py b/sdk/python/feast/on_demand_substrait_transformation.py index 5df24dfac7..0666739125 100644 --- a/sdk/python/feast/on_demand_substrait_transformation.py +++ b/sdk/python/feast/on_demand_substrait_transformation.py @@ -3,7 +3,7 @@ import pyarrow.substrait as substrait # type: ignore # noqa from feast.protos.feast.core.Transformation_pb2 import ( - OnDemandSubstraitTransformation as OnDemandSubstraitTransformationProto, + OnDemandSubstraitTransformationV2 as OnDemandSubstraitTransformationProto, ) diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 13e209a03a..f9f7fe6eee 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -16,6 +16,9 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + UserDefinedFunction as UserDefinedFunctionProto, +) from feast.protos.feast.core.StreamFeatureView_pb2 import ( StreamFeatureView as StreamFeatureViewProto, ) @@ -23,7 +26,10 @@ StreamFeatureViewSpec as StreamFeatureViewSpecProto, ) from feast.protos.feast.core.Transformation_pb2 import ( - UserDefinedFunction as UserDefinedFunctionProto, + FeatureTransformationV2 as FeatureTransformationProto, +) +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProtoV2, ) warnings.simplefilter("once", RuntimeWarning) @@ -171,19 +177,30 @@ def to_proto(self): stream_source_proto = self.stream_source.to_proto() stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}" - udf_proto = None + udf_proto, feature_transformation = None, None if self.udf: udf_proto = UserDefinedFunctionProto( name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True), body_text=self.udf_string, ) + udf_proto_v2 = UserDefinedFunctionProtoV2( + name=self.udf.__name__, + body=dill.dumps(self.udf, recurse=True), + body_text=self.udf_string, + ) + + feature_transformation = FeatureTransformationProto( + user_defined_function=udf_proto_v2, + ) + spec = StreamFeatureViewSpecProto( name=self.name, entities=self.entities, entity_columns=[field.to_proto() for field in self.entity_columns], features=[field.to_proto() for field in self.schema], user_defined_function=udf_proto, + feature_transformation=feature_transformation, description=self.description, tags=self.tags, owner=self.owner, diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py index 1dd0abce2c..dc373792a3 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -139,11 +139,11 @@ def post_changed(inputs: pd.DataFrame) -> pd.DataFrame: assert feast_object_diffs.feast_object_property_diffs[0].property_name == "name" assert ( feast_object_diffs.feast_object_property_diffs[1].property_name - == "transformation.name" + == "feature_transformation.name" ) assert ( feast_object_diffs.feast_object_property_diffs[2].property_name - == "transformation.body_text" + == "feature_transformation.body_text" ) diff --git a/sdk/python/tests/unit/test_sql_registry.py b/sdk/python/tests/unit/test_sql_registry.py index 094b8967c1..90c9f9e75c 100644 --- a/sdk/python/tests/unit/test_sql_registry.py +++ b/sdk/python/tests/unit/test_sql_registry.py @@ -382,8 +382,8 @@ def location_features_from_push(inputs: pd.DataFrame) -> pd.DataFrame: @pytest.mark.parametrize( "sql_registry", [ - lazy_fixture("mysql_registry"), - lazy_fixture("pg_registry"), + # lazy_fixture("mysql_registry"), + # lazy_fixture("pg_registry"), lazy_fixture("sqlite_registry"), ], ) diff --git a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx index 4ac23de030..dc34528e7e 100644 --- a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx +++ b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx @@ -57,7 +57,7 @@ const OnDemandFeatureViewOverviewTab = ({ - {data?.spec?.transformation?.userDefinedFunction?.bodyText} + {data?.spec?.feature_transformation?.userDefinedFunction?.bodyText} From 21c1c35b277bcd348d4cbe111bc3cda39b68e8fa Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 19 Mar 2024 20:56:45 -0400 Subject: [PATCH 10/18] caught a bug and fixed the linter --- sdk/python/feast/diff/registry_diff.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 6f0421b203..a5bb5e7326 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -147,8 +147,8 @@ def diff_registry_objects( if _field.name == "transformation": current_spec = cast(OnDemandFeatureViewSpec, current_spec) new_spec = cast(OnDemandFeatureViewSpec, new_spec) - current_udf = current_spec.transformation.user_defined_function - new_udf = new_spec.transformation.user_defined_function + current_udf = current_spec.feature_transformation.user_defined_function + new_udf = new_spec.feature_transformation.user_defined_function for _udf_field in current_udf.DESCRIPTOR.fields: if _udf_field.name == "body": continue From ff57b4536b1559910d8039b21d59c64541340d15 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 19 Mar 2024 21:02:54 -0400 Subject: [PATCH 11/18] actually linted Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/diff/registry_diff.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index a5bb5e7326..1620b30b2b 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -147,7 +147,9 @@ def diff_registry_objects( if _field.name == "transformation": current_spec = cast(OnDemandFeatureViewSpec, current_spec) new_spec = cast(OnDemandFeatureViewSpec, new_spec) - current_udf = current_spec.feature_transformation.user_defined_function + current_udf = ( + current_spec.feature_transformation.user_defined_function + ) new_udf = new_spec.feature_transformation.user_defined_function for _udf_field in current_udf.DESCRIPTOR.fields: if _udf_field.name == "body": From 6de6fcc9d8fe54d5782a01423acd8e7ab683c887 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 19 Mar 2024 21:57:38 -0400 Subject: [PATCH 12/18] updated ui component Signed-off-by: Francisco Javier Arceo --- ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx index dc34528e7e..aac3f6ac5b 100644 --- a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx +++ b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx @@ -57,7 +57,7 @@ const OnDemandFeatureViewOverviewTab = ({ - {data?.spec?.feature_transformation?.userDefinedFunction?.bodyText} + {data?.spec?.featureTransformation?.userDefinedFunction?.bodyText} From 1212c1200b0c3d1219f37690a9d628fe65df185f Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 20 Mar 2024 09:52:00 -0400 Subject: [PATCH 13/18] accidentally commented out fixtures --- sdk/python/tests/unit/test_sql_registry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/unit/test_sql_registry.py b/sdk/python/tests/unit/test_sql_registry.py index 90c9f9e75c..094b8967c1 100644 --- a/sdk/python/tests/unit/test_sql_registry.py +++ b/sdk/python/tests/unit/test_sql_registry.py @@ -382,8 +382,8 @@ def location_features_from_push(inputs: pd.DataFrame) -> pd.DataFrame: @pytest.mark.parametrize( "sql_registry", [ - # lazy_fixture("mysql_registry"), - # lazy_fixture("pg_registry"), + lazy_fixture("mysql_registry"), + lazy_fixture("pg_registry"), lazy_fixture("sqlite_registry"), ], ) From 2883d1b4c7903339503609e1254caceb11b85e03 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 22 Mar 2024 11:19:34 -0400 Subject: [PATCH 14/18] Updated Signed-off-by: Francisco Javier Arceo --- protos/feast/core/OnDemandFeatureView.proto | 4 +-- protos/feast/core/StreamFeatureView.proto | 2 +- sdk/python/feast/diff/registry_diff.py | 34 +++++++------------ .../feast/infra/registry/base_registry.py | 11 +++++- sdk/python/feast/on_demand_feature_view.py | 6 ++++ .../tests/unit/diff/test_registry_diff.py | 1 + .../tests/unit/test_on_demand_feature_view.py | 4 +++ 7 files changed, 37 insertions(+), 25 deletions(-) diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index 92c953c16d..cd3ceba150 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -50,8 +50,8 @@ message OnDemandFeatureViewSpec { map sources = 4; oneof transformation { - UserDefinedFunction user_defined_function = 5; - OnDemandSubstraitTransformation on_demand_substrait_transformation = 9; + UserDefinedFunction user_defined_function = 5 [deprecated = true]; + OnDemandSubstraitTransformation on_demand_substrait_transformation = 9 [deprecated = true]; } // Oneof with {user_defined_function, on_demand_substrait_transformation} FeatureTransformationV2 feature_transformation = 10; diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index a54024af4b..cb7da0faf3 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -78,7 +78,7 @@ message StreamFeatureViewSpec { bool online = 12; // Serialized function that is encoded in the streamfeatureview - UserDefinedFunction user_defined_function = 13; + UserDefinedFunction user_defined_function = 13 [deprecated = true]; // Mode of execution diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 1620b30b2b..120f5d697a 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -1,3 +1,4 @@ +import warnings from dataclasses import dataclass from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, cast @@ -144,32 +145,23 @@ def diff_registry_objects( if _field.name in FIELDS_TO_IGNORE: continue elif getattr(current_spec, _field.name) != getattr(new_spec, _field.name): - if _field.name == "transformation": + # TODO: Delete "transformation" after we've safely deprecated it from the proto + if _field.name in ["transformation", "feature_transformation"]: + warnings.warn( + "transformation will be deprecated in the future please use feature_transformation instead.", + DeprecationWarning, + ) current_spec = cast(OnDemandFeatureViewSpec, current_spec) new_spec = cast(OnDemandFeatureViewSpec, new_spec) - current_udf = ( + # Check if the old proto is populated and use that if it is + deprecated_udf = current_spec.user_defined_function + feature_transformation_udf = ( current_spec.feature_transformation.user_defined_function ) - new_udf = new_spec.feature_transformation.user_defined_function - for _udf_field in current_udf.DESCRIPTOR.fields: - if _udf_field.name == "body": - continue - if getattr(current_udf, _udf_field.name) != getattr( - new_udf, _udf_field.name - ): - transition = TransitionType.UPDATE - property_diffs.append( - PropertyDiff( - _field.name + "." + _udf_field.name, - getattr(current_udf, _udf_field.name), - getattr(new_udf, _udf_field.name), - ) - ) - elif _field.name == "feature_transformation": - current_spec = cast(OnDemandFeatureViewSpec, current_spec) - new_spec = cast(OnDemandFeatureViewSpec, new_spec) current_udf = ( - current_spec.feature_transformation.user_defined_function + deprecated_udf + if deprecated_udf.body_text != "" + else feature_transformation_udf ) new_udf = new_spec.feature_transformation.user_defined_function for _udf_field in current_udf.DESCRIPTOR.fields: diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 80c1c9e07c..d3d82a80b0 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json +import warnings from abc import ABC, abstractmethod from collections import defaultdict from datetime import datetime @@ -662,9 +663,16 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: key=lambda on_demand_feature_view: on_demand_feature_view.name, ): odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto()) + # We are logging a warning because the registry object may be read from a proto that is not updated + # i.e., we have to submit dual writes but in order to ensure the read behavior succeeds we have to load + # both objects to compare any changes in the registry + warnings.warn( + "We will be deprecating the usage of spec.userDefinedFunction in a future release please upgrade cautiously.", + DeprecationWarning, + ) odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][ "body" - ] = on_demand_feature_view.transformation.udf_string + ] = on_demand_feature_view.feature_transformation.udf_string registry_dict["onDemandFeatureViews"].append(odfv_dict) for request_feature_view in sorted( self.list_request_feature_views(project=project), @@ -683,6 +691,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: "body" ] = stream_feature_view.udf_string registry_dict["streamFeatureViews"].append(sfv_dict) + for saved_dataset in sorted( self.list_saved_datasets(project=project), key=lambda item: item.name ): diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index ead66ab5fb..9711d336e8 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -66,6 +66,7 @@ class OnDemandFeatureView(BaseFeatureView): source_feature_view_projections: Dict[str, FeatureViewProjection] source_request_sources: Dict[str, RequestSource] transformation: Union[OnDemandPandasTransformation] + feature_transformation: Union[OnDemandPandasTransformation] description: str tags: Dict[str, str] owner: str @@ -86,6 +87,7 @@ def __init__( # noqa: C901 udf: Optional[FunctionType] = None, udf_string: str = "", transformation: Optional[Union[OnDemandPandasTransformation]] = None, + feature_transformation: Optional[Union[OnDemandPandasTransformation]] = None, description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -104,6 +106,7 @@ def __init__( # noqa: C901 dataframes as inputs. udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI) transformation: The user defined transformation. + feature_transformation: The user defined transformation. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email @@ -142,6 +145,7 @@ def __init__( # noqa: C901 ] = odfv_source.projection self.transformation = transformation + self.feature_transformation = self.transformation @property def proto_class(self) -> Type[OnDemandFeatureViewProto]: @@ -154,6 +158,7 @@ def __copy__(self): sources=list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()), transformation=self.transformation, + feature_transformation=self.transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -175,6 +180,7 @@ def __eq__(self, other): != other.source_feature_view_projections or self.source_request_sources != other.source_request_sources or self.transformation != other.transformation + or self.feature_transformation != other.feature_transformation ): return False diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py index dc373792a3..c209f1e0e0 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -137,6 +137,7 @@ def post_changed(inputs: pd.DataFrame) -> pd.DataFrame: # if no code is changed assert len(feast_object_diffs.feast_object_property_diffs) == 3 assert feast_object_diffs.feast_object_property_diffs[0].property_name == "name" + # Note we should only now be looking at changes for the feature_transformation field assert ( feast_object_diffs.feast_object_property_diffs[1].property_name == "feature_transformation.name" diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 66d02c65d1..21c07b9d7f 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -129,3 +129,7 @@ def test_hash(): assert on_demand_feature_view_5.transformation == OnDemandPandasTransformation( udf2, "udf2 source code" ) + assert ( + on_demand_feature_view_5.feature_transformation + == on_demand_feature_view_5.transformation + ) From ea5e559703f499acf68e86e917126894749f11d1 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 22 Mar 2024 11:59:19 -0400 Subject: [PATCH 15/18] incrementing protos --- protos/feast/core/Transformation.proto | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protos/feast/core/Transformation.proto b/protos/feast/core/Transformation.proto index 89bbf3daae..cde2833fa4 100644 --- a/protos/feast/core/Transformation.proto +++ b/protos/feast/core/Transformation.proto @@ -23,8 +23,8 @@ message UserDefinedFunctionV2 { message FeatureTransformationV2 { // Note this Transformation starts at 5 for backwards compatibility oneof transformation { - UserDefinedFunctionV2 user_defined_function = 5; - OnDemandSubstraitTransformationV2 on_demand_substrait_transformation = 6; + UserDefinedFunctionV2 user_defined_function = 1; + OnDemandSubstraitTransformationV2 on_demand_substrait_transformation = 2; } } From ef0795b55190147b6fd67cbd4af933d21e05e540 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 22 Mar 2024 15:03:14 -0400 Subject: [PATCH 16/18] updated tests Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 10 +++ .../tests/unit/test_on_demand_feature_view.py | 62 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 9711d336e8..f776164985 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -270,6 +270,8 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): "transformation" ) == "user_defined_function" + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + != "" ): transformation = OnDemandPandasTransformation.from_proto( on_demand_feature_view_proto.spec.feature_transformation.user_defined_function @@ -283,6 +285,14 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): transformation = OnDemandSubstraitTransformation.from_proto( on_demand_feature_view_proto.spec.feature_transformation.on_demand_substrait_transformation ) + elif ( + hasattr(on_demand_feature_view_proto.spec, "user_defined_function") + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + == "" + ): + transformation = OnDemandPandasTransformation.from_proto( + on_demand_feature_view_proto.spec.user_defined_function + ) else: raise Exception("At least one transformation type needs to be provided") diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 21c07b9d7f..b83449519f 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -133,3 +133,65 @@ def test_hash(): on_demand_feature_view_5.feature_transformation == on_demand_feature_view_5.transformation ) + + +@pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated") +def test_from_proto_backwards_compatable_udf(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + sources = [feature_view] + on_demand_feature_view = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + transformation=OnDemandPandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + ) + + # We need a proto with the "udf1 source code" in the user_defined_function.body_text + # and to populate it in feature_transformation + proto = on_demand_feature_view.to_proto() + assert ( + on_demand_feature_view.transformation.udf_string + == proto.spec.feature_transformation.user_defined_function.body_text + ) + # Because of the current set of code this is just confirming it is empty + assert proto.spec.user_defined_function.body_text == "" + assert proto.spec.user_defined_function.body == b"" + assert proto.spec.user_defined_function.name == "" + + # Assuming we pull it from the registry we set it to the feature_transformation proto values + proto.spec.user_defined_function.name = ( + proto.spec.feature_transformation.user_defined_function.name + ) + proto.spec.user_defined_function.body = ( + proto.spec.feature_transformation.user_defined_function.body + ) + proto.spec.user_defined_function.body_text = ( + proto.spec.feature_transformation.user_defined_function.body_text + ) + + # And now we're going to null the feature_transformation proto object before reserializing the entire proto + # proto.spec.user_defined_function.body_text = on_demand_feature_view.transformation.udf_string + proto.spec.feature_transformation.user_defined_function.name = "" + proto.spec.feature_transformation.user_defined_function.body = b"" + proto.spec.feature_transformation.user_defined_function.body_text = "" + + # And now we expect the to get the same object back under feature_transformation + reserialized_proto = OnDemandFeatureView.from_proto(proto) + assert ( + reserialized_proto.feature_transformation.udf_string + == on_demand_feature_view.feature_transformation.udf_string + ) From 529acace538c312d1e745272f5f56c5861fe4e8e Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 22 Mar 2024 16:59:04 -0400 Subject: [PATCH 17/18] fixed linting issue and made backwards compatible Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index f776164985..61e55bb0c0 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -30,6 +30,9 @@ from feast.protos.feast.core.Transformation_pb2 import ( FeatureTransformationV2 as FeatureTransformationProto, ) +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, +) from feast.type_map import ( feast_value_type_to_pandas_type, python_type_to_feast_value_type, @@ -290,8 +293,13 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text == "" ): + backwards_compatible_udf = UserDefinedFunctionProto( + name=on_demand_feature_view_proto.spec.user_defined_function.name, + body=on_demand_feature_view_proto.spec.user_defined_function.body, + body_text=on_demand_feature_view_proto.spec.user_defined_function.body_text, + ) transformation = OnDemandPandasTransformation.from_proto( - on_demand_feature_view_proto.spec.user_defined_function + user_defined_function_proto=backwards_compatible_udf, ) else: raise Exception("At least one transformation type needs to be provided") From 6748fe74bc8a92ff76dab159eb607e4215639f41 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 22 Mar 2024 23:11:20 -0400 Subject: [PATCH 18/18] added more tests Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/stream_feature_view.py | 14 + sdk/python/tests/unit/test_feature_views.py | 205 +------------- .../tests/unit/test_stream_feature_view.py | 252 ++++++++++++++++++ 3 files changed, 267 insertions(+), 204 deletions(-) create mode 100644 sdk/python/tests/unit/test_stream_feature_view.py diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index f9f7fe6eee..e8741a75fe 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -15,6 +15,7 @@ from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field +from feast.on_demand_pandas_transformation import OnDemandPandasTransformation from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( UserDefinedFunction as UserDefinedFunctionProto, @@ -79,6 +80,7 @@ class StreamFeatureView(FeatureView): materialization_intervals: List[Tuple[datetime, datetime]] udf: Optional[FunctionType] udf_string: Optional[str] + feature_transformation: Optional[OnDemandPandasTransformation] def __init__( self, @@ -97,6 +99,7 @@ def __init__( timestamp_field: Optional[str] = "", udf: Optional[FunctionType] = None, udf_string: Optional[str] = "", + feature_transformation: Optional[Union[OnDemandPandasTransformation]] = None, ): if not flags_helper.is_test(): warnings.warn( @@ -124,6 +127,7 @@ def __init__( self.timestamp_field = timestamp_field or "" self.udf = udf self.udf_string = udf_string + self.feature_transformation = feature_transformation super().__init__( name=name, @@ -237,6 +241,11 @@ def from_proto(cls, sfv_proto): if sfv_proto.spec.HasField("user_defined_function") else None ) + feature_transformation = ( + sfv_proto.spec.feature_transformation.user_defined_function.body_text + if sfv_proto.spec.HasField("feature_transformation") + else None + ) stream_feature_view = cls( name=sfv_proto.spec.name, description=sfv_proto.spec.description, @@ -255,6 +264,7 @@ def from_proto(cls, sfv_proto): mode=sfv_proto.spec.mode, udf=udf, udf_string=udf_string, + feature_transformation=feature_transformation, aggregations=[ Aggregation.from_proto(agg_proto) for agg_proto in sfv_proto.spec.aggregations @@ -311,6 +321,7 @@ def __copy__(self): timestamp_field=self.timestamp_field, source=self.stream_source if self.stream_source else self.batch_source, udf=self.udf, + feature_transformation=self.feature_transformation, ) fv.entities = self.entities fv.features = copy.copy(self.features) @@ -360,6 +371,9 @@ def decorator(user_function): schema=schema, udf=user_function, udf_string=udf_string, + feature_transformation=OnDemandPandasTransformation( + user_function, udf_string + ), description=description, tags=tags, online=online, diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 2ad9680703..0220d1a8a9 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -1,22 +1,16 @@ -import copy from datetime import timedelta import pytest from typeguard import TypeCheckError -from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat -from feast.data_source import KafkaSource, PushSource +from feast.data_source import KafkaSource from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource -from feast.protos.feast.core.StreamFeatureView_pb2 import ( - StreamFeatureView as StreamFeatureViewProto, -) from feast.protos.feast.types.Value_pb2 import ValueType -from feast.stream_feature_view import StreamFeatureView, stream_feature_view from feast.types import Float32 @@ -65,169 +59,10 @@ def test_create_batch_feature_view(): ) -def test_create_stream_feature_view(): - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - StreamFeatureView( - name="test kafka stream feature view", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - aggregations=[], - ) - - push_source = PushSource( - name="push source", batch_source=FileSource(path="some path") - ) - StreamFeatureView( - name="test push source feature view", - entities=[], - ttl=timedelta(days=30), - source=push_source, - aggregations=[], - ) - - with pytest.raises(TypeError): - StreamFeatureView( - name="test batch feature view", - entities=[], - ttl=timedelta(days=30), - aggregations=[], - ) - - with pytest.raises(ValueError): - StreamFeatureView( - name="test batch feature view", - entities=[], - ttl=timedelta(days=30), - source=FileSource(path="some path"), - aggregations=[], - ) - - def simple_udf(x: int): return x + 3 -def test_stream_feature_view_serialization(): - entity = Entity(name="driver_entity", join_keys=["test_key"]) - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - - sfv = StreamFeatureView( - name="test kafka stream feature view", - entities=[entity], - ttl=timedelta(days=30), - owner="test@example.com", - online=True, - schema=[Field(name="dummy_field", dtype=Float32)], - description="desc", - aggregations=[ - Aggregation( - column="dummy_field", - function="max", - time_window=timedelta(days=1), - ) - ], - timestamp_field="event_timestamp", - mode="spark", - source=stream_source, - udf=simple_udf, - tags={}, - ) - - sfv_proto = sfv.to_proto() - - new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) - assert new_sfv == sfv - - -def test_stream_feature_view_udfs(): - entity = Entity(name="driver_entity", join_keys=["test_key"]) - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - - @stream_feature_view( - entities=[entity], - ttl=timedelta(days=30), - owner="test@example.com", - online=True, - schema=[Field(name="dummy_field", dtype=Float32)], - description="desc", - aggregations=[ - Aggregation( - column="dummy_field", - function="max", - time_window=timedelta(days=1), - ) - ], - timestamp_field="event_timestamp", - source=stream_source, - ) - def pandas_udf(pandas_df): - import pandas as pd - - assert type(pandas_df) == pd.DataFrame - df = pandas_df.transform(lambda x: x + 10, axis=1) - return df - - import pandas as pd - - df = pd.DataFrame({"A": [1, 2, 3], "B": [10, 20, 30]}) - sfv = pandas_udf - sfv_proto = sfv.to_proto() - new_sfv = StreamFeatureView.from_proto(sfv_proto) - new_df = new_sfv.udf(df) - - expected_df = pd.DataFrame({"A": [11, 12, 13], "B": [20, 30, 40]}) - - assert new_df.equals(expected_df) - - -def test_stream_feature_view_initialization_with_optional_fields_omitted(): - entity = Entity(name="driver_entity", join_keys=["test_key"]) - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - - sfv = StreamFeatureView( - name="test kafka stream feature view", - entities=[entity], - schema=[], - description="desc", - timestamp_field="event_timestamp", - source=stream_source, - tags={}, - ) - sfv_proto = sfv.to_proto() - - new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) - assert new_sfv == sfv - - def test_hash(): file_source = FileSource(name="my-file-source", path="test.parquet") feature_view_1 = FeatureView( @@ -282,41 +117,3 @@ def test_hash(): def test_field_types(): with pytest.raises(TypeCheckError): Field(name="name", dtype=ValueType.INT32) - - -def test_stream_feature_view_proto_type(): - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - sfv = StreamFeatureView( - name="test stream featureview proto class", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - aggregations=[], - ) - assert sfv.proto_class is StreamFeatureViewProto - - -def test_stream_feature_view_copy(): - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - sfv = StreamFeatureView( - name="test stream featureview proto class", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - aggregations=[], - ) - assert sfv == copy.copy(sfv) diff --git a/sdk/python/tests/unit/test_stream_feature_view.py b/sdk/python/tests/unit/test_stream_feature_view.py new file mode 100644 index 0000000000..b53f9a593a --- /dev/null +++ b/sdk/python/tests/unit/test_stream_feature_view.py @@ -0,0 +1,252 @@ +import copy +from datetime import timedelta + +import pytest + +from feast.aggregation import Aggregation +from feast.batch_feature_view import BatchFeatureView +from feast.data_format import AvroFormat +from feast.data_source import KafkaSource, PushSource +from feast.entity import Entity +from feast.field import Field +from feast.infra.offline_stores.file_source import FileSource +from feast.protos.feast.core.StreamFeatureView_pb2 import ( + StreamFeatureView as StreamFeatureViewProto, +) +from feast.stream_feature_view import StreamFeatureView, stream_feature_view +from feast.types import Float32 + + +def test_create_batch_feature_view(): + batch_source = FileSource(path="some path") + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=batch_source, + ) + + with pytest.raises(TypeError): + BatchFeatureView( + name="test batch feature view", entities=[], ttl=timedelta(days=30) + ) + + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + with pytest.raises(ValueError): + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + ) + + +def test_create_stream_feature_view(): + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + StreamFeatureView( + name="test kafka stream feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + + push_source = PushSource( + name="push source", batch_source=FileSource(path="some path") + ) + StreamFeatureView( + name="test push source feature view", + entities=[], + ttl=timedelta(days=30), + source=push_source, + aggregations=[], + ) + + with pytest.raises(TypeError): + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + aggregations=[], + ) + + with pytest.raises(ValueError): + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=FileSource(path="some path"), + aggregations=[], + ) + + +def simple_udf(x: int): + return x + 3 + + +def test_stream_feature_view_serialization(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + + sfv_proto = sfv.to_proto() + + new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) + assert new_sfv == sfv + assert ( + sfv_proto.spec.feature_transformation.user_defined_function.name == "simple_udf" + ) + + +def test_stream_feature_view_udfs(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + @stream_feature_view( + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + source=stream_source, + ) + def pandas_udf(pandas_df): + import pandas as pd + + assert type(pandas_df) == pd.DataFrame + df = pandas_df.transform(lambda x: x + 10, axis=1) + return df + + import pandas as pd + + df = pd.DataFrame({"A": [1, 2, 3], "B": [10, 20, 30]}) + sfv = pandas_udf + sfv_proto = sfv.to_proto() + new_sfv = StreamFeatureView.from_proto(sfv_proto) + new_df = new_sfv.udf(df) + + expected_df = pd.DataFrame({"A": [11, 12, 13], "B": [20, 30, 40]}) + + assert new_df.equals(expected_df) + + +def test_stream_feature_view_initialization_with_optional_fields_omitted(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity], + schema=[], + description="desc", + timestamp_field="event_timestamp", + source=stream_source, + tags={}, + ) + sfv_proto = sfv.to_proto() + + new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) + assert new_sfv == sfv + + +def test_stream_feature_view_proto_type(): + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + sfv = StreamFeatureView( + name="test stream featureview proto class", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + assert sfv.proto_class is StreamFeatureViewProto + + +def test_stream_feature_view_copy(): + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + sfv = StreamFeatureView( + name="test stream featureview proto class", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + assert sfv == copy.copy(sfv)