Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly exclude entities from feature inference #2048

Merged
merged 3 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from feast.inference import (
update_data_sources_with_inferred_event_timestamp_col,
update_entities_with_inferred_types_from_feature_views,
update_feature_views_with_inferred_features,
)
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.on_demand_feature_view import OnDemandFeatureView
Expand Down Expand Up @@ -479,8 +480,9 @@ def apply(
[view.batch_source for view in views_to_update], self.config
)

for view in views_to_update:
view.infer_features_from_batch_source(self.config)
update_feature_views_with_inferred_features(
views_to_update, entities_to_update, self.config
)

for odfv in odfvs_to_update:
odfv.infer_features()
Expand Down
69 changes: 0 additions & 69 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import re
import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Type, Union
Expand All @@ -22,7 +21,6 @@
from feast import utils
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource
from feast.errors import RegistryInferenceFailure
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
Expand All @@ -35,7 +33,6 @@
from feast.protos.feast.core.FeatureView_pb2 import (
MaterializationInterval as MaterializationIntervalProto,
)
from feast.repo_config import RepoConfig
from feast.usage import log_exceptions
from feast.value_type import ValueType

Expand Down Expand Up @@ -406,69 +403,3 @@ def most_recent_end_time(self) -> Optional[datetime]:
if len(self.materialization_intervals) == 0:
return None
return max([interval[1] for interval in self.materialization_intervals])

def infer_features_from_batch_source(self, config: RepoConfig):
"""
Infers the set of features associated to this feature view from the input source.

Args:
config: Configuration object used to configure the feature store.

Raises:
RegistryInferenceFailure: The set of features could not be inferred.
"""
if not self.features:
columns_to_exclude = {
self.batch_source.event_timestamp_column,
self.batch_source.created_timestamp_column,
} | set(self.entities)

if (
self.batch_source.event_timestamp_column
in self.batch_source.field_mapping
):
columns_to_exclude.add(
self.batch_source.field_mapping[
self.batch_source.event_timestamp_column
]
)
if (
self.batch_source.created_timestamp_column
in self.batch_source.field_mapping
):
columns_to_exclude.add(
self.batch_source.field_mapping[
self.batch_source.created_timestamp_column
]
)
for e in self.entities:
if e in self.batch_source.field_mapping:
columns_to_exclude.add(self.batch_source.field_mapping[e])

for (
col_name,
col_datatype,
) in self.batch_source.get_table_column_names_and_types(config):
if col_name not in columns_to_exclude and not re.match(
"^__|__$",
col_name, # double underscores often signal an internal-use column
):
feature_name = (
self.batch_source.field_mapping[col_name]
if col_name in self.batch_source.field_mapping
else col_name
)
self.features.append(
Feature(
feature_name,
self.batch_source.source_datatype_to_feast_value_type()(
col_datatype
),
)
)

if not self.features:
raise RegistryInferenceFailure(
"FeatureView",
f"Could not infer Features for the FeatureView named {self.name}.",
)
66 changes: 65 additions & 1 deletion sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
from typing import List

from feast import BigQuerySource, Entity, FileSource, RedshiftSource
from feast import BigQuerySource, Entity, Feature, FileSource, RedshiftSource
from feast.data_source import DataSource
from feast.errors import RegistryInferenceFailure
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -118,3 +118,67 @@ def update_data_sources_with_inferred_event_timestamp_col(
{ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
""",
)


def update_feature_views_with_inferred_features(
fvs: List[FeatureView], entities: List[Entity], config: RepoConfig
) -> None:
"""
Infers the set of features associated to each FeatureView and updates the FeatureView with those features.
Inference occurs through considering each column of the underlying data source as a feature except columns that are
associated with the data source's timestamp columns and the FeatureView's entity columns.
"""
entity_name_to_join_key_map = {entity.name: entity.join_key for entity in entities}

for fv in fvs:
if not fv.features:
columns_to_exclude = {
fv.batch_source.event_timestamp_column,
fv.batch_source.created_timestamp_column,
} | {
entity_name_to_join_key_map[entity_name] for entity_name in fv.entities
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the line that changed from the previous version of this method. Grabbing the join_keys to exclude rather than entity name. Everything else is mainly copy and paste.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mavysavydav is there a reason for the move to inference.py? I am not against it, but it makes reviewing the PR harder. Would it make sense to at least split these two changes into different commits to make it easier on the reviewer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea i agree that review becomes more difficult. I had to move this method as part of this fix b/c it needs the list of entity objects which isn't available in feature_view.py. I can split out the odfv method move into a separate PR since that's more independent if you'd like

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have taken out the odfv code move

}

if fv.batch_source.event_timestamp_column in fv.batch_source.field_mapping:
columns_to_exclude.add(
fv.batch_source.field_mapping[
fv.batch_source.event_timestamp_column
]
)
if (
fv.batch_source.created_timestamp_column
in fv.batch_source.field_mapping
):
columns_to_exclude.add(
fv.batch_source.field_mapping[
fv.batch_source.created_timestamp_column
]
)

for (
col_name,
col_datatype,
) in fv.batch_source.get_table_column_names_and_types(config):
if col_name not in columns_to_exclude and not re.match(
"^__|__$",
col_name, # double underscores often signal an internal-use column
):
feature_name = (
fv.batch_source.field_mapping[col_name]
if col_name in fv.batch_source.field_mapping
else col_name
)
fv.features.append(
Feature(
feature_name,
fv.batch_source.source_datatype_to_feast_value_type()(
col_datatype
),
)
)

if not fv.features:
raise RegistryInferenceFailure(
"FeatureView",
f"Could not infer Features for the FeatureView named {fv.name}.",
)
4 changes: 2 additions & 2 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def simple_dataset_1() -> pd.DataFrame:
now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")
data = {
"id": [1, 2, 1, 3, 3],
"id_join_key": [1, 2, 1, 3, 3],
"float_col": [0.1, 0.2, 0.3, 4, 5],
"int64_col": [1, 2, 3, 4, 5],
"string_col": ["a", "b", "c", "d", "e"],
Expand All @@ -119,7 +119,7 @@ def simple_dataset_2() -> pd.DataFrame:
now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")
data = {
"id": ["a", "b", "c", "d", "e"],
"id_join_key": ["a", "b", "c", "d", "e"],
"float_col": [0.1, 0.2, 0.3, 4, 5],
"int64_col": [1, 2, 3, 4, 5],
"string_col": ["a", "b", "c", "d", "e"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
with prep_file_source(
df=dataframe_source, event_timestamp_column="ts_1"
) as file_source:
entity = Entity(name="id", join_key="id_join_key", value_type=ValueType.INT64)

fv1 = FeatureView(
name="fv1",
entities=["id"],
Expand Down Expand Up @@ -245,7 +247,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
tags={},
)

test_feature_store.apply([fv1, fv2, fv3]) # Register Feature Views
test_feature_store.apply([entity, fv1, fv2, fv3]) # Register Feature Views
feature_view_1 = test_feature_store.list_feature_views()[0]
feature_view_2 = test_feature_store.list_feature_views()[1]
feature_view_3 = test_feature_store.list_feature_views()[2]
Expand Down Expand Up @@ -433,7 +435,7 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source):
df=dataframe_source, event_timestamp_column="ts_1"
) as file_source:

e = Entity(name="id", value_type=ValueType.STRING)
e = Entity(name="id", join_key="id_join_key", value_type=ValueType.STRING)

# Create Feature View
fv1 = FeatureView(
Expand Down
14 changes: 9 additions & 5 deletions sdk/python/tests/integration/registration/test_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@ def test_update_entities_with_inferred_types_from_feature_views(
name="fv2", entities=["id"], batch_source=file_source_2, ttl=None,
)

actual_1 = Entity(name="id")
actual_2 = Entity(name="id")
actual_1 = Entity(name="id", join_key="id_join_key")
actual_2 = Entity(name="id", join_key="id_join_key")

update_entities_with_inferred_types_from_feature_views(
[actual_1], [fv1], RepoConfig(provider="local", project="test")
)
update_entities_with_inferred_types_from_feature_views(
[actual_2], [fv2], RepoConfig(provider="local", project="test")
)
assert actual_1 == Entity(name="id", value_type=ValueType.INT64)
assert actual_2 == Entity(name="id", value_type=ValueType.STRING)
assert actual_1 == Entity(
name="id", join_key="id_join_key", value_type=ValueType.INT64
)
assert actual_2 == Entity(
name="id", join_key="id_join_key", value_type=ValueType.STRING
)

with pytest.raises(RegistryInferenceFailure):
# two viable data types
update_entities_with_inferred_types_from_feature_views(
[Entity(name="id")],
[Entity(name="id", join_key="id_join_key")],
[fv1, fv2],
RepoConfig(provider="local", project="test"),
)
Expand Down