Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable stream feature view materialization #2798

Merged
merged 7 commits into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 73 additions & 39 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,19 @@ def _list_feature_views(
feature_views.append(fv)
return feature_views

def _list_stream_feature_views(
self, allow_cache: bool = False, hide_dummy_entity: bool = True,
) -> List[StreamFeatureView]:
stream_feature_views = []
for sfv in self._registry.list_stream_feature_views(
self.project, allow_cache=allow_cache
):
if hide_dummy_entity and sfv.entities[0] == DUMMY_ENTITY_NAME:
sfv.entities = []
sfv.entity_columns = []
stream_feature_views.append(sfv)
return stream_feature_views

@log_exceptions_and_usage
def list_on_demand_feature_views(
self, allow_cache: bool = False
Expand All @@ -289,9 +302,7 @@ def list_stream_feature_views(
Returns:
A list of stream feature views.
"""
return self._registry.list_stream_feature_views(
self.project, allow_cache=allow_cache
)
return self._list_stream_feature_views(allow_cache)

@log_exceptions_and_usage
def list_data_sources(self, allow_cache: bool = False) -> List[DataSource]:
Expand Down Expand Up @@ -558,6 +569,9 @@ def _make_inferences(
update_feature_views_with_inferred_features_and_entities(
views_to_update, entities + entities_to_update, self.config
)
update_feature_views_with_inferred_features_and_entities(
sfvs_to_update, entities + entities_to_update, self.config
)
# TODO(kevjumba): Update schema inferrence
for sfv in sfvs_to_update:
if not sfv.schema:
Expand All @@ -574,6 +588,53 @@ def _make_inferences(
for feature_service in feature_services_to_update:
feature_service.infer_features(fvs_to_update=fvs_to_update_map)

def _get_feature_views_to_materialize(
self, feature_views: Optional[List[str]],
) -> List[FeatureView]:
"""
Returns the list of feature views that should be materialized.

If no feature views are specified, all feature views will be returned.

Args:
feature_views: List of names of feature views to materialize.

Raises:
FeatureViewNotFoundException: One of the specified feature views could not be found.
ValueError: One of the specified feature views is not configured for materialization.
"""
feature_views_to_materialize: List[FeatureView] = []

if feature_views is None:
feature_views_to_materialize = self._list_feature_views(
hide_dummy_entity=False
)
feature_views_to_materialize = [
fv for fv in feature_views_to_materialize if fv.online
]
stream_feature_views_to_materialize = self._list_stream_feature_views(
hide_dummy_entity=False
)
feature_views_to_materialize += [
sfv for sfv in stream_feature_views_to_materialize if sfv.online
]
else:
for name in feature_views:
try:
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
except FeatureViewNotFoundException:
feature_view = self._get_stream_feature_view(
name, hide_dummy_entity=False
)

if not feature_view.online:
raise ValueError(
f"FeatureView {feature_view.name} is not configured to be served online."
)
feature_views_to_materialize.append(feature_view)

return feature_views_to_materialize

@log_exceptions_and_usage
def _plan(
self, desired_repo_contents: RepoContents
Expand Down Expand Up @@ -873,8 +934,8 @@ def apply(

self._get_provider().update_infra(
project=self.project,
tables_to_delete=views_to_delete if not partial else [],
tables_to_keep=views_to_update,
tables_to_delete=views_to_delete + sfvs_to_delete if not partial else [],
tables_to_keep=views_to_update + sfvs_to_update,
entities_to_delete=entities_to_delete if not partial else [],
entities_to_keep=entities_to_update,
partial=partial,
Expand Down Expand Up @@ -1151,23 +1212,9 @@ def materialize_incremental(
<BLANKLINE>
...
"""
feature_views_to_materialize: List[FeatureView] = []
if feature_views is None:
feature_views_to_materialize = self._list_feature_views(
hide_dummy_entity=False
)
feature_views_to_materialize = [
fv for fv in feature_views_to_materialize if fv.online
]
else:
for name in feature_views:
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
if not feature_view.online:
raise ValueError(
f"FeatureView {feature_view.name} is not configured to be served online."
)
feature_views_to_materialize.append(feature_view)

feature_views_to_materialize = self._get_feature_views_to_materialize(
feature_views
)
_print_materialization_log(
None,
end_date,
Expand Down Expand Up @@ -1258,23 +1305,9 @@ def materialize(
f"The given start_date {start_date} is greater than the given end_date {end_date}."
)

feature_views_to_materialize: List[FeatureView] = []
if feature_views is None:
feature_views_to_materialize = self._list_feature_views(
hide_dummy_entity=False
)
feature_views_to_materialize = [
fv for fv in feature_views_to_materialize if fv.online
]
else:
for name in feature_views:
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
if not feature_view.online:
raise ValueError(
f"FeatureView {feature_view.name} is not configured to be served online."
)
feature_views_to_materialize.append(feature_view)

feature_views_to_materialize = self._get_feature_views_to_materialize(
feature_views
)
_print_materialization_log(
start_date,
end_date,
Expand Down Expand Up @@ -1327,6 +1360,7 @@ def push(
from feast.data_source import PushSource

all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)
all_fvs += self.list_stream_feature_views(allow_cache=allow_registry_cache)

fvs_with_push_sources = {
fv
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def update_feature_views_with_inferred_features_and_entities(
other columns except designated timestamp columns are considered to be feature columns. If
the feature view already has features, feature inference is skipped.

Note that this inference logic currently does not take transformations into account. For
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably clarify transformations and aggregations.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdym?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like clarify that stream feature views doesn't take into account udf or aggregations(transformations is a little ambiguous)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

example, even if a stream feature view has a transformation, this method assumes that the
batch source contains transformed data with the correct final schema.

Args:
fvs: The feature views to be updated.
entities: A list containing entities associated with the feature views.
Expand Down
24 changes: 24 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,30 @@ def apply_materialization(
self.commit()
return

for idx, existing_stream_feature_view_proto in enumerate(
self.cached_registry_proto.stream_feature_views
):
if (
existing_stream_feature_view_proto.spec.name == feature_view.name
and existing_stream_feature_view_proto.spec.project == project
):
existing_stream_feature_view = StreamFeatureView.from_proto(
existing_stream_feature_view_proto
)
existing_stream_feature_view.materialization_intervals.append(
(start_date, end_date)
)
existing_stream_feature_view.last_updated_timestamp = datetime.utcnow()
stream_feature_view_proto = existing_stream_feature_view.to_proto()
stream_feature_view_proto.spec.project = project
del self.cached_registry_proto.stream_feature_views[idx]
self.cached_registry_proto.stream_feature_views.append(
stream_feature_view_proto
)
if commit:
self.commit()
return

raise FeatureViewNotFoundException(feature_view.name, project)

def list_feature_views(
Expand Down
68 changes: 44 additions & 24 deletions sdk/python/feast/stream_feature_view.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import copy
import functools
import warnings
from datetime import timedelta
from datetime import datetime, timedelta
from types import MethodType
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Tuple, Union

import dill
from google.protobuf.duration_pb2 import Duration
Expand Down Expand Up @@ -42,26 +42,42 @@ class StreamFeatureView(FeatureView):
schemas with Feast.

Attributes:
name: str. The unique name of the stream feature view.
entities: Union[List[Entity], List[str]]. List of entities or entity join keys.
ttl: timedelta. The amount of time this group of features lives. A ttl of 0 indicates that
name: The unique name of the stream feature view.
entities: List of entities or entity join keys.
ttl: The amount of time this group of features lives. A ttl of 0 indicates that
this group of features lives forever. Note that large ttl's or a ttl of 0
can result in extremely computationally intensive queries.
tags: Dict[str, str]. A dictionary of key-value pairs to store arbitrary metadata.
online: bool. Defines whether this stream feature view is used in online feature retrieval.
description: str. A human-readable description.
schema: The schema of the feature view, including feature, timestamp, and entity
columns. If not specified, can be inferred from the underlying data source.
source: DataSource. The stream source of data where this group of features is stored.
aggregations: List of aggregations registered with the stream feature view.
mode: The mode of execution.
timestamp_field: Must be specified if aggregations are specified. Defines the timestamp column on which to aggregate windows.
online: Defines whether this stream feature view is used in online feature retrieval.
description: A human-readable description.
tags: A dictionary of key-value pairs to store arbitrary metadata.
owner: The owner of the on demand feature view, typically the email of the primary
maintainer.
schema: List[Field] The schema of the feature view, including feature, timestamp, and entity
columns. If not specified, can be inferred from the underlying data source.
source: DataSource. The stream source of data where this group of features
is stored.
aggregations (optional): List[Aggregation]. List of aggregations registered with the stream feature view.
mode(optional): str. The mode of execution.
timestamp_field (optional): Must be specified if aggregations are specified. Defines the timestamp column on which to aggregate windows.
udf (optional): MethodType The user defined transformation function. This transformation function should have all of the corresponding imports imported within the function.
udf: The user defined transformation function. This transformation function should have all of the corresponding imports imported within the function.
"""

name: str
entities: List[str]
ttl: Optional[timedelta]
source: DataSource
schema: List[Field]
entity_columns: List[Field]
features: List[Field]
online: bool
description: str
tags: Dict[str, str]
owner: str
aggregations: List[Aggregation]
mode: str
timestamp_field: str
materialization_intervals: List[Tuple[datetime, datetime]]
udf: Optional[MethodType]

def __init__(
self,
*,
Expand Down Expand Up @@ -222,7 +238,7 @@ def from_proto(cls, sfv_proto):
if sfv_proto.spec.HasField("user_defined_function")
else None
)
sfv_feature_view = cls(
stream_feature_view = cls(
name=sfv_proto.spec.name,
description=sfv_proto.spec.description,
tags=dict(sfv_proto.spec.tags),
Expand All @@ -247,23 +263,27 @@ def from_proto(cls, sfv_proto):
)

if batch_source:
sfv_feature_view.batch_source = batch_source
stream_feature_view.batch_source = batch_source

if stream_source:
sfv_feature_view.stream_source = stream_source
stream_feature_view.stream_source = stream_source

sfv_feature_view.entities = list(sfv_proto.spec.entities)
stream_feature_view.entities = list(sfv_proto.spec.entities)

sfv_feature_view.features = [
stream_feature_view.features = [
Field.from_proto(field_proto) for field_proto in sfv_proto.spec.features
]
stream_feature_view.entity_columns = [
Field.from_proto(field_proto)
for field_proto in sfv_proto.spec.entity_columns
]

if sfv_proto.meta.HasField("created_timestamp"):
sfv_feature_view.created_timestamp = (
stream_feature_view.created_timestamp = (
sfv_proto.meta.created_timestamp.ToDatetime()
)
if sfv_proto.meta.HasField("last_updated_timestamp"):
sfv_feature_view.last_updated_timestamp = (
stream_feature_view.last_updated_timestamp = (
sfv_proto.meta.last_updated_timestamp.ToDatetime()
)

Expand All @@ -275,7 +295,7 @@ def from_proto(cls, sfv_proto):
)
)

return sfv_feature_view
return stream_feature_view

def __copy__(self):
fv = StreamFeatureView(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Field,
OnDemandFeatureView,
PushSource,
StreamFeatureView,
ValueType,
)
from feast.data_source import DataSource, RequestSource
Expand Down Expand Up @@ -297,7 +298,7 @@ def create_pushable_feature_view(batch_source: DataSource):
push_source = PushSource(
name="location_stats_push_source", batch_source=batch_source,
)
return FeatureView(
return StreamFeatureView(
name="pushable_location_stats",
entities=[location()],
schema=[
Expand Down
Loading