fix: Don't prevent apply from running given duplicate empty names in data sources. Also fix repeated apply of Spark data source. (#2415)

* fix: Print more warnings about the upcoming requirement for data sources to have names, but don't prevent apply from running if there are data sources with duplicate empty names. Also attach the class type when applying data sources so that repeated feast apply commands work properly for the Spark data source

Signed-off-by: Danny Chiao <[email protected]>
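
To make the behaviour change concrete, here is a minimal sketch of the repo scenario this commit relaxes, mirroring the query-based sources added to example_feature_repo_1.py further down in this diff (the direct `from feast import BigQuerySource` import is assumed). Both sources are query-only and carry no explicit name, so they share the empty-string name; previously this failed validation during apply, whereas now apply proceeds and the constructors emit DeprecationWarnings about the upcoming name requirement.

    from feast import BigQuerySource

    # Neither source sets `name`, so both end up with an empty name. Before this
    # commit, _validate_data_sources raised a ValueError on the duplicate; now
    # `feast apply` continues and each constructor warns that names will be
    # required starting in Feast 0.21.
    driver_locations_source_query = BigQuerySource(
        query="SELECT * from feast-oss.public.drivers",
        event_timestamp_column="event_timestamp",
        created_timestamp_column="created_timestamp",
    )

    driver_locations_source_query_2 = BigQuerySource(
        query="SELECT lat * 2 FROM feast-oss.public.drivers",
        event_timestamp_column="event_timestamp",
        created_timestamp_column="created_timestamp",
    )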

* typo

Signed-off-by: Danny Chiao <[email protected]>

* typo

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* More tests

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* revert

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>
adchia authored and felixwang9817 committed Apr 6, 2022
1 parent 0384bb9 commit 9baba23
Showing 11 changed files with 68 additions and 32 deletions.
1 change: 0 additions & 1 deletion protos/feast/core/SavedDataset.proto
@@ -24,7 +24,6 @@ option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/DataSource.proto";
import "feast/core/FeatureService.proto";

message SavedDatasetSpec {
// Name of the dataset. Must be unique since it's possible to overwrite dataset by name
11 changes: 11 additions & 0 deletions sdk/python/feast/cli.py
@@ -13,6 +13,7 @@
# limitations under the License.

import logging
+import warnings
from datetime import datetime
from pathlib import Path
from typing import List, Optional
@@ -151,6 +152,11 @@ def data_source_describe(ctx: click.Context, name: str):
print(e)
exit(1)

+warnings.warn(
+    "Describing data sources will only work properly if all data sources have names or table names specified. "
+    "Starting Feast 0.21, data source unique names will be required to encourage data source discovery.",
+    RuntimeWarning,
+)
print(
yaml.dump(
yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False
@@ -173,6 +179,11 @@ def data_source_list(ctx: click.Context):

from tabulate import tabulate

+warnings.warn(
+    "Listing data sources will only work properly if all data sources have names or table names specified. "
+    "Starting Feast 0.21, data source unique names will be required to encourage data source discovery",
+    RuntimeWarning,
+)
print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain"))


5 changes: 5 additions & 0 deletions sdk/python/feast/data_source.py
@@ -17,6 +17,8 @@
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, Optional, Tuple

+from google.protobuf.json_format import MessageToJson

from feast import type_map
from feast.data_format import StreamFormat
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
@@ -180,6 +182,9 @@ def __init__(
def __hash__(self):
return hash((id(self), self.name))

+def __str__(self):
+    return str(MessageToJson(self.to_proto()))

def __eq__(self, other):
if not isinstance(other, DataSource):
raise TypeError("Comparisons should only involve DataSource class objects.")
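
A rough usage sketch of the new __str__ (argument values borrowed from the example repo in this diff; PyYAML is assumed, as the CLI already uses it): MessageToJson renders the proto as JSON, which is itself valid YAML, so the data_source_describe path above can re-parse and pretty-print it.

    import yaml

    from feast import BigQuerySource

    source = BigQuerySource(
        name="customer_profile_source",
        table_ref="feast-oss.public.customers",
        event_timestamp_column="event_timestamp",
    )

    # str(source) is now the proto serialized as JSON, so the CLI can do
    # yaml.safe_load(str(source)) and pretty-print the result with yaml.dump.
    print(yaml.dump(yaml.safe_load(str(source)), default_flow_style=False, sort_keys=False))
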
10 changes: 8 additions & 2 deletions sdk/python/feast/diff/registry_diff.py
@@ -60,6 +60,9 @@ def to_string(self):
continue
if feast_object_diff.transition_type == TransitionType.UNCHANGED:
continue
+if feast_object_diff.feast_object_type == FeastObjectType.DATA_SOURCE:
+    # TODO(adchia): Print statements out starting in Feast 0.21
+    continue
action, color = message_action_map[feast_object_diff.transition_type]
log_string += f"{action} {feast_object_diff.feast_object_type.value} {Style.BRIGHT + color}{feast_object_diff.name}{Style.RESET_ALL}\n"
if feast_object_diff.transition_type == TransitionType.UPDATE:
@@ -78,8 +81,11 @@ def tag_objects_for_keep_delete_update_add(
def tag_objects_for_keep_delete_update_add(
existing_objs: Iterable[FeastObject], desired_objs: Iterable[FeastObject]
) -> Tuple[Set[FeastObject], Set[FeastObject], Set[FeastObject], Set[FeastObject]]:
-existing_obj_names = {e.name for e in existing_objs}
-desired_obj_names = {e.name for e in desired_objs}
+# TODO(adchia): Remove the "if X.name" condition when data sources are forced to have names
+existing_obj_names = {e.name for e in existing_objs if e.name}
+desired_objs = [obj for obj in desired_objs if obj.name]
+existing_objs = [obj for obj in existing_objs if obj.name]
+desired_obj_names = {e.name for e in desired_objs if e.name}

objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names}
objs_to_update = {e for e in desired_objs if e.name in existing_obj_names}
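
To see why unnamed objects are filtered out before the diff is computed, here is a standalone toy version of the add/update tagging (deliberately not Feast's real types or the full keep/delete logic): without the filter, every unnamed source would collide on the empty-string name and be treated as the same object.

    from collections import namedtuple

    Obj = namedtuple("Obj", ["name"])

    def tag_for_add_update(existing, desired):
        # Mirror of the patched logic: drop anything without a name before diffing.
        existing = [o for o in existing if o.name]
        desired = [o for o in desired if o.name]
        existing_names = {o.name for o in existing}
        to_add = {o for o in desired if o.name not in existing_names}
        to_update = {o for o in desired if o.name in existing_names}
        return to_add, to_update

    existing = [Obj(""), Obj("customer_profile_source")]
    desired = [Obj(""), Obj(""), Obj("customer_profile_source")]
    to_add, to_update = tag_for_add_update(existing, desired)
    assert to_add == set()  # the unnamed sources are skipped entirely
    assert to_update == {Obj("customer_profile_source")}
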
17 changes: 10 additions & 7 deletions sdk/python/feast/feature_store.py
@@ -1971,13 +1971,16 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]):
def _validate_data_sources(data_sources: List[DataSource]):
""" Verify data sources have case-insensitively unique names"""
ds_names = set()
-for fv in data_sources:
-    case_insensitive_ds_name = fv.name.lower()
+for ds in data_sources:
+    case_insensitive_ds_name = ds.name.lower()
if case_insensitive_ds_name in ds_names:
-    raise ValueError(
-        f"More than one data source with name {case_insensitive_ds_name} found. "
-        f"Please ensure that all data source names are case-insensitively unique. "
-        f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
-    )
+    if case_insensitive_ds_name.strip():
+        warnings.warn(
+            f"More than one data source with name {case_insensitive_ds_name} found. "
+            f"Please ensure that all data source names are case-insensitively unique. "
+            f"It may be necessary to ignore certain files in your feature repository by using a .feastignore "
+            f"file. Starting in Feast 0.21, unique names (perhaps inferred from the table name) will be "
+            f"required in data sources to encourage data source discovery"
+        )
else:
ds_names.add(case_insensitive_ds_name)
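
A simplified mirror of the patched check (plain strings instead of DataSource objects, warning text shortened) to show the new behaviour: a duplicate non-empty name is downgraded from a ValueError to a warning, and duplicate empty names are skipped entirely, since the source constructors already warn about the missing names.

    import warnings

    def validate_names(names):
        seen = set()
        for name in names:
            key = name.lower()
            if key in seen:
                if key.strip():
                    warnings.warn(f"More than one data source with name {key} found.")
            else:
                seen.add(key)

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        validate_names(["drivers", "Drivers", "", ""])
    assert len(caught) == 1  # only the case-insensitive "drivers" duplicate warns
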
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/bigquery_source.py
@@ -64,7 +64,7 @@ def __init__(
else:
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`: {self.query}"
),
DeprecationWarning,
)
16 changes: 8 additions & 8 deletions sdk/python/feast/infra/offline_stores/redshift_source.py
@@ -41,6 +41,12 @@ def __init__(
query (optional): The query to be executed to obtain the features.
name (optional): Name for the source. Defaults to the table_ref if not specified.
"""
+# The default Redshift schema is named "public".
+_schema = "public" if table and not schema else schema
+self.redshift_options = RedshiftOptions(
+    table=table, schema=_schema, query=query
+)
+
if table is None and query is None:
raise ValueError('No "table" argument provided.')
_name = name
@@ -50,7 +56,8 @@
else:
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) "
f"or `table`: {self.query}"
),
DeprecationWarning,
)
@@ -63,13 +70,6 @@
date_partition_column,
)

-# The default Redshift schema is named "public".
-_schema = "public" if table and not schema else schema
-
-self.redshift_options = RedshiftOptions(
-    table=table, schema=_schema, query=query
-)

@staticmethod
def from_proto(data_source: DataSourceProto):
"""
16 changes: 8 additions & 8 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
@@ -44,6 +44,12 @@ def __init__(
"""
if table is None and query is None:
raise ValueError('No "table" argument provided.')
+# The default Snowflake schema is named "PUBLIC".
+_schema = "PUBLIC" if (database and table and not schema) else schema
+
+self.snowflake_options = SnowflakeOptions(
+    database=database, schema=_schema, table=table, query=query
+)

# If no name, use the table as the default name
_name = name
@@ -53,7 +59,8 @@
else:
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) "
f"or `table`: {self.query}"
),
DeprecationWarning,
)
@@ -66,13 +73,6 @@
date_partition_column,
)

-# The default Snowflake schema is named "PUBLIC".
-_schema = "PUBLIC" if (database and table and not schema) else schema
-
-self.snowflake_options = SnowflakeOptions(
-    database=database, schema=_schema, table=table, query=query
-)

@staticmethod
def from_proto(data_source: DataSourceProto):
"""
4 changes: 3 additions & 1 deletion sdk/python/feast/registry.py
@@ -314,11 +314,13 @@ def apply_data_source(
commit: Whether to immediately commit to the registry
"""
registry = self._prepare_registry_for_changes()

for idx, existing_data_source_proto in enumerate(registry.data_sources):
if existing_data_source_proto.name == data_source.name:
del registry.data_sources[idx]
data_source_proto = data_source.to_proto()
+data_source_proto.data_source_class_type = (
+    f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
+)
data_source_proto.project = project
registry.data_sources.append(data_source_proto)
if commit:
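
Why recording data_source_class_type matters: when the registry is read back on a later feast apply, Feast has to know which DataSource subclass a stored proto belongs to (the Spark source, for instance, lives outside the core offline stores), and per the commit message the repeated apply for Spark did not work without that hint. The sketch below shows the idea using importlib and an illustrative class path; Feast's SDK has its own dispatch for this, so treat it as an approximation.

    import importlib

    def data_source_class_from_type(class_type: str):
        # class_type looks like "feast.infra.offline_stores.bigquery_source.BigQuerySource"
        module_name, class_name = class_type.rsplit(".", 1)
        return getattr(importlib.import_module(module_name), class_name)

    # On a subsequent apply, the stored proto can be routed to the right subclass:
    #   cls = data_source_class_from_type(data_source_proto.data_source_class_type)
    #   data_source = cls.from_proto(data_source_proto)
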
6 changes: 2 additions & 4 deletions sdk/python/feast/repo_operations.py
@@ -185,10 +185,8 @@ def extract_objects_for_apply_delete(project, registry, repo):
return (
all_to_apply,
all_to_delete,
-set(
-    objs_to_add[FeastObjectType.FEATURE_VIEW].union(
-        objs_to_update[FeastObjectType.FEATURE_VIEW]
-    )
+set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union(
+    set(objs_to_update[FeastObjectType.FEATURE_VIEW])
),
objs_to_delete[FeastObjectType.FEATURE_VIEW],
)
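
The rewritten union is equivalent when both groups are already sets, but it also keeps working if either group arrives as a plain list, which has no .union method; a minimal illustration with hypothetical names (the diff itself does not spell out why the original expression broke):

    feature_views_to_add = ["fv_a", "fv_b"]  # grouped objects, possibly a list
    feature_views_to_update = {"fv_b", "fv_c"}

    # The old shape, feature_views_to_add.union(...), would raise
    # AttributeError: 'list' object has no attribute 'union' here.
    merged = set(feature_views_to_add).union(set(feature_views_to_update))
    assert merged == {"fv_a", "fv_b", "fv_c"}
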
12 changes: 12 additions & 0 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
@@ -15,6 +15,18 @@
created_timestamp_column="created_timestamp",
)

+driver_locations_source_query = BigQuerySource(
+    query="SELECT * from feast-oss.public.drivers",
+    event_timestamp_column="event_timestamp",
+    created_timestamp_column="created_timestamp",
+)
+
+driver_locations_source_query_2 = BigQuerySource(
+    query="SELECT lat * 2 FROM feast-oss.public.drivers",
+    event_timestamp_column="event_timestamp",
+    created_timestamp_column="created_timestamp",
+)

customer_profile_source = BigQuerySource(
name="customer_profile_source",
table_ref="feast-oss.public.customers",
