Skip to content

Commit

Permalink
fix: Making a name for data sources not a breaking change (#2379)
Browse files Browse the repository at this point in the history
* fix: Making a name for data sources not a breaking change

Signed-off-by: Danny Chiao <[email protected]>

* fix test

Signed-off-by: Danny Chiao <[email protected]>
  • Loading branch information
adchia committed Mar 9, 2022
1 parent a6211cf commit 993b8cc
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 27 deletions.
16 changes: 10 additions & 6 deletions sdk/python/feast/infra/offline_stores/bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from feast import type_map
from feast.data_source import DataSource
from feast.errors import DataSourceNoNameException, DataSourceNotFoundException
from feast.errors import DataSourceNotFoundException
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.SavedDataset_pb2 import (
SavedDatasetStorage as SavedDatasetStorageProto,
Expand All @@ -16,19 +16,18 @@
class BigQuerySource(DataSource):
def __init__(
self,
name: Optional[str] = None,
event_timestamp_column: Optional[str] = "",
table: Optional[str] = None,
table_ref: Optional[str] = None,
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = "",
query: Optional[str] = None,
name: Optional[str] = None,
):
"""Create a BigQuerySource from an existing table or query.
Args:
name (optional): Name for the source. Defaults to the table_ref if not specified.
table (optional): The BigQuery table where features can be found.
table_ref (optional): (Deprecated) The BigQuery table where features can be found.
event_timestamp_column: Event timestamp column used for point in time joins of feature values.
Expand All @@ -37,13 +36,13 @@ def __init__(
or view. Only used for feature columns, not entities or timestamp columns.
date_partition_column (optional): Timestamp column used for partitioning.
query (optional): SQL query to execute to generate data for this data source.
name (optional): Name for the source. Defaults to the table_ref if not specified.
Example:
>>> from feast import BigQuerySource
>>> my_bigquery_source = BigQuerySource(table="gcp_project:bq_dataset.bq_table")
"""
if table is None and table_ref is None and query is None:
raise ValueError('No "table" argument provided.')
raise ValueError('No "table" or "query" argument provided.')
if not table and table_ref:
warnings.warn(
(
Expand All @@ -63,7 +62,12 @@ def __init__(
elif table_ref:
_name = table_ref
else:
raise DataSourceNoNameException()
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
),
DeprecationWarning,
)

super().__init__(
_name if _name else "",
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@ class FileSource(DataSource):
def __init__(
self,
path: str,
name: Optional[str] = "",
event_timestamp_column: Optional[str] = "",
file_format: Optional[FileFormat] = None,
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = "",
s3_endpoint_override: Optional[str] = None,
name: Optional[str] = "",
):
"""Create a FileSource from a file containing feature data. Only Parquet format supported.
Args:
name (optional): Name for the file source. Defaults to the path.
path: File path to file containing feature data. Must contain an event_timestamp column, entity columns and
feature columns.
event_timestamp_column: Event timestamp column used for point in time joins of feature values.
Expand All @@ -42,6 +41,7 @@ def __init__(
or view. Only used for feature columns, not entities or timestamp columns.
date_partition_column (optional): Timestamp column used for partitioning.
s3_endpoint_override (optional): Overrides AWS S3 enpoint with custom S3 storage
name (optional): Name for the file source. Defaults to the path.
Examples:
>>> from feast import FileSource
Expand Down
21 changes: 11 additions & 10 deletions sdk/python/feast/infra/offline_stores/redshift_source.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import warnings
from typing import Callable, Dict, Iterable, Optional, Tuple

from feast import type_map
from feast.data_source import DataSource
from feast.errors import (
DataSourceNoNameException,
DataSourceNotFoundException,
RedshiftCredentialsError,
)
from feast.errors import DataSourceNotFoundException, RedshiftCredentialsError
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.SavedDataset_pb2 import (
SavedDatasetStorage as SavedDatasetStorageProto,
Expand All @@ -19,20 +16,19 @@
class RedshiftSource(DataSource):
def __init__(
self,
name: Optional[str] = None,
event_timestamp_column: Optional[str] = "",
table: Optional[str] = None,
schema: Optional[str] = None,
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = "",
query: Optional[str] = None,
name: Optional[str] = None,
):
"""
Creates a RedshiftSource object.
Args:
name (optional): Name for the source. Defaults to the table_ref if not specified.
event_timestamp_column (optional): Event timestamp column used for point in
time joins of feature values.
table (optional): Redshift table where the features are stored.
Expand All @@ -43,6 +39,7 @@ def __init__(
source to column names in a feature table or view.
date_partition_column (optional): Timestamp column used for partitioning.
query (optional): The query to be executed to obtain the features.
name (optional): Name for the source. Defaults to the table_ref if not specified.
"""
if table is None and query is None:
raise ValueError('No "table" argument provided.')
Expand All @@ -51,11 +48,15 @@ def __init__(
if table:
_name = table
else:
raise DataSourceNoNameException()
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
),
DeprecationWarning,
)

# TODO(adchia): figure out what to do if user uses the query to start
super().__init__(
_name,
_name if _name else "",
event_timestamp_column,
created_timestamp_column,
field_mapping,
Expand Down
16 changes: 10 additions & 6 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import warnings
from typing import Callable, Dict, Iterable, Optional, Tuple

from feast import type_map
from feast.data_source import DataSource
from feast.errors import DataSourceNoNameException
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.SavedDataset_pb2 import (
SavedDatasetStorage as SavedDatasetStorageProto,
Expand All @@ -15,7 +15,6 @@
class SnowflakeSource(DataSource):
def __init__(
self,
name: Optional[str] = None,
database: Optional[str] = None,
schema: Optional[str] = None,
table: Optional[str] = None,
Expand All @@ -24,12 +23,12 @@ def __init__(
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = "",
name: Optional[str] = None,
):
"""
Creates a SnowflakeSource object.
Args:
name (optional): Name for the source. Defaults to the table if not specified.
database (optional): Snowflake database where the features are stored.
schema (optional): Snowflake schema in which the table is located.
table (optional): Snowflake table where the features are stored.
Expand All @@ -41,7 +40,7 @@ def __init__(
field_mapping (optional): A dictionary mapping of column names in this data
source to column names in a feature table or view.
date_partition_column (optional): Timestamp column used for partitioning.
name (optional): Name for the source. Defaults to the table if not specified.
"""
if table is None and query is None:
raise ValueError('No "table" argument provided.')
Expand All @@ -52,10 +51,15 @@ def __init__(
if table:
_name = table
else:
raise DataSourceNoNameException()
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
),
DeprecationWarning,
)

super().__init__(
_name,
_name if _name else "",
event_timestamp_column,
created_timestamp_column,
field_mapping,
Expand Down
13 changes: 10 additions & 3 deletions sdk/python/tests/integration/registration/test_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
update_data_sources_with_inferred_event_timestamp_col,
update_entities_with_inferred_types_from_feature_views,
)
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from tests.utils.data_source_utils import (
prep_file_source,
Expand Down Expand Up @@ -83,7 +86,7 @@ def test_infer_datasource_names_file():

def test_infer_datasource_names_dwh():
table = "project.table"
dwh_classes = [BigQuerySource, RedshiftSource, SnowflakeSource]
dwh_classes = [BigQuerySource, RedshiftSource, SnowflakeSource, SparkSource]

for dwh_class in dwh_classes:
data_source = dwh_class(table=table)
Expand All @@ -98,9 +101,13 @@ def test_infer_datasource_names_dwh():
assert data_source_with_query.name == source_name

# If we have a query and no name, throw an error
with pytest.raises(DataSourceNoNameException):
print(f"Testing dwh {dwh_class}")
if dwh_class == SparkSource:
with pytest.raises(DataSourceNoNameException):
print(f"Testing dwh {dwh_class}")
data_source = dwh_class(query="test_query")
else:
data_source = dwh_class(query="test_query")
assert data_source.name == ""


@pytest.mark.integration
Expand Down

0 comments on commit 993b8cc

Please sign in to comment.