Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: Simplify DataSource.from_proto logic #2424

Merged
merged 11 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ message DataSource {

// This is an internal field that records the Python class of the data source that a proto object represents.
// This should be set by feast, and not by users.
// The field is used primarily by custom data sources and is mandatory for them to set. Feast may set it for
// first party sources as well.
string data_source_class_type = 17;

// Defines options for DataSource that sources features from a file
Expand Down
62 changes: 23 additions & 39 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
return kinesis_options_proto


# Maps each first-party DataSourceProto.SourceType enum value to the fully
# qualified import path of the concrete Python DataSource subclass that
# handles it. DataSource.from_proto looks the path up here and resolves it
# dynamically (via get_data_source_class_from_type), which avoids importing
# every offline-store module eagerly at module load time.
# NOTE: CUSTOM_SOURCE is intentionally absent — custom sources carry their
# own class path in the proto's data_source_class_type field instead.
_DATA_SOURCE_OPTIONS = {
    DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.file_source.FileSource",
    DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery_source.BigQuerySource",
    DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift_source.RedshiftSource",
    DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
    DataSourceProto.SourceType.STREAM_KAFKA: "feast.data_source.KafkaSource",
    DataSourceProto.SourceType.STREAM_KINESIS: "feast.data_source.KinesisSource",
    DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestDataSource",
    DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
}


class DataSource(ABC):
"""
DataSource that can be used to source features.
Expand Down Expand Up @@ -210,48 +222,20 @@ def from_proto(data_source: DataSourceProto) -> Any:
Raises:
ValueError: The type of DataSource could not be identified.
"""
if data_source.data_source_class_type:
cls = get_data_source_class_from_type(data_source.data_source_class_type)
return cls.from_proto(data_source)

if data_source.request_data_options and data_source.request_data_options.schema:
data_source_obj = RequestDataSource.from_proto(data_source)
elif data_source.file_options.file_format and data_source.file_options.file_url:
from feast.infra.offline_stores.file_source import FileSource

data_source_obj = FileSource.from_proto(data_source)
elif (
data_source.bigquery_options.table_ref or data_source.bigquery_options.query
):
from feast.infra.offline_stores.bigquery_source import BigQuerySource

data_source_obj = BigQuerySource.from_proto(data_source)
elif data_source.redshift_options.table or data_source.redshift_options.query:
from feast.infra.offline_stores.redshift_source import RedshiftSource

data_source_obj = RedshiftSource.from_proto(data_source)

elif data_source.snowflake_options.table or data_source.snowflake_options.query:
from feast.infra.offline_stores.snowflake_source import SnowflakeSource

data_source_obj = SnowflakeSource.from_proto(data_source)

elif (
data_source.kafka_options.bootstrap_servers
and data_source.kafka_options.topic
and data_source.kafka_options.message_format
data_source_type = data_source.type
if not data_source_type or (
data_source_type
not in list(_DATA_SOURCE_OPTIONS.keys())
+ [DataSourceProto.SourceType.CUSTOM_SOURCE]
):
data_source_obj = KafkaSource.from_proto(data_source)
elif (
data_source.kinesis_options.record_format
and data_source.kinesis_options.region
and data_source.kinesis_options.stream_name
):
data_source_obj = KinesisSource.from_proto(data_source)
else:
raise ValueError("Could not identify the source type being added.")

return data_source_obj
if data_source_type == DataSourceProto.SourceType.CUSTOM_SOURCE:
cls = get_data_source_class_from_type(data_source.data_source_class_type)
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
return cls.from_proto(data_source)

cls = get_data_source_class_from_type(_DATA_SOURCE_OPTIONS[data_source_type])
return cls.from_proto(data_source)

@abstractmethod
def to_proto(self) -> DataSourceProto:
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ def apply_data_source(
del registry.data_sources[idx]
data_source_proto = data_source.to_proto()
data_source_proto.project = project
data_source_proto.data_source_class_type = (
f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
)
registry.data_sources.append(data_source_proto)
if commit:
self.commit()
Expand Down