Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use get_any_feature_view in online flow instead of listing them #4545

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 47 additions & 44 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
if typing.TYPE_CHECKING:
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView


APPLICATION_NAME = "feast-dev/feast"
USER_AGENT = "{}/{}".format(APPLICATION_NAME, get_version())

Expand Down Expand Up @@ -755,61 +755,64 @@ def _list_feature_views(


def _get_feature_views_to_use(
registry,
registry: "BaseRegistry",
project,
features: Optional[Union[List[str], "FeatureService"]],
allow_cache=False,
hide_dummy_entity: bool = True,
) -> Tuple[List["FeatureView"], List["OnDemandFeatureView"]]:
from feast.feature_service import FeatureService

fvs = {
fv.name: fv
for fv in [
*_list_feature_views(registry, project, allow_cache, hide_dummy_entity),
*registry.list_stream_feature_views(
project=project, allow_cache=allow_cache
),
]
}

od_fvs = {
fv.name: fv
for fv in registry.list_on_demand_feature_views(
project=project, allow_cache=allow_cache
)
}
from feast.feature_view import DUMMY_ENTITY_NAME
from feast.on_demand_feature_view import OnDemandFeatureView

if isinstance(features, FeatureService):
fvs_to_use, od_fvs_to_use = [], []
for fv_name, projection in [
feature_views = [
(projection.name, projection)
for projection in features.feature_view_projections
]:
if fv_name in fvs:
fvs_to_use.append(fvs[fv_name].with_projection(copy.copy(projection)))
elif fv_name in od_fvs:
odfv = od_fvs[fv_name].with_projection(copy.copy(projection))
od_fvs_to_use.append(odfv)
# Let's make sure to include an FVs which the ODFV requires Features from.
for projection in odfv.source_feature_view_projections.values():
fv = fvs[projection.name].with_projection(copy.copy(projection))
if fv not in fvs_to_use:
fvs_to_use.append(fv)
else:
raise ValueError(
f"The provided feature service {features.name} contains a reference to a feature view"
f"{fv_name} which doesn't exist. Please make sure that you have created the feature view"
f'{fv_name} and that you have registered it by running "apply".'
)
views_to_use = (fvs_to_use, od_fvs_to_use)
]
else:
views_to_use = (
[*fvs.values()],
[*od_fvs.values()],
)
assert features is not None
feature_views = [(feature.split(":")[0], None) for feature in features] # type: ignore[misc]

fvs_to_use, od_fvs_to_use = [], []
for name, projection in feature_views:
fv = registry.get_any_feature_view(name, project, allow_cache)

if isinstance(fv, OnDemandFeatureView):
od_fvs_to_use.append(
fv.with_projection(copy.copy(projection)) if projection else fv
)

for source_projection in fv.source_feature_view_projections.values():
source_fv = registry.get_any_feature_view(
source_projection.name, project, allow_cache
)
# TODO better way to handler dummy entities
if (
hide_dummy_entity
and source_fv.entities # type: ignore[attr-defined]
and source_fv.entities[0] == DUMMY_ENTITY_NAME # type: ignore[attr-defined]
):
source_fv.entities = [] # type: ignore[attr-defined]
source_fv.entity_columns = [] # type: ignore[attr-defined]

if source_fv not in fvs_to_use:
fvs_to_use.append(
source_fv.with_projection(copy.copy(source_projection))
)
else:
if (
hide_dummy_entity
and fv.entities # type: ignore[attr-defined]
and fv.entities[0] == DUMMY_ENTITY_NAME # type: ignore[attr-defined]
):
fv.entities = [] # type: ignore[attr-defined]
fv.entity_columns = [] # type: ignore[attr-defined]
fvs_to_use.append(
fv.with_projection(copy.copy(projection)) if projection else fv
)

return views_to_use
return (fvs_to_use, od_fvs_to_use)


def _get_online_request_context(
Expand Down
Loading