diff --git a/docs/concepts/feature-view.md b/docs/concepts/feature-view.md index 8326723fd6..fa1c38f78e 100644 --- a/docs/concepts/feature-view.md +++ b/docs/concepts/feature-view.md @@ -14,7 +14,7 @@ driver_stats_fv = FeatureView( Feature(name="trips_today", dtype=ValueType.INT64), Feature(name="rating", dtype=ValueType.FLOAT), ], - input=BigQuerySource( + batch_source=BigQuerySource( table_ref="feast-oss.demo_data.driver_activity" ) ) diff --git a/docs/concepts/feature-views.md b/docs/concepts/feature-views.md index dc145264cc..96a930b290 100644 --- a/docs/concepts/feature-views.md +++ b/docs/concepts/feature-views.md @@ -108,10 +108,10 @@ driver_stats_fv = FeatureView( Feature(name="avg_daily_trips", dtype=ValueType.INT64), ], - # Inputs are used to find feature values. In the case of this feature + # Batch sources are used to find feature values. In the case of this feature # view we will query a source table on BigQuery for driver statistics # features - input=driver_stats_source, + batch_source=driver_stats_source, # Tags are user defined key/value pairs that are attached to each # feature view diff --git a/docs/reference/feature-repository.md b/docs/reference/feature-repository.md index 3e894a4f75..f6f880d56f 100644 --- a/docs/reference/feature-repository.md +++ b/docs/reference/feature-repository.md @@ -111,7 +111,7 @@ driver_locations = FeatureView( Feature(name="lat", dtype=ValueType.FLOAT), Feature(name="lon", dtype=ValueType.STRING), ], - input=driver_locations_source, + batch_source=driver_locations_source, ) ``` {% endcode %} diff --git a/docs/reference/feature-repository/README.md b/docs/reference/feature-repository/README.md index b5ec5faa8c..c8fb64a68e 100644 --- a/docs/reference/feature-repository/README.md +++ b/docs/reference/feature-repository/README.md @@ -111,7 +111,7 @@ driver_locations = FeatureView( Feature(name="lat", dtype=ValueType.FLOAT), Feature(name="lon", dtype=ValueType.STRING), ], - input=driver_locations_source, + 
batch_source=driver_locations_source, ) ``` {% endcode %} diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index cd41f783b6..39261acdc1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -261,7 +261,7 @@ def apply( >>> name="customer_fv", >>> entities=["customer"], >>> features=[Feature(name="age", dtype=ValueType.INT64)], - >>> input=FileSource(path="file.parquet", event_timestamp_column="timestamp"), + >>> batch_source=FileSource(path="file.parquet", event_timestamp_column="timestamp"), >>> ttl=timedelta(days=1) >>> ) >>> fs.apply([customer_entity, customer_feature_view]) @@ -284,11 +284,11 @@ def apply( ) update_data_sources_with_inferred_event_timestamp_col( - [view.input for view in views_to_update], self.config + [view.batch_source for view in views_to_update], self.config ) for view in views_to_update: - view.infer_features_from_input_source(self.config) + view.infer_features_from_batch_source(self.config) if len(views_to_update) + len(entities_to_update) + len( services_to_update diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index b1c8a47902..c1cfbd1167 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import re +import warnings from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Union @@ -38,6 +39,8 @@ from feast.usage import log_exceptions from feast.value_type import ValueType +warnings.simplefilter("once", DeprecationWarning) + class FeatureView: """ @@ -51,7 +54,7 @@ class FeatureView: ttl: Optional[timedelta] online: bool input: DataSource - batch_source: Optional[DataSource] = None + batch_source: DataSource stream_source: Optional[DataSource] = None created_timestamp: Optional[Timestamp] = None last_updated_timestamp: Optional[Timestamp] = None @@ -63,13 +66,22 @@ def __init__( name: str, entities: List[str], ttl: Optional[Union[Duration, timedelta]], - input: DataSource, + input: Optional[DataSource] = None, batch_source: Optional[DataSource] = None, stream_source: Optional[DataSource] = None, features: List[Feature] = None, tags: Optional[Dict[str, str]] = None, online: bool = True, ): + if input is not None: + warnings.warn( + ( + "The argument 'input' is being deprecated. Please use 'batch_source' " + "instead. Feast 0.13 and onwards will not support the argument 'input'."
+ ), + DeprecationWarning, + ) + _input = input or batch_source assert _input is not None @@ -139,7 +151,7 @@ def __eq__(self, other): return False if sorted(self.features) != sorted(other.features): return False - if self.input != other.input: + if self.batch_source != other.batch_source: return False if self.stream_source != other.stream_source: return False @@ -182,10 +194,8 @@ def to_proto(self) -> FeatureViewProto: ttl_duration = Duration() ttl_duration.FromTimedelta(self.ttl) - batch_source_proto = self.input.to_proto() - batch_source_proto.data_source_class_type = ( - f"{self.input.__class__.__module__}.{self.input.__class__.__name__}" - ) + batch_source_proto = self.batch_source.to_proto() + batch_source_proto.data_source_class_type = f"{self.batch_source.__class__.__module__}.{self.batch_source.__class__.__name__}" stream_source_proto = None if self.stream_source: @@ -217,7 +227,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): Returns a FeatureViewProto object based on the feature view protobuf """ - _input = DataSource.from_proto(feature_view_proto.spec.batch_source) + batch_source = DataSource.from_proto(feature_view_proto.spec.batch_source) stream_source = ( DataSource.from_proto(feature_view_proto.spec.stream_source) if feature_view_proto.spec.HasField("stream_source") @@ -242,8 +252,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): and feature_view_proto.spec.ttl.nanos == 0 else feature_view_proto.spec.ttl ), - input=_input, - batch_source=_input, + batch_source=batch_source, stream_source=stream_source, ) @@ -265,29 +274,30 @@ def most_recent_end_time(self) -> Optional[datetime]: return None return max([interval[1] for interval in self.materialization_intervals]) - def infer_features_from_input_source(self, config: RepoConfig): + def infer_features_from_batch_source(self, config: RepoConfig): if not self.features: columns_to_exclude = { - self.input.event_timestamp_column, - 
self.input.created_timestamp_column, + self.batch_source.event_timestamp_column, + self.batch_source.created_timestamp_column, } | set(self.entities) - for col_name, col_datatype in self.input.get_table_column_names_and_types( - config - ): + for ( + col_name, + col_datatype, + ) in self.batch_source.get_table_column_names_and_types(config): if col_name not in columns_to_exclude and not re.match( "^__|__$", col_name, # double underscores often signal an internal-use column ): feature_name = ( - self.input.field_mapping[col_name] - if col_name in self.input.field_mapping.keys() + self.batch_source.field_mapping[col_name] + if col_name in self.batch_source.field_mapping.keys() else col_name ) self.features.append( Feature( feature_name, - self.input.source_datatype_to_feast_value_type()( + self.batch_source.source_datatype_to_feast_value_type()( col_datatype ), ) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 721c34fb1a..421e01a68c 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -13,7 +13,7 @@ def update_entities_with_inferred_types_from_feature_views( entities: List[Entity], feature_views: List[FeatureView], config: RepoConfig ) -> None: """ - Infer entity value type by examining schema of feature view input sources + Infer entity value type by examining schema of feature view batch sources """ incomplete_entities = { entity.name: entity @@ -26,22 +26,22 @@ def update_entities_with_inferred_types_from_feature_views( if not (incomplete_entities_keys & set(view.entities)): continue # skip if view doesn't contain any entities that need inference - col_names_and_types = view.input.get_table_column_names_and_types(config) + col_names_and_types = view.batch_source.get_table_column_names_and_types(config) for entity_name in view.entities: if entity_name in incomplete_entities: - # get entity information from information extracted from the view input source + # get entity information from information 
extracted from the view batch source extracted_entity_name_type_pairs = list( filter(lambda tup: tup[0] == entity_name, col_names_and_types) ) if len(extracted_entity_name_type_pairs) == 0: # Doesn't mention inference error because would also be an error without inferencing raise ValueError( - f"""No column in the input source for the {view.name} feature view matches + f"""No column in the batch source for the {view.name} feature view matches its entity's name.""" ) entity = incomplete_entities[entity_name] - inferred_value_type = view.input.source_datatype_to_feast_value_type()( + inferred_value_type = view.batch_source.source_datatype_to_feast_value_type()( extracted_entity_name_type_pairs[0][1] ) diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index 5318c9c81d..94cb7e0ba5 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -99,7 +99,7 @@ def materialize_single_feature_view( offline_job = self.offline_store.pull_latest_from_table_or_query( config=config, - data_source=feature_view.input, + data_source=feature_view.batch_source, join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, event_timestamp_column=event_timestamp_column, @@ -110,8 +110,8 @@ def materialize_single_feature_view( table = offline_job.to_arrow() - if feature_view.input.field_mapping is not None: - table = _run_field_mapping(table, feature_view.input.field_mapping) + if feature_view.batch_source.field_mapping is not None: + table = _run_field_mapping(table, feature_view.batch_source.field_mapping) join_keys = [entity.join_key for entity in entities] rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 2c679216ca..a62cd585a1 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -102,7 +102,7 @@ def materialize_single_feature_view( offline_job = 
self.offline_store.pull_latest_from_table_or_query( config=config, - data_source=feature_view.input, + data_source=feature_view.batch_source, join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, event_timestamp_column=event_timestamp_column, @@ -112,8 +112,8 @@ def materialize_single_feature_view( ) table = offline_job.to_arrow() - if feature_view.input.field_mapping is not None: - table = _run_field_mapping(table, feature_view.input.field_mapping) + if feature_view.batch_source.field_mapping is not None: + table = _run_field_mapping(table, feature_view.batch_source.field_mapping) join_keys = [entity.join_key for entity in entities] rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 32a526dcb0..c17d032568 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -100,7 +100,7 @@ def materialize_single_feature_view( ) = _get_column_names(feature_view, entities) offline_job = self.offline_store.pull_latest_from_table_or_query( - data_source=feature_view.input, + data_source=feature_view.batch_source, join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, event_timestamp_column=event_timestamp_column, @@ -111,8 +111,8 @@ def materialize_single_feature_view( ) table = offline_job.to_arrow() - if feature_view.input.field_mapping is not None: - table = _run_field_mapping(table, feature_view.input.field_mapping) + if feature_view.batch_source.field_mapping is not None: + table = _run_field_mapping(table, feature_view.batch_source.field_mapping) join_keys = [entity.join_key for entity in entities] rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 590ba7f3b7..e2ab39e07c 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ 
b/sdk/python/feast/infra/offline_stores/file.py @@ -104,15 +104,21 @@ def evaluate_historical_retrieval(): # Load feature view data from sources and join them incrementally for feature_view, features in feature_views_to_features.items(): - event_timestamp_column = feature_view.input.event_timestamp_column - created_timestamp_column = feature_view.input.created_timestamp_column + event_timestamp_column = ( + feature_view.batch_source.event_timestamp_column + ) + created_timestamp_column = ( + feature_view.batch_source.created_timestamp_column + ) # Read offline parquet data in pyarrow format - table = pyarrow.parquet.read_table(feature_view.input.path) + table = pyarrow.parquet.read_table(feature_view.batch_source.path) # Rename columns by the field mapping dictionary if it exists - if feature_view.input.field_mapping is not None: - table = _run_field_mapping(table, feature_view.input.field_mapping) + if feature_view.batch_source.field_mapping is not None: + table = _run_field_mapping( + table, feature_view.batch_source.field_mapping + ) # Convert pyarrow table to pandas dataframe df_to_join = table.to_pandas() diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 40b0659d20..5d8a3eeed8 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -211,13 +211,13 @@ def _get_column_names( the query to the offline store. 
""" # if we have mapped fields, use the original field names in the call to the offline store - event_timestamp_column = feature_view.input.event_timestamp_column + event_timestamp_column = feature_view.batch_source.event_timestamp_column feature_names = [feature.name for feature in feature_view.features] - created_timestamp_column = feature_view.input.created_timestamp_column + created_timestamp_column = feature_view.batch_source.created_timestamp_column join_keys = [entity.join_key for entity in entities] - if feature_view.input.field_mapping is not None: + if feature_view.batch_source.field_mapping is not None: reverse_field_mapping = { - v: k for k, v in feature_view.input.field_mapping.items() + v: k for k, v in feature_view.batch_source.field_mapping.items() } event_timestamp_column = ( reverse_field_mapping[event_timestamp_column] @@ -292,13 +292,13 @@ def _coerce_datetime(ts): value = python_value_to_proto_value(row[idx], feature.dtype) feature_dict[feature.name] = value event_timestamp_idx = table.column_names.index( - feature_view.input.event_timestamp_column + feature_view.batch_source.event_timestamp_column ) event_timestamp = _coerce_datetime(row[event_timestamp_idx]) - if feature_view.input.created_timestamp_column: + if feature_view.batch_source.created_timestamp_column: created_timestamp_idx = table.column_names.index( - feature_view.input.created_timestamp_column + feature_view.batch_source.created_timestamp_column ) created_timestamp = _coerce_datetime(row[created_timestamp_idx]) else: diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 683ab3bc8d..c8037d8bff 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -162,7 +162,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) - data_sources = [t.input for t in repo.feature_views] + data_sources 
= [t.batch_source for t in repo.feature_views] if not skip_source_validation: # Make sure the data source used by this feature view is supported by Feast @@ -175,7 +175,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ) update_data_sources_with_inferred_event_timestamp_col(data_sources, repo_config) for view in repo.feature_views: - view.infer_features_from_input_source(repo_config) + view.infer_features_from_batch_source(repo_config) repo_table_names = set(t.name for t in repo.feature_tables) diff --git a/sdk/python/feast/templates/aws/example.py b/sdk/python/feast/templates/aws/example.py index f9f2b3b6eb..de7565022b 100644 --- a/sdk/python/feast/templates/aws/example.py +++ b/sdk/python/feast/templates/aws/example.py @@ -30,6 +30,6 @@ Feature(name="avg_daily_trips", dtype=ValueType.INT64), ], online=True, - input=driver_hourly_stats, + batch_source=driver_hourly_stats, tags={}, ) diff --git a/sdk/python/feast/templates/gcp/driver_repo.py b/sdk/python/feast/templates/gcp/driver_repo.py index b36758046d..1aa64da41b 100644 --- a/sdk/python/feast/templates/gcp/driver_repo.py +++ b/sdk/python/feast/templates/gcp/driver_repo.py @@ -54,10 +54,10 @@ Feature(name="acc_rate", dtype=ValueType.FLOAT), Feature(name="avg_daily_trips", dtype=ValueType.INT64), ], - # Inputs are used to find feature values. In the case of this feature + # Batch sources are used to find feature values. 
In the case of this feature # view we will query a source table on BigQuery for driver statistics # features - input=driver_stats_source, + batch_source=driver_stats_source, # Tags are user defined key/value pairs that are attached to each # feature view tags={"team": "driver_performance"}, diff --git a/sdk/python/feast/templates/local/example.py b/sdk/python/feast/templates/local/example.py index f9f2b3b6eb..de7565022b 100644 --- a/sdk/python/feast/templates/local/example.py +++ b/sdk/python/feast/templates/local/example.py @@ -30,6 +30,6 @@ Feature(name="avg_daily_trips", dtype=ValueType.INT64), ], online=True, - input=driver_hourly_stats, + batch_source=driver_hourly_stats, tags={}, ) diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py index e42e557bfe..e0e6b92380 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_1.py +++ b/sdk/python/tests/example_repos/example_feature_repo_1.py @@ -45,7 +45,7 @@ Feature(name="lon", dtype=ValueType.STRING), ], online=True, - input=driver_locations_source, + batch_source=driver_locations_source, tags={}, ) @@ -59,7 +59,7 @@ Feature(name="age", dtype=ValueType.INT64), ], online=True, - input=customer_profile_source, + batch_source=customer_profile_source, tags={}, ) @@ -69,7 +69,7 @@ ttl=timedelta(days=1), features=[Feature(name="trips", dtype=ValueType.INT64)], online=True, - input=customer_driver_combined_source, + batch_source=customer_driver_combined_source, tags={}, ) diff --git a/sdk/python/tests/example_repos/example_feature_repo_2.py b/sdk/python/tests/example_repos/example_feature_repo_2.py index 420d71de0a..9698060e0f 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_2.py +++ b/sdk/python/tests/example_repos/example_feature_repo_2.py @@ -21,6 +21,6 @@ Feature(name="avg_daily_trips", dtype=ValueType.INT64), ], online=True, - input=driver_hourly_stats, + batch_source=driver_hourly_stats, tags={}, ) diff --git 
a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py index 10be18ca2e..4c147e8d1f 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py @@ -28,6 +28,6 @@ Feature(name="avg_daily_trips", dtype=ValueType.INT64), ], online=True, - input=driver_hourly_stats, + batch_source=driver_hourly_stats, tags={}, ) diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_inference.py b/sdk/python/tests/example_repos/example_feature_repo_with_inference.py index b46519b468..f2be472f55 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_inference.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_inference.py @@ -15,6 +15,6 @@ entities=["driver_id"], ttl=Duration(seconds=86400 * 1), online=True, - input=driver_hourly_stats, + batch_source=driver_hourly_stats, tags={}, ) diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_missing_bq_source.py b/sdk/python/tests/example_repos/example_feature_repo_with_missing_bq_source.py index 3d1dc3394b..46efe5b275 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_missing_bq_source.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_missing_bq_source.py @@ -16,5 +16,5 @@ Feature(name="lat", dtype=ValueType.FLOAT), Feature(name="lon", dtype=ValueType.STRING), ], - input=nonexistent_source, + batch_source=nonexistent_source, ) diff --git a/sdk/python/tests/integration/materialization/test_offline_online_store_consistency.py b/sdk/python/tests/integration/materialization/test_offline_online_store_consistency.py index e0acadf7c9..154b3b8c22 100644 --- a/sdk/python/tests/integration/materialization/test_offline_online_store_consistency.py +++ b/sdk/python/tests/integration/materialization/test_offline_online_store_consistency.py @@ -60,7 
+60,7 @@ def get_feature_view(data_source: DataSource) -> FeatureView: entities=["driver"], features=[Feature("value", ValueType.FLOAT)], ttl=timedelta(days=5), - input=data_source, + batch_source=data_source, ) diff --git a/sdk/python/tests/integration/offline_store/test_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_historical_retrieval.py index 8cc5194b46..5a14cf2ef0 100644 --- a/sdk/python/tests/integration/offline_store/test_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_historical_retrieval.py @@ -89,7 +89,7 @@ def create_driver_hourly_stats_feature_view(source): Feature(name="acc_rate", dtype=ValueType.FLOAT), Feature(name="avg_daily_trips", dtype=ValueType.INT32), ], - input=source, + batch_source=source, ttl=timedelta(hours=2), ) return driver_stats_feature_view @@ -127,7 +127,7 @@ def create_customer_daily_profile_feature_view(source): Feature(name="lifetime_trip_count", dtype=ValueType.INT32), Feature(name="avg_daily_trips", dtype=ValueType.INT32), ], - input=source, + batch_source=source, ttl=timedelta(days=2), ) return customer_profile_feature_view @@ -164,17 +164,17 @@ def get_expected_training_df( orders_df.to_dict("records"), event_timestamp ) driver_records = convert_timestamp_records_to_utc( - driver_df.to_dict("records"), driver_fv.input.event_timestamp_column + driver_df.to_dict("records"), driver_fv.batch_source.event_timestamp_column ) customer_records = convert_timestamp_records_to_utc( - customer_df.to_dict("records"), customer_fv.input.event_timestamp_column + customer_df.to_dict("records"), customer_fv.batch_source.event_timestamp_column ) # Manually do point-in-time join of orders to drivers and customers records for order_record in order_records: driver_record = find_asof_record( driver_records, - ts_key=driver_fv.input.event_timestamp_column, + ts_key=driver_fv.batch_source.event_timestamp_column, ts_start=order_record[event_timestamp] - driver_fv.ttl, 
ts_end=order_record[event_timestamp], filter_key="driver_id", @@ -182,7 +182,7 @@ def get_expected_training_df( ) customer_record = find_asof_record( customer_records, - ts_key=customer_fv.input.event_timestamp_column, + ts_key=customer_fv.batch_source.event_timestamp_column, ts_start=order_record[event_timestamp] - customer_fv.ttl, ts_end=order_record[event_timestamp], filter_key="customer_id", diff --git a/sdk/python/tests/integration/registration/test_feature_store.py b/sdk/python/tests/integration/registration/test_feature_store.py index 1c431a9d6a..3e06473d18 100644 --- a/sdk/python/tests/integration/registration/test_feature_store.py +++ b/sdk/python/tests/integration/registration/test_feature_store.py @@ -182,7 +182,7 @@ def test_apply_feature_view_success(test_feature_store): ], entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, - input=batch_source, + batch_source=batch_source, ttl=timedelta(minutes=5), ) @@ -223,7 +223,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): entities=["id"], ttl=timedelta(minutes=5), online=True, - input=file_source, + batch_source=file_source, tags={}, ) @@ -232,7 +232,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): entities=["id"], ttl=timedelta(minutes=5), online=True, - input=simple_bq_source_using_table_ref_arg(dataframe_source, "ts_1"), + batch_source=simple_bq_source_using_table_ref_arg(dataframe_source, "ts_1"), tags={}, ) @@ -241,7 +241,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): entities=["id"], ttl=timedelta(minutes=5), online=True, - input=simple_bq_source_using_query_arg(dataframe_source, "ts_1"), + batch_source=simple_bq_source_using_query_arg(dataframe_source, "ts_1"), tags={}, ) @@ -303,7 +303,7 @@ def test_apply_feature_view_integration(test_feature_store): ], entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, - input=batch_source, + batch_source=batch_source, ttl=timedelta(minutes=5), 
) @@ -379,7 +379,7 @@ def test_apply_object_and_read(test_feature_store): ], entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, - input=batch_source, + batch_source=batch_source, ttl=timedelta(minutes=5), ) @@ -393,7 +393,7 @@ def test_apply_object_and_read(test_feature_store): ], entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, - input=batch_source, + batch_source=batch_source, ttl=timedelta(minutes=5), ) @@ -440,7 +440,7 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): name="my_feature_view_1", features=[Feature(name="string_col", dtype=ValueType.STRING)], entities=["id"], - input=file_source, + batch_source=file_source, ttl=timedelta(minutes=5), ) @@ -470,7 +470,7 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): name="my_feature_view_1", features=[Feature(name="int64_col", dtype=ValueType.INT64)], entities=["id"], - input=file_source, + batch_source=file_source, ttl=timedelta(minutes=5), ) test_feature_store.apply([fv1]) diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index cff5f33f74..65ffdbf051 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -23,8 +23,12 @@ def test_update_entities_with_inferred_types_from_feature_views( df=simple_dataset_2, event_timestamp_column="ts_1" ) as file_source_2: - fv1 = FeatureView(name="fv1", entities=["id"], input=file_source, ttl=None,) - fv2 = FeatureView(name="fv2", entities=["id"], input=file_source_2, ttl=None,) + fv1 = FeatureView( + name="fv1", entities=["id"], batch_source=file_source, ttl=None, + ) + fv2 = FeatureView( + name="fv2", entities=["id"], batch_source=file_source_2, ttl=None, + ) actual_1 = Entity(name="id") actual_2 = Entity(name="id") diff --git a/sdk/python/tests/integration/registration/test_registry.py 
b/sdk/python/tests/integration/registration/test_registry.py index 2c11591b52..5b5655ee94 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -171,7 +171,7 @@ def test_apply_feature_view_success(test_registry): ], entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, - input=batch_source, + batch_source=batch_source, ttl=timedelta(minutes=5), ) @@ -246,7 +246,7 @@ def test_apply_feature_view_integration(test_registry): ], entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, - input=batch_source, + batch_source=batch_source, ttl=timedelta(minutes=5), ) diff --git a/sdk/python/tests/integration/scaffolding/test_partial_apply.py b/sdk/python/tests/integration/scaffolding/test_partial_apply.py index cd8be2fe23..bfd078c7da 100644 --- a/sdk/python/tests/integration/scaffolding/test_partial_apply.py +++ b/sdk/python/tests/integration/scaffolding/test_partial_apply.py @@ -34,7 +34,7 @@ def test_partial() -> None: Feature(name="name", dtype=ValueType.STRING), ], online=True, - input=driver_locations_source, + batch_source=driver_locations_source, tags={}, ) diff --git a/sdk/python/tests/utils/online_write_benchmark.py b/sdk/python/tests/utils/online_write_benchmark.py index d8041e51cd..d5268f217a 100644 --- a/sdk/python/tests/utils/online_write_benchmark.py +++ b/sdk/python/tests/utils/online_write_benchmark.py @@ -28,7 +28,7 @@ def create_driver_hourly_stats_feature_view(source): Feature(name="acc_rate", dtype=ValueType.FLOAT), Feature(name="avg_daily_trips", dtype=ValueType.INT32), ], - input=source, + batch_source=source, ttl=timedelta(hours=2), ) return driver_stats_feature_view