diff --git a/protos/feast/core/SavedDataset.proto b/protos/feast/core/SavedDataset.proto index e6d103a691..2e0f3885ed 100644 --- a/protos/feast/core/SavedDataset.proto +++ b/protos/feast/core/SavedDataset.proto @@ -24,7 +24,6 @@ option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; import "google/protobuf/timestamp.proto"; import "feast/core/DataSource.proto"; -import "feast/core/FeatureService.proto"; message SavedDatasetSpec { // Name of the dataset. Must be unique since it's possible to overwrite dataset by name diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index d2a71bc561..9d75e14c93 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import warnings from datetime import datetime from pathlib import Path from typing import List, Optional @@ -151,6 +152,11 @@ def data_source_describe(ctx: click.Context, name: str): print(e) exit(1) + warnings.warn( + "Describing data sources will only work properly if all data sources have names or table names specified. " + "Starting Feast 0.21, data source unique names will be required to encourage data source discovery.", + RuntimeWarning, + ) print( yaml.dump( yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False @@ -173,6 +179,11 @@ def data_source_list(ctx: click.Context): from tabulate import tabulate + warnings.warn( + "Listing data sources will only work properly if all data sources have names or table names specified. " + "Starting Feast 0.21, data source unique names will be required to encourage data source discovery", + RuntimeWarning, + ) print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain")) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 15ce0c2377..f23b1771e1 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -17,6 +17,8 @@ from abc import ABC, abstractmethod from typing import Any, Callable, Dict, Iterable, Optional, Tuple +from google.protobuf.json_format import MessageToJson + from feast import type_map from feast.data_format import StreamFormat from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto @@ -180,6 +182,9 @@ def __init__( def __hash__(self): return hash((id(self), self.name)) + def __str__(self): + return str(MessageToJson(self.to_proto())) + def __eq__(self, other): if not isinstance(other, DataSource): raise TypeError("Comparisons should only involve DataSource class objects.") diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 4558a149a5..10bd88c56f 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -60,6 +60,9 @@ def to_string(self): continue if feast_object_diff.transition_type == TransitionType.UNCHANGED: continue + if feast_object_diff.feast_object_type == FeastObjectType.DATA_SOURCE: + # TODO(adchia): Print statements out starting in Feast 0.21 + continue action, color = message_action_map[feast_object_diff.transition_type] log_string += f"{action} {feast_object_diff.feast_object_type.value} {Style.BRIGHT + color}{feast_object_diff.name}{Style.RESET_ALL}\n" if feast_object_diff.transition_type == TransitionType.UPDATE: @@ -78,8 +81,11 @@ def to_string(self): def tag_objects_for_keep_delete_update_add( existing_objs: Iterable[FeastObject], desired_objs: Iterable[FeastObject] ) -> Tuple[Set[FeastObject], Set[FeastObject], Set[FeastObject], Set[FeastObject]]: - existing_obj_names = {e.name for e in existing_objs} - desired_obj_names = {e.name for e in desired_objs} + # TODO(adchia): Remove the "if X.name" condition when data sources are forced to have names + existing_obj_names = {e.name for e in existing_objs if e.name} + desired_objs = [obj for obj in desired_objs if obj.name] + existing_objs = [obj for obj in existing_objs if obj.name] + desired_obj_names = {e.name for e in desired_objs if e.name} objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names} objs_to_update = {e for e in desired_objs if e.name in existing_obj_names} diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 19741bcf12..c1a4ec7a63 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1971,13 +1971,16 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]): def _validate_data_sources(data_sources: List[DataSource]): """ Verify data sources have case-insensitively unique names""" ds_names = set() - for fv in data_sources: - case_insensitive_ds_name = fv.name.lower() + for ds in data_sources: + case_insensitive_ds_name = ds.name.lower() if case_insensitive_ds_name in ds_names: - raise ValueError( - f"More than one data source with name {case_insensitive_ds_name} found. " - f"Please ensure that all data source names are case-insensitively unique. " - f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file." - ) + if case_insensitive_ds_name.strip(): + warnings.warn( + f"More than one data source with name {case_insensitive_ds_name} found. " + f"Please ensure that all data source names are case-insensitively unique. " + f"It may be necessary to ignore certain files in your feature repository by using a .feastignore " + f"file. Starting in Feast 0.21, unique names (perhaps inferred from the table name) will be " + f"required in data sources to encourage data source discovery" + ) else: ds_names.add(case_insensitive_ds_name) diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 92b6939fc3..1d797077f0 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -64,7 +64,7 @@ def __init__( else: warnings.warn( ( - "Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`." + f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`: {self.query}" ), DeprecationWarning, ) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 8573396aca..df42e18910 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -41,6 +41,12 @@ def __init__( query (optional): The query to be executed to obtain the features. name (optional): Name for the source. Defaults to the table_ref if not specified. """ + # The default Redshift schema is named "public". + _schema = "public" if table and not schema else schema + self.redshift_options = RedshiftOptions( + table=table, schema=_schema, query=query + ) + if table is None and query is None: raise ValueError('No "table" argument provided.') _name = name @@ -50,7 +56,8 @@ def __init__( else: warnings.warn( ( - "Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`." + f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) " + f"or `table`: {self.query}" ), DeprecationWarning, ) @@ -63,13 +70,6 @@ def __init__( date_partition_column, ) - # The default Redshift schema is named "public". - _schema = "public" if table and not schema else schema - - self.redshift_options = RedshiftOptions( - table=table, schema=_schema, query=query - ) - @staticmethod def from_proto(data_source: DataSourceProto): """ diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 6ca5df7d6f..a972df191b 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -44,6 +44,12 @@ def __init__( """ if table is None and query is None: raise ValueError('No "table" argument provided.') + # The default Snowflake schema is named "PUBLIC". + _schema = "PUBLIC" if (database and table and not schema) else schema + + self.snowflake_options = SnowflakeOptions( + database=database, schema=_schema, table=table, query=query + ) # If no name, use the table as the default name _name = name @@ -53,7 +59,8 @@ def __init__( else: warnings.warn( ( - "Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`." + f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) " + f"or `table`: {self.query}" ), DeprecationWarning, ) @@ -66,13 +73,6 @@ def __init__( date_partition_column, ) - # The default Snowflake schema is named "PUBLIC". - _schema = "PUBLIC" if (database and table and not schema) else schema - - self.snowflake_options = SnowflakeOptions( - database=database, schema=_schema, table=table, query=query - ) - @staticmethod def from_proto(data_source: DataSourceProto): """ diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index cb1261d8c9..0f5657fb78 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -314,11 +314,13 @@ def apply_data_source( commit: Whether to immediately commit to the registry """ registry = self._prepare_registry_for_changes() - for idx, existing_data_source_proto in enumerate(registry.data_sources): if existing_data_source_proto.name == data_source.name: del registry.data_sources[idx] data_source_proto = data_source.to_proto() + data_source_proto.data_source_class_type = ( + f"{data_source.__class__.__module__}.{data_source.__class__.__name__}" + ) data_source_proto.project = project registry.data_sources.append(data_source_proto) if commit: diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 4bee79bd60..fc49c73db1 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -185,10 +185,8 @@ def extract_objects_for_apply_delete(project, registry, repo): return ( all_to_apply, all_to_delete, - set( - objs_to_add[FeastObjectType.FEATURE_VIEW].union( - objs_to_update[FeastObjectType.FEATURE_VIEW] - ) + set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union( + set(objs_to_update[FeastObjectType.FEATURE_VIEW]) ), objs_to_delete[FeastObjectType.FEATURE_VIEW], ) diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py index 8179906fa4..8f7951854f 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_1.py +++ b/sdk/python/tests/example_repos/example_feature_repo_1.py @@ -15,6 +15,18 @@ created_timestamp_column="created_timestamp", ) +driver_locations_source_query = BigQuerySource( + query="SELECT * from feast-oss.public.drivers", + event_timestamp_column="event_timestamp", + created_timestamp_column="created_timestamp", +) + +driver_locations_source_query_2 = BigQuerySource( + query="SELECT lat * 2 FROM feast-oss.public.drivers", + event_timestamp_column="event_timestamp", + created_timestamp_column="created_timestamp", +) + customer_profile_source = BigQuerySource( name="customer_profile_source", table_ref="feast-oss.public.customers",