
Infer features for on demand feature views, support multiple output features #1845

Merged · 13 commits · Sep 9, 2021
153 changes: 71 additions & 82 deletions README.md
@@ -127,89 +127,78 @@ pprint(feature_vector)

## 📦 Functionality and Roadmap

The list below contains the functionality that contributors are planning to develop for Feast.

* Items that are in development (or planned for development) are indicated in parentheses.
* We welcome contributions to all items in the roadmap, especially those that are not currently planned or in development!
* Want to influence our roadmap and prioritization? Submit your feedback to [this form](https://docs.google.com/forms/d/e/1FAIpQLSfa1nRQ0sKz-JEFnMMCi4Jseag_yDssO_3nV9qMfxfrkil-wA/viewform).
* Want to speak to a Feast contributor? We are more than happy to jump on a quick call. Please schedule a time using [Calendly](https://calendly.com/d/x2ry-g5bb/meet-with-feast-team).

- **Data Sources**
- [x] [Redshift source](https://docs.feast.dev/reference/data-sources/redshift)
- [x] [BigQuery source](https://docs.feast.dev/reference/data-sources/bigquery)
- [x] [Parquet file source](https://docs.feast.dev/reference/data-sources/file)
- [ ] Kafka source (Planned for Q4 2021)
- [ ] HTTP source
- [ ] Snowflake source
- [ ] Synapse source


- **Offline Stores**
- [x] [Redshift](https://docs.feast.dev/reference/offline-stores/redshift)
- [x] [BigQuery](https://docs.feast.dev/reference/offline-stores/bigquery)
- [x] [In-memory / Pandas](https://docs.feast.dev/reference/offline-stores/file)
- [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/adding-a-new-offline-store)
- [x] [Hive (community maintained)](https://github.com/baineng/feast-hive)
- [ ] Snowflake
- [ ] Synapse


- **Online Stores**
- [x] [DynamoDB](https://docs.feast.dev/reference/online-stores/dynamodb)
- [x] [Redis](https://docs.feast.dev/reference/online-stores/redis)
- [x] [Datastore](https://docs.feast.dev/reference/online-stores/datastore)
- [x] [SQLite](https://docs.feast.dev/reference/online-stores/sqlite)
- [x] [Custom online store support](https://docs.feast.dev/how-to-guides/adding-support-for-a-new-online-store)
- [ ] Postgres
- [ ] Bigtable
- [ ] Cassandra


- **Streaming**
- [ ] [Custom streaming ingestion job support](https://docs.feast.dev/how-to-guides/creating-a-custom-provider)
- [ ] Streaming ingestion on AWS (Planned for Q4 2021)
- [ ] Streaming ingestion on GCP


- **Feature Engineering**
- [ ] On-demand Transformations (Development in progress. See [RFC](https://docs.google.com/document/d/1lgfIw0Drc65LpaxbUu49RCeJgMew547meSJttnUqz7c/edit#))
- [ ] Batch transformation (SQL)
- [ ] Streaming transformation


- **Deployments**
- [ ] AWS Lambda (Development in progress. See [RFC](https://docs.google.com/document/d/1eZWKWzfBif66LDN32IajpaG-j82LSHCCOzY6R7Ax7MI/edit))
- [ ] Cloud Run
- [ ] Kubernetes
- [ ] KNative


- **Feature Serving**
- [x] Python Client
- [ ] REST Feature Server (Python) (Development in progress. See [RFC](https://docs.google.com/document/d/1iXvFhAsJ5jgAhPOpTdB3j-Wj1S9x3Ev_Wr6ZpnLzER4/edit))
- [ ] gRPC Feature Server (Java) (See [#1497](https://github.com/feast-dev/feast/issues/1497))
- [ ] Java Client
- [ ] Go Client
- [ ] Push API
- [ ] Delete API
- [ ] Feature Logging (for training)


- **Data Quality Management**
- [ ] Data profiling and validation (Great Expectations) (Planned for Q4 2021)
- [ ] Metric production
- [ ] Training-serving skew detection
- [ ] Drift detection
- [ ] Alerting


- **Feature Discovery and Governance**
- [x] Python SDK for browsing feature registry
- [x] CLI for browsing feature registry
- [x] Model-centric feature tracking (feature services)
- [ ] REST API for browsing feature registry
- [ ] Feast Web UI (Planned for Q4 2021)
- [ ] Feature versioning
- [ ] Amundsen integration

## 🎓 Important Resources

8 changes: 8 additions & 0 deletions sdk/python/feast/errors.py
@@ -129,6 +129,14 @@ def __init__(self, feature_refs_collisions: List[str], full_feature_names: bool)
)


class SpecifiedFeaturesNotPresentError(Exception):
    def __init__(self, specified_features: List[str], feature_view_name: str):
        features = ", ".join(specified_features)
        super().__init__(
            f"Explicitly specified features {features} not found in inferred list of features for '{feature_view_name}'"
        )


class FeastOnlineStoreInvalidName(Exception):
    def __init__(self, online_store_class_name: str):
        super().__init__(
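A minimal, self-contained sketch of how the new error surfaces. The class body mirrors the diff above; the feature and view names are illustrative, not taken from the PR:

```python
from typing import List

# Stand-in copy of the new error class from sdk/python/feast/errors.py,
# reproduced here so the message format can be shown in isolation.
class SpecifiedFeaturesNotPresentError(Exception):
    def __init__(self, specified_features: List[str], feature_view_name: str):
        features = ", ".join(specified_features)
        super().__init__(
            f"Explicitly specified features {features} not found in "
            f"inferred list of features for '{feature_view_name}'"
        )

# Hypothetical feature/view names for illustration.
err = SpecifiedFeaturesNotPresentError(["conv_rate", "avg_trips"], "driver_stats")
print(err)
```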
2 changes: 1 addition & 1 deletion sdk/python/feast/feature.py
@@ -59,7 +59,7 @@ def __lt__(self, other):

    def __repr__(self):
        # return string representation of the reference
        return f"{self.name}-{self.dtype}"

    def __str__(self):
        # readable string of the reference
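The `__repr__` change appends the dtype to the feature name. A stand-in class (the real `Feature` lives in sdk/python/feast/feature.py; the attribute values here are illustrative) shows the effect:

```python
# Stand-in mirroring the changed __repr__ from the diff above.
class FeatureRef:
    def __init__(self, name: str, dtype: str):
        self.name = name
        self.dtype = dtype

    def __repr__(self):
        # New behavior: name and dtype, joined by a hyphen.
        return f"{self.name}-{self.dtype}"

print(repr(FeatureRef("conv_rate", "FLOAT")))  # conv_rate-FLOAT
```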
117 changes: 84 additions & 33 deletions sdk/python/feast/feature_store.py
@@ -407,6 +407,9 @@ def apply(
        for view in views_to_update:
            view.infer_features_from_batch_source(self.config)

        for odfv in odfvs_to_update:
            odfv.infer_features_from_batch_source(self.config)

        if len(views_to_update) + len(entities_to_update) + len(
            services_to_update
        ) + len(odfvs_to_update) != len(objects):
@@ -533,9 +536,17 @@ def get_historical_features(
_feature_refs = self._get_features(features, feature_refs)

        all_feature_views = self.list_feature_views()
        all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
            project=self.project
        )

        # TODO(achal): _group_feature_refs returns the on demand feature views, but it's not passed into
        # the provider. This is a weird interface quirk - we should revisit `get_historical_features` to
        # pass in the on demand feature views as well.
        fvs, _ = _group_feature_refs(
            _feature_refs, all_feature_views, all_on_demand_feature_views
        )
        feature_views = list(view for view, _ in fvs)

_validate_feature_refs(_feature_refs, full_feature_names)

@@ -771,8 +782,14 @@ def get_online_features(
        all_feature_views = self._list_feature_views(
            allow_cache=True, hide_dummy_entity=False
        )
        all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
            project=self.project, allow_cache=True
        )

        _validate_feature_refs(_feature_refs, full_feature_names)
        grouped_refs, _ = _group_feature_refs(
            _feature_refs, all_feature_views, all_on_demand_feature_views
        )
feature_views = list(view for view, _ in grouped_refs)
entityless_case = DUMMY_ENTITY_NAME in [
entity_name
@@ -861,32 +878,52 @@ def _augment_response_with_on_demand_transforms(
initial_response: OnlineResponse,
result_rows: List[GetOnlineFeaturesResponse.FieldValues],
) -> OnlineResponse:
        all_on_demand_feature_views = {
            view.name: view
            for view in self._registry.list_on_demand_feature_views(
                project=self.project, allow_cache=True
            )
        }
all_odfv_feature_names = all_on_demand_feature_views.keys()

if len(all_on_demand_feature_views) == 0:
return initial_response
initial_response_df = initial_response.to_df()

        odfv_feature_refs = defaultdict(list)
        for feature_ref in feature_refs:
            view_name, feature_name = (
                feature_ref.split(":")[0],
                feature_ref.split(":")[1],
            )

> **Collaborator:** nit: `view_name, feature_name = feature_ref.split(":")` I think should work
>
> **Author:** cool
            if view_name in all_odfv_feature_names:
                odfv_feature_refs[view_name].append(feature_name)

        # Apply on demand transformations
        for odfv_name, _feature_refs in odfv_feature_refs.items():
            odfv = all_on_demand_feature_views[odfv_name]
            transformed_features_df = odfv.get_transformed_features_df(
                full_feature_names, initial_response_df
            )
            for row_idx in range(len(result_rows)):
                result_row = result_rows[row_idx]

                selected_subset = [
                    f for f in transformed_features_df.columns if f in _feature_refs
                ]

                for transformed_feature in selected_subset:
                    transformed_feature_name = (
                        f"{odfv.name}__{transformed_feature}"
                        if full_feature_names
                        else transformed_feature
                    )
                    proto_value = python_value_to_proto_value(
                        transformed_features_df[transformed_feature].values[row_idx]
                    )
                    result_row.fields[transformed_feature_name].CopyFrom(proto_value)
                    result_row.statuses[
                        transformed_feature_name
                    ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))

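The naming convention in the loop above can be sketched on its own. This is a simplified stand-in for the inline expression in the diff, not a Feast API; the ODFV and feature names are illustrative:

```python
def transformed_feature_name(
    odfv_name: str, feature: str, full_feature_names: bool
) -> str:
    # Mirrors the convention in the diff: when full feature names are
    # requested, prefix the feature with the ODFV name and a double
    # underscore; otherwise keep the bare feature name.
    return f"{odfv_name}__{feature}" if full_feature_names else feature

print(transformed_feature_name("conv_odfv", "conv_rate_plus_100", True))
print(transformed_feature_name("conv_odfv", "conv_rate_plus_100", False))
```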
@@ -939,36 +976,50 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F


def _group_feature_refs(
    features: Union[List[str], FeatureService],
    all_feature_views: List[FeatureView],
    all_on_demand_feature_views: List[OnDemandFeatureView],
) -> Tuple[
    List[Tuple[FeatureView, List[str]]], List[Tuple[OnDemandFeatureView, List[str]]]
]:
    """Get list of feature views and corresponding feature names based on feature references."""

    # view name to view proto
    view_index = {view.name: view for view in all_feature_views}

    # on demand view name to on demand view proto
    on_demand_view_index = {view.name: view for view in all_on_demand_feature_views}

    # view name to feature names
    views_features = defaultdict(list)

    # on demand view name to feature names
    on_demand_view_features = defaultdict(list)

    if isinstance(features, list) and isinstance(features[0], str):
        for ref in features:
            if ":" not in ref:
                # This is an on demand feature view ref
                continue
            view_name, feat_name = ref.split(":")
            if view_name in view_index:
                views_features[view_name].append(feat_name)
            elif view_name in on_demand_view_index:
                on_demand_view_features[view_name].append(feat_name)
            else:
                raise FeatureViewNotFoundException(view_name)
    elif isinstance(features, FeatureService):
        for feature_projection in features.features:
            projected_features = feature_projection.features
            views_features[feature_projection.name].extend(
                [f.name for f in projected_features]
            )

    fvs_result: List[Tuple[FeatureView, List[str]]] = []
    odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = []

    for view_name, feature_names in views_features.items():
        fvs_result.append((view_index[view_name], feature_names))
    for view_name, feature_names in on_demand_view_features.items():
        odfvs_result.append((on_demand_view_index[view_name], feature_names))
    return fvs_result, odfvs_result
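The grouping logic above can be exercised in miniature. This simplified sketch substitutes plain sets and a `KeyError` for Feast's view objects and `FeatureViewNotFoundException`; all names are illustrative:

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

def group_refs(
    refs: List[str], view_names: Set[str], odfv_names: Set[str]
) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
    # Simplified version of _group_feature_refs: split "view:feature"
    # references into regular and on-demand buckets by view name.
    views: Dict[str, List[str]] = defaultdict(list)
    odfvs: Dict[str, List[str]] = defaultdict(list)
    for ref in refs:
        if ":" not in ref:
            continue  # bare on demand feature view ref, skipped here
        view, feat = ref.split(":")
        if view in view_names:
            views[view].append(feat)
        elif view in odfv_names:
            odfvs[view].append(feat)
        else:
            raise KeyError(view)  # stands in for FeatureViewNotFoundException
    return dict(views), dict(odfvs)

fvs, odfvs = group_refs(
    ["driver_stats:conv_rate", "conv_odfv:conv_rate_plus_100"],
    {"driver_stats"},
    {"conv_odfv"},
)
print(fvs, odfvs)
```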


def _get_feature_refs_from_feature_services(
9 changes: 7 additions & 2 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -91,8 +91,13 @@ def get_historical_features(
            raise ValueError(
                f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
            )
        (
            feature_views_to_features,
            on_demand_feature_views_to_features,
        ) = _get_requested_feature_views_to_features_dict(
            feature_refs,
            feature_views,
            registry.list_on_demand_feature_views(config.project),
        )

        # Create lazy function that is only called from the RetrievalJob object