Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix feature service inference logic #3089

Merged
Merged
12 changes: 9 additions & 3 deletions sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,16 @@ def __getitem__(self, item):

cp = self.__copy__()
if self.features:
feature_name_to_feature = {
feature.name: feature for feature in self.features
}
referenced_features = []
for feature in self.features:
if feature.name in item:
referenced_features.append(feature)
for feature in item:
if feature not in feature_name_to_feature:
raise ValueError(
f"Feature {feature} does not exist in this feature view."
)
referenced_features.append(feature_name_to_feature[feature])
cp.projection.features = referenced_features
else:
cp.projection.desired_features = item
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ def __init__(self, expected_column_name: str):
)


class FeatureViewMissingDuringFeatureServiceInference(Exception):
def __init__(self, feature_view_name: str, feature_service_name: str):
super().__init__(
f"Missing {feature_view_name} feature view during inference for {feature_service_name} feature service."
)


class InvalidEntityType(Exception):
def __init__(self, entity_type: type):
super().__init__(
Expand Down
65 changes: 51 additions & 14 deletions sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typeguard import typechecked

from feast.base_feature_view import BaseFeatureView
from feast.errors import FeatureViewMissingDuringFeatureServiceInference
from feast.feature_logging import LoggingConfig
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
Expand Down Expand Up @@ -85,32 +86,68 @@ def __init__(
if isinstance(feature_grouping, BaseFeatureView):
self.feature_view_projections.append(feature_grouping.projection)

def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None):
def infer_features(self, fvs_to_update: Dict[str, FeatureView]):
"""
Infers the features for the projections of this feature service, and updates this feature
service in place.

This method is necessary since feature services may rely on feature views which require
feature inference.

Args:
fvs_to_update: A mapping of feature view names to corresponding feature views that
contains all the feature views necessary to run inference.
"""
for feature_grouping in self._features:
if isinstance(feature_grouping, BaseFeatureView):
# For feature services that depend on an unspecified feature view, apply inferred schema
if fvs_to_update and feature_grouping.name in fvs_to_update:
if feature_grouping.projection.desired_features:
desired_features = set(
feature_grouping.projection.desired_features
)
projection = feature_grouping.projection

if projection.desired_features:
# The projection wants to select a specific set of inferred features.
# Example: FeatureService(features=[fv[["inferred_feature"]]]), where
# 'fv' is a feature view that was defined without a schema.
if feature_grouping.name in fvs_to_update:
# First we validate that the selected features have actually been inferred.
desired_features = set(projection.desired_features)
actual_features = set(
[
f.name
for f in fvs_to_update[feature_grouping.name].features
]
)
assert desired_features.issubset(actual_features)
# We need to set the features for the projection at this point so we ensure we're starting with
# an empty list.
feature_grouping.projection.features = []

# Then we extract the selected features and add them to the projection.
projection.features = []
for f in fvs_to_update[feature_grouping.name].features:
if f.name in desired_features:
feature_grouping.projection.features.append(f)
projection.features.append(f)
else:
feature_grouping.projection.features = fvs_to_update[
feature_grouping.name
].features
raise FeatureViewMissingDuringFeatureServiceInference(
adchia marked this conversation as resolved.
Show resolved Hide resolved
feature_view_name=feature_grouping.name,
feature_service_name=self.name,
)

continue

if projection.features:
# The projection has already selected features from a feature view with a
# known schema, so no action needs to be taken.
# Example: FeatureService(features=[fv[["existing_feature"]]]), where
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also applies if the user is selecting from a fv with a known schema, but doesn't select features right?

# 'existing_feature' was defined as part of the schema of 'fv'.
# Example: FeatureService(features=[fv]), where 'fv' was defined with a schema.
continue

# The projection wants to select all possible inferred features.
# Example: FeatureService(features=[fv]), where 'fv' is a feature view that
# was defined without a schema.
if feature_grouping.name in fvs_to_update:
projection.features = fvs_to_update[feature_grouping.name].features
else:
raise FeatureViewMissingDuringFeatureServiceInference(
feature_view_name=feature_grouping.name,
feature_service_name=self.name,
)
else:
raise ValueError(
f"The feature service {self.name} has been provided with an invalid type "
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/feature_view_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class FeatureViewProjection:
name: The unique name of the feature view from which this projection is created.
name_alias: An optional alias for the name.
features: The list of features represented by the feature view projection.
desired_features: The list of features that this feature view projection intends to select.
If empty, the projection intends to select all features. This attribute is only used
for feature service inference. It should only be set if the underlying feature view
is not ready to be projected, i.e. still needs to go through feature inference.
join_key_map: A map to modify join key columns during retrieval of this feature
view projection.
"""
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from datetime import timedelta

from feast import Entity, FeatureService, FeatureView, Field, FileSource
from feast.types import Float32, Int32, Int64

driver_hourly_stats = FileSource(
path="data/driver_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

driver = Entity(
name="driver_id",
)

driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
Field(name="driver_id", dtype=Int32),
],
online=True,
source=driver_hourly_stats,
tags={},
)

global_daily_stats = FileSource(
path="data/global_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

global_stats_feature_view = FeatureView(
name="global_daily_stats",
entities=[],
ttl=timedelta(days=1),
schema=[
Field(name="num_rides", dtype=Int32),
Field(name="avg_ride_length", dtype=Float32),
],
online=True,
source=global_daily_stats,
tags={},
)

all_stats_service = FeatureService(
name="all_stats",
features=[driver_hourly_stats_view, global_stats_feature_view],
tags={"release": "production"},
)

some_stats_service = FeatureService(
name="some_stats",
features=[
driver_hourly_stats_view[["conv_rate"]],
global_stats_feature_view[["num_rides"]],
],
tags={"release": "production"},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from datetime import timedelta

from feast import Entity, FeatureService, FeatureView, FileSource

driver_hourly_stats = FileSource(
path="%PARQUET_PATH%", # placeholder to be replaced by the test
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

driver = Entity(
name="driver_id",
)

driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
online=True,
source=driver_hourly_stats,
tags={},
)

global_daily_stats = FileSource(
path="%PARQUET_PATH_GLOBAL%", # placeholder to be replaced by the test
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

global_stats_feature_view = FeatureView(
name="global_daily_stats",
entities=[],
ttl=timedelta(days=1),
online=True,
source=global_daily_stats,
tags={},
)

all_stats_service = FeatureService(
name="all_stats",
features=[driver_hourly_stats_view, global_stats_feature_view],
tags={"release": "production"},
)

some_stats_service = FeatureService(
name="some_stats",
features=[
driver_hourly_stats_view[["conv_rate"]],
global_stats_feature_view[["num_rides"]],
],
tags={"release": "production"},
)
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
60 changes: 60 additions & 0 deletions sdk/python/tests/unit/infra/test_inference_unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ def test_feature_view_inference_on_feature_columns(simple_dataset_1):


def test_update_feature_services_with_inferred_features(simple_dataset_1):
"""
Tests that a feature service that references feature views without specified features will
be updated with the correct projections after feature inference.
"""
with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source:
entity1 = Entity(name="test1", join_keys=["id_join_key"])
feature_view_1 = FeatureView(
Expand Down Expand Up @@ -338,4 +342,60 @@ def test_update_feature_services_with_inferred_features(simple_dataset_1):
assert len(feature_service.feature_view_projections[1].features) == 3


def test_update_feature_services_with_specified_features(simple_dataset_1):
"""
Tests that a feature service that references feature views with specified features will
have the correct projections both before and after feature inference.
"""
with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source:
entity1 = Entity(name="test1", join_keys=["id_join_key"])
feature_view_1 = FeatureView(
name="test1",
entities=[entity1],
schema=[
Field(name="float_col", dtype=Float32),
Field(name="id_join_key", dtype=Int64),
],
source=file_source,
)
feature_view_2 = FeatureView(
name="test2",
entities=[entity1],
schema=[
Field(name="int64_col", dtype=Int64),
Field(name="id_join_key", dtype=Int64),
],
source=file_source,
)

feature_service = FeatureService(
name="fs_1", features=[feature_view_1[["float_col"]], feature_view_2]
)
assert len(feature_service.feature_view_projections) == 2
assert len(feature_service.feature_view_projections[0].features) == 1
assert len(feature_service.feature_view_projections[0].desired_features) == 0
assert len(feature_service.feature_view_projections[1].features) == 1
assert len(feature_service.feature_view_projections[1].desired_features) == 0

update_feature_views_with_inferred_features_and_entities(
[feature_view_1, feature_view_2],
[entity1],
RepoConfig(
provider="local", project="test", entity_key_serialization_version=2
),
)
assert len(feature_view_1.features) == 1
assert len(feature_view_2.features) == 1

feature_service.infer_features(
fvs_to_update={
feature_view_1.name: feature_view_1,
feature_view_2.name: feature_view_2,
}
)

assert len(feature_service.feature_view_projections[0].features) == 1
assert len(feature_service.feature_view_projections[1].features) == 1


# TODO(felixwang9817): Add tests that interact with field mapping.
Empty file.
Loading