Skip to content

Commit

Permalink
Change internal references from input to batch_source (#1729)
Browse files Browse the repository at this point in the history
* Change internal references from input to batch_source

Signed-off-by: Felix Wang <[email protected]>

* Add deprecation warning

Signed-off-by: Felix Wang <[email protected]>

* Change deprecation release to 0.12

Signed-off-by: Felix Wang <[email protected]>

* Change deprecation release to 0.13

Signed-off-by: Felix Wang <[email protected]>
  • Loading branch information
felixwang9817 authored Jul 27, 2021
1 parent 47f30b3 commit 489a0f8
Show file tree
Hide file tree
Showing 28 changed files with 108 additions and 88 deletions.
2 changes: 1 addition & 1 deletion docs/concepts/feature-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ driver_stats_fv = FeatureView(
Feature(name="trips_today", dtype=ValueType.INT64),
Feature(name="rating", dtype=ValueType.FLOAT),
],
input=BigQuerySource(
batch_source=BigQuerySource(
table_ref="feast-oss.demo_data.driver_activity"
)
)
Expand Down
4 changes: 2 additions & 2 deletions docs/concepts/feature-views.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ driver_stats_fv = FeatureView(
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],

# Inputs are used to find feature values. In the case of this feature
# Batch sources are used to find feature values. In the case of this feature
# view we will query a source table on BigQuery for driver statistics
# features
input=driver_stats_source,
batch_source=driver_stats_source,

# Tags are user defined key/value pairs that are attached to each
# feature view
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/feature-repository.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ driver_locations = FeatureView(
Feature(name="lat", dtype=ValueType.FLOAT),
Feature(name="lon", dtype=ValueType.STRING),
],
input=driver_locations_source,
batch_source=driver_locations_source,
)
```
{% endcode %}
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/feature-repository/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ driver_locations = FeatureView(
Feature(name="lat", dtype=ValueType.FLOAT),
Feature(name="lon", dtype=ValueType.STRING),
],
input=driver_locations_source,
batch_source=driver_locations_source,
)
```
{% endcode %}
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def apply(
>>> name="customer_fv",
>>> entities=["customer"],
>>> features=[Feature(name="age", dtype=ValueType.INT64)],
>>> input=FileSource(path="file.parquet", event_timestamp_column="timestamp"),
>>> batch_source=FileSource(path="file.parquet", event_timestamp_column="timestamp"),
>>> ttl=timedelta(days=1)
>>> )
>>> fs.apply([customer_entity, customer_feature_view])
Expand All @@ -284,11 +284,11 @@ def apply(
)

update_data_sources_with_inferred_event_timestamp_col(
[view.input for view in views_to_update], self.config
[view.batch_source for view in views_to_update], self.config
)

for view in views_to_update:
view.infer_features_from_input_source(self.config)
view.infer_features_from_batch_source(self.config)

if len(views_to_update) + len(entities_to_update) + len(
services_to_update
Expand Down
48 changes: 29 additions & 19 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union

Expand All @@ -38,6 +39,8 @@
from feast.usage import log_exceptions
from feast.value_type import ValueType

warnings.simplefilter("once", DeprecationWarning)


class FeatureView:
"""
Expand All @@ -51,7 +54,7 @@ class FeatureView:
ttl: Optional[timedelta]
online: bool
input: DataSource
batch_source: Optional[DataSource] = None
batch_source: DataSource
stream_source: Optional[DataSource] = None
created_timestamp: Optional[Timestamp] = None
last_updated_timestamp: Optional[Timestamp] = None
Expand All @@ -63,13 +66,21 @@ def __init__(
name: str,
entities: List[str],
ttl: Optional[Union[Duration, timedelta]],
input: DataSource,
input: Optional[DataSource] = None,
batch_source: Optional[DataSource] = None,
stream_source: Optional[DataSource] = None,
features: List[Feature] = None,
tags: Optional[Dict[str, str]] = None,
online: bool = True,
):
warnings.warn(

This comment has been minimized.

Copy link
@tedhtchang

tedhtchang Jul 28, 2021

Contributor

@felixwang9817 Should we warn users only when the input argument is assigned ?

(
"The argument 'input' is being deprecated. Please use 'batch_source' "
"instead. Feast 0.13 and onwards will not support the argument 'input'."
),
DeprecationWarning,
)

_input = input or batch_source
assert _input is not None

Expand Down Expand Up @@ -139,7 +150,7 @@ def __eq__(self, other):
return False
if sorted(self.features) != sorted(other.features):
return False
if self.input != other.input:
if self.batch_source != other.batch_source:
return False
if self.stream_source != other.stream_source:
return False
Expand Down Expand Up @@ -182,10 +193,8 @@ def to_proto(self) -> FeatureViewProto:
ttl_duration = Duration()
ttl_duration.FromTimedelta(self.ttl)

batch_source_proto = self.input.to_proto()
batch_source_proto.data_source_class_type = (
f"{self.input.__class__.__module__}.{self.input.__class__.__name__}"
)
batch_source_proto = self.batch_source.to_proto()
batch_source_proto.data_source_class_type = f"{self.batch_source.__class__.__module__}.{self.batch_source.__class__.__name__}"

stream_source_proto = None
if self.stream_source:
Expand Down Expand Up @@ -217,7 +226,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
Returns a FeatureViewProto object based on the feature view protobuf
"""

_input = DataSource.from_proto(feature_view_proto.spec.batch_source)
batch_source = DataSource.from_proto(feature_view_proto.spec.batch_source)
stream_source = (
DataSource.from_proto(feature_view_proto.spec.stream_source)
if feature_view_proto.spec.HasField("stream_source")
Expand All @@ -242,8 +251,8 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
and feature_view_proto.spec.ttl.nanos == 0
else feature_view_proto.spec.ttl
),
input=_input,
batch_source=_input,
input=batch_source,
batch_source=batch_source,
stream_source=stream_source,
)

Expand All @@ -265,29 +274,30 @@ def most_recent_end_time(self) -> Optional[datetime]:
return None
return max([interval[1] for interval in self.materialization_intervals])

def infer_features_from_input_source(self, config: RepoConfig):
def infer_features_from_batch_source(self, config: RepoConfig):
if not self.features:
columns_to_exclude = {
self.input.event_timestamp_column,
self.input.created_timestamp_column,
self.batch_source.event_timestamp_column,
self.batch_source.created_timestamp_column,
} | set(self.entities)

for col_name, col_datatype in self.input.get_table_column_names_and_types(
config
):
for (
col_name,
col_datatype,
) in self.batch_source.get_table_column_names_and_types(config):
if col_name not in columns_to_exclude and not re.match(
"^__|__$",
col_name, # double underscores often signal an internal-use column
):
feature_name = (
self.input.field_mapping[col_name]
if col_name in self.input.field_mapping.keys()
self.batch_source.field_mapping[col_name]
if col_name in self.batch_source.field_mapping.keys()
else col_name
)
self.features.append(
Feature(
feature_name,
self.input.source_datatype_to_feast_value_type()(
self.batch_source.source_datatype_to_feast_value_type()(
col_datatype
),
)
Expand Down
10 changes: 5 additions & 5 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def update_entities_with_inferred_types_from_feature_views(
entities: List[Entity], feature_views: List[FeatureView], config: RepoConfig
) -> None:
"""
Infer entity value type by examining schema of feature view input sources
Infer entity value type by examining schema of feature view batch sources
"""
incomplete_entities = {
entity.name: entity
Expand All @@ -26,22 +26,22 @@ def update_entities_with_inferred_types_from_feature_views(
if not (incomplete_entities_keys & set(view.entities)):
continue # skip if view doesn't contain any entities that need inference

col_names_and_types = view.input.get_table_column_names_and_types(config)
col_names_and_types = view.batch_source.get_table_column_names_and_types(config)
for entity_name in view.entities:
if entity_name in incomplete_entities:
# get entity information from information extracted from the view input source
# get entity information from information extracted from the view batch source
extracted_entity_name_type_pairs = list(
filter(lambda tup: tup[0] == entity_name, col_names_and_types)
)
if len(extracted_entity_name_type_pairs) == 0:
# Doesn't mention inference error because would also be an error without inferencing
raise ValueError(
f"""No column in the input source for the {view.name} feature view matches
f"""No column in the batch source for the {view.name} feature view matches
its entity's name."""
)

entity = incomplete_entities[entity_name]
inferred_value_type = view.input.source_datatype_to_feast_value_type()(
inferred_value_type = view.batch_source.source_datatype_to_feast_value_type()(
extracted_entity_name_type_pairs[0][1]
)

Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def materialize_single_feature_view(

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=config,
data_source=feature_view.input,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
Expand All @@ -110,8 +110,8 @@ def materialize_single_feature_view(

table = offline_job.to_arrow()

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def materialize_single_feature_view(

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=config,
data_source=feature_view.input,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
Expand All @@ -112,8 +112,8 @@ def materialize_single_feature_view(
)
table = offline_job.to_arrow()

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def materialize_single_feature_view(
) = _get_column_names(feature_view, entities)

offline_job = self.offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
Expand All @@ -111,8 +111,8 @@ def materialize_single_feature_view(
)
table = offline_job.to_arrow()

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
Expand Down
16 changes: 11 additions & 5 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,21 @@ def evaluate_historical_retrieval():

# Load feature view data from sources and join them incrementally
for feature_view, features in feature_views_to_features.items():
event_timestamp_column = feature_view.input.event_timestamp_column
created_timestamp_column = feature_view.input.created_timestamp_column
event_timestamp_column = (
feature_view.batch_source.event_timestamp_column
)
created_timestamp_column = (
feature_view.batch_source.created_timestamp_column
)

# Read offline parquet data in pyarrow format
table = pyarrow.parquet.read_table(feature_view.input.path)
table = pyarrow.parquet.read_table(feature_view.batch_source.path)

# Rename columns by the field mapping dictionary if it exists
if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(
table, feature_view.batch_source.field_mapping
)

# Convert pyarrow table to pandas dataframe
df_to_join = table.to_pandas()
Expand Down
14 changes: 7 additions & 7 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ def _get_column_names(
the query to the offline store.
"""
# if we have mapped fields, use the original field names in the call to the offline store
event_timestamp_column = feature_view.input.event_timestamp_column
event_timestamp_column = feature_view.batch_source.event_timestamp_column
feature_names = [feature.name for feature in feature_view.features]
created_timestamp_column = feature_view.input.created_timestamp_column
created_timestamp_column = feature_view.batch_source.created_timestamp_column
join_keys = [entity.join_key for entity in entities]
if feature_view.input.field_mapping is not None:
if feature_view.batch_source.field_mapping is not None:
reverse_field_mapping = {
v: k for k, v in feature_view.input.field_mapping.items()
v: k for k, v in feature_view.batch_source.field_mapping.items()
}
event_timestamp_column = (
reverse_field_mapping[event_timestamp_column]
Expand Down Expand Up @@ -292,13 +292,13 @@ def _coerce_datetime(ts):
value = python_value_to_proto_value(row[idx], feature.dtype)
feature_dict[feature.name] = value
event_timestamp_idx = table.column_names.index(
feature_view.input.event_timestamp_column
feature_view.batch_source.event_timestamp_column
)
event_timestamp = _coerce_datetime(row[event_timestamp_idx])

if feature_view.input.created_timestamp_column:
if feature_view.batch_source.created_timestamp_column:
created_timestamp_idx = table.column_names.index(
feature_view.input.created_timestamp_column
feature_view.batch_source.created_timestamp_column
)
created_timestamp = _coerce_datetime(row[created_timestamp_idx])
else:
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
registry._initialize_registry()
sys.dont_write_bytecode = True
repo = parse_repo(repo_path)
data_sources = [t.input for t in repo.feature_views]
data_sources = [t.batch_source for t in repo.feature_views]

if not skip_source_validation:
# Make sure the data source used by this feature view is supported by Feast
Expand All @@ -175,7 +175,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
)
update_data_sources_with_inferred_event_timestamp_col(data_sources, repo_config)
for view in repo.feature_views:
view.infer_features_from_input_source(repo_config)
view.infer_features_from_batch_source(repo_config)

repo_table_names = set(t.name for t in repo.feature_tables)

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/templates/aws/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
input=driver_hourly_stats,
batch_source=driver_hourly_stats,
tags={},
)
4 changes: 2 additions & 2 deletions sdk/python/feast/templates/gcp/driver_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
# Inputs are used to find feature values. In the case of this feature
# Batch sources are used to find feature values. In the case of this feature
# view we will query a source table on BigQuery for driver statistics
# features
input=driver_stats_source,
batch_source=driver_stats_source,
# Tags are user defined key/value pairs that are attached to each
# feature view
tags={"team": "driver_performance"},
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/templates/local/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
input=driver_hourly_stats,
batch_source=driver_hourly_stats,
tags={},
)
Loading

0 comments on commit 489a0f8

Please sign in to comment.