chore: Rename stream data source parameters (#2804)
* Deprecate `bootstrap_servers` parameter in KafkaSource

Signed-off-by: Felix Wang <[email protected]>

* Rename `watermark` to `watermark_delay_threshold` for KafkaSource

Signed-off-by: Felix Wang <[email protected]>

* Deprecate `date_partition_column` for all data sources

Signed-off-by: Felix Wang <[email protected]>

* Fix Java

Signed-off-by: Felix Wang <[email protected]>

* Fix SparkKafkaProcessor

Signed-off-by: Felix Wang <[email protected]>

* Clarify comment

Signed-off-by: Felix Wang <[email protected]>

* More clarifications

Signed-off-by: Felix Wang <[email protected]>
felixwang9817 authored Jun 17, 2022
1 parent 19aedc2 commit a62a90d
Showing 12 changed files with 111 additions and 85 deletions.
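
Taken together, a stream source declared against the new parameter names looks like the sketch below (a minimal example; it assumes `KafkaSource`, `FileSource`, and `AvroFormat` are importable from the SDK as in the updated test at the bottom of this diff, and the broker address and file path are illustrative):

from datetime import timedelta

from feast import FileSource, KafkaSource
from feast.data_format import AvroFormat

# New-style keywords: `kafka_bootstrap_servers` replaces `bootstrap_servers`,
# and `watermark_delay_threshold` replaces `watermark`.
stream_source = KafkaSource(
    name="kafka",
    timestamp_field="event_timestamp",
    kafka_bootstrap_servers="localhost:9092",
    message_format=AvroFormat(""),
    topic="topic",
    batch_source=FileSource(path="driver_stats.parquet"),
    # How late stream data may arrive before it is discarded.
    watermark_delay_threshold=timedelta(days=1),
)

The deprecated spellings still work during the migration window but emit a DeprecationWarning; per the warnings added below, support is slated for removal in Feast 0.25.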
@@ -226,7 +226,7 @@ public static DataSource createKafkaDataSourceSpec(
.setKafkaOptions(
KafkaOptions.newBuilder()
.setTopic(topic)
- .setBootstrapServers(servers)
+ .setKafkaBootstrapServers(servers)
.setMessageFormat(createProtoFormat("class.path"))
.build())
.setTimestampField(timestampColumn)
5 changes: 3 additions & 2 deletions protos/feast/core/DataSource.proto
@@ -128,15 +128,16 @@ message DataSource {
// Java Protobuf class at the given class path
message KafkaOptions {
// Comma separated list of Kafka bootstrap servers, in the form host[:port]. Used for feature tables without a defined source.
- string bootstrap_servers = 1;
+ string kafka_bootstrap_servers = 1;

// Kafka topic to collect feature data from.
string topic = 2;

// Defines the stream data format encoding feature/entity data in Kafka messages.
StreamFormat message_format = 3;

- google.protobuf.Duration watermark = 4;
+ // Watermark delay threshold for stream data
+ google.protobuf.Duration watermark_delay_threshold = 4;
}

// Defines options for DataSource that sources features from Kinesis records.
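
For reference, a sketch of populating the renamed proto fields from Python (assuming the generated bindings are importable as `feast.protos.feast.core.DataSource_pb2`, which the SDK below aliases as `DataSourceProto`):

from datetime import timedelta

from google.protobuf.duration_pb2 import Duration
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

delay = Duration()
delay.FromTimedelta(timedelta(minutes=5))

# The field keeps tag number 4, so the rename stays wire-compatible for
# binary payloads; only the generated accessor names change.
kafka_options = DataSourceProto.KafkaOptions(
    kafka_bootstrap_servers="localhost:9092",
    topic="drivers",
    watermark_delay_threshold=delay,
)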
143 changes: 83 additions & 60 deletions sdk/python/feast/data_source.py
@@ -50,15 +50,15 @@ class KafkaOptions:

def __init__(
self,
- bootstrap_servers: str,
+ kafka_bootstrap_servers: str,
message_format: StreamFormat,
topic: str,
- watermark: Optional[timedelta] = None,
+ watermark_delay_threshold: Optional[timedelta] = None,
):
- self.bootstrap_servers = bootstrap_servers
+ self.kafka_bootstrap_servers = kafka_bootstrap_servers
self.message_format = message_format
self.topic = topic
- self.watermark = watermark or None
+ self.watermark_delay_threshold = watermark_delay_threshold or None

@classmethod
def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
@@ -71,18 +71,18 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
Returns:
Returns a KafkaOptions object based on the kafka_options protobuf
"""
- watermark = None
- if kafka_options_proto.HasField("watermark"):
-     watermark = (
+ watermark_delay_threshold = None
+ if kafka_options_proto.HasField("watermark_delay_threshold"):
+     watermark_delay_threshold = (
timedelta(days=0)
- if kafka_options_proto.watermark.ToNanoseconds() == 0
- else kafka_options_proto.watermark.ToTimedelta()
+ if kafka_options_proto.watermark_delay_threshold.ToNanoseconds() == 0
+ else kafka_options_proto.watermark_delay_threshold.ToTimedelta()
)
kafka_options = cls(
- bootstrap_servers=kafka_options_proto.bootstrap_servers,
+ kafka_bootstrap_servers=kafka_options_proto.kafka_bootstrap_servers,
message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
topic=kafka_options_proto.topic,
- watermark=watermark,
+ watermark_delay_threshold=watermark_delay_threshold,
)

return kafka_options
@@ -94,16 +94,16 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
Returns:
KafkaOptionsProto protobuf
"""
- watermark_duration = None
- if self.watermark is not None:
-     watermark_duration = Duration()
-     watermark_duration.FromTimedelta(self.watermark)
+ watermark_delay_threshold = None
+ if self.watermark_delay_threshold is not None:
+     watermark_delay_threshold = Duration()
+     watermark_delay_threshold.FromTimedelta(self.watermark_delay_threshold)

kafka_options_proto = DataSourceProto.KafkaOptions(
- bootstrap_servers=self.bootstrap_servers,
+ kafka_bootstrap_servers=self.kafka_bootstrap_servers,
message_format=self.message_format.to_proto(),
topic=self.topic,
- watermark=watermark_duration,
+ watermark_delay_threshold=watermark_delay_threshold,
)

return kafka_options_proto
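
A quick round-trip through the renamed accessors (a sketch; `KafkaOptions` lives in this module, and `AvroFormat` comes from `feast.data_format` as in the test file below):

from datetime import timedelta

from feast.data_format import AvroFormat
from feast.data_source import KafkaOptions

options = KafkaOptions(
    kafka_bootstrap_servers="localhost:9092",
    message_format=AvroFormat(""),
    topic="drivers",
    watermark_delay_threshold=timedelta(minutes=5),
)

# to_proto()/from_proto() should preserve both renamed fields.
restored = KafkaOptions.from_proto(options.to_proto())
assert restored.kafka_bootstrap_servers == "localhost:9092"
assert restored.watermark_delay_threshold == timedelta(minutes=5)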
@@ -178,8 +178,8 @@ class DataSource(ABC):
Args:
name: Name of data source, which should be unique within a project
- event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
- joins of feature values.
+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
+ timestamp column used for point in time joins of feature values.
created_timestamp_column (optional): Timestamp column indicating when the row
was created, used for deduplicating rows.
field_mapping (optional): A dictionary mapping of column names in this data
@@ -220,8 +220,8 @@ def __init__(
Creates a DataSource object.
Args:
name: Name of data source, which should be unique within a project
- event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
- joins of feature values.
+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
+ timestamp column used for point in time joins of feature values.
created_timestamp_column (optional): Timestamp column indicating when the row
was created, used for deduplicating rows.
field_mapping (optional): A dictionary mapping of column names in this data
@@ -260,6 +260,14 @@ def __init__(
self.date_partition_column = (
date_partition_column if date_partition_column else ""
)
+ if date_partition_column:
+     warnings.warn(
+         (
+             "The argument 'date_partition_column' is being deprecated. "
+             "Feast 0.25 and onwards will not support 'date_partition_column' for data sources."
+         ),
+         DeprecationWarning,
+     )
self.description = description or ""
self.tags = tags or {}
self.owner = owner or ""
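
A hypothetical snippet that surfaces the new warning, using the KafkaSource shown later in this diff (whose docstring still lists `date_partition_column`); the argument values are illustrative:

import warnings

from feast import KafkaSource
from feast.data_format import AvroFormat

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    KafkaSource(
        name="drivers",
        timestamp_field="event_timestamp",
        kafka_bootstrap_servers="localhost:9092",
        message_format=AvroFormat(""),
        topic="drivers",
        date_partition_column="created_date",  # deprecated argument
    )

assert any(issubclass(w.category, DeprecationWarning) for w in caught)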
@@ -364,20 +372,13 @@ def get_table_query_string(self) -> str:


class KafkaSource(DataSource):
- def validate(self, config: RepoConfig):
-     pass
-
- def get_table_column_names_and_types(
-     self, config: RepoConfig
- ) -> Iterable[Tuple[str, str]]:
-     pass

def __init__(
self,
*args,
name: Optional[str] = None,
event_timestamp_column: Optional[str] = "",
bootstrap_servers: Optional[str] = None,
+ kafka_bootstrap_servers: Optional[str] = None,
message_format: Optional[StreamFormat] = None,
topic: Optional[str] = None,
created_timestamp_column: Optional[str] = "",
@@ -388,31 +389,34 @@ def __init__(
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
batch_source: Optional[DataSource] = None,
- watermark: Optional[timedelta] = None,
+ watermark_delay_threshold: Optional[timedelta] = None,
):
"""
- Creates a KafkaSource stream source object.
+ Creates a KafkaSource object.
Args:
- name: str. Name of data source, which should be unique within a project
- event_timestamp_column (optional): str. (Deprecated) Event timestamp column used for point in time
- joins of feature values.
- bootstrap_servers: str. The servers of the kafka broker in the form "localhost:9092".
- message_format: StreamFormat. StreamFormat of serialized messages.
- topic: str. The name of the topic to read from in the kafka source.
- created_timestamp_column (optional): str. Timestamp column indicating when the row
+ name: Name of data source, which should be unique within a project
+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
+ timestamp column used for point in time joins of feature values.
+ bootstrap_servers: (Deprecated) The servers of the kafka broker in the form "localhost:9092".
+ kafka_bootstrap_servers: The servers of the kafka broker in the form "localhost:9092".
+ message_format: StreamFormat of serialized messages.
+ topic: The name of the topic to read from in the kafka source.
+ created_timestamp_column (optional): Timestamp column indicating when the row
was created, used for deduplicating rows.
- field_mapping (optional): dict(str, str). A dictionary mapping of column names in this data
+ field_mapping (optional): A dictionary mapping of column names in this data
source to feature names in a feature table or view. Only used for feature
columns, not entity or timestamp columns.
- date_partition_column (optional): str. Timestamp column used for partitioning.
- description (optional): str. A human-readable description.
- tags (optional): dict(str, str). A dictionary of key-value pairs to store arbitrary metadata.
- owner (optional): str. The owner of the data source, typically the email of the primary
+ date_partition_column (optional): Timestamp column used for partitioning.
+ description (optional): A human-readable description.
+ tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
+ owner (optional): The owner of the data source, typically the email of the primary
maintainer.
- timestamp_field (optional): str. Event timestamp field used for point
+ timestamp_field (optional): Event timestamp field used for point
in time joins of feature values.
- batch_source: DataSource. The datasource that acts as a batch source.
- watermark: timedelta. The watermark for stream data. Specifically how late stream data can arrive without being discarded.
+ batch_source: The datasource that acts as a batch source.
+ watermark_delay_threshold: The watermark delay threshold for stream data. Specifically how
+ late stream data can arrive without being discarded.
"""
positional_attributes = [
"name",
@@ -423,10 +427,19 @@ def __init__(
]
_name = name
_event_timestamp_column = event_timestamp_column
- _bootstrap_servers = bootstrap_servers or ""
+ _kafka_bootstrap_servers = kafka_bootstrap_servers or bootstrap_servers or ""
_message_format = message_format
_topic = topic or ""

+ if bootstrap_servers:
+     warnings.warn(
+         (
+             "The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "
+             "Feast 0.25 and onwards will not support the 'bootstrap_servers' parameter."
+         ),
+         DeprecationWarning,
+     )

if args:
warnings.warn(
(
@@ -445,7 +458,7 @@ def __init__(
if len(args) >= 2:
_event_timestamp_column = args[1]
if len(args) >= 3:
- _bootstrap_servers = args[2]
+ _kafka_bootstrap_servers = args[2]
if len(args) >= 4:
_message_format = args[3]
if len(args) >= 5:
@@ -471,10 +484,10 @@ def __init__(
self.batch_source = batch_source

self.kafka_options = KafkaOptions(
- bootstrap_servers=_bootstrap_servers,
+ kafka_bootstrap_servers=_kafka_bootstrap_servers,
message_format=_message_format,
topic=_topic,
- watermark=watermark,
+ watermark_delay_threshold=watermark_delay_threshold,
)

def __eq__(self, other):
@@ -487,11 +500,12 @@ def __eq__(self, other):
return False

if (
- self.kafka_options.bootstrap_servers
- != other.kafka_options.bootstrap_servers
+ self.kafka_options.kafka_bootstrap_servers
+ != other.kafka_options.kafka_bootstrap_servers
or self.kafka_options.message_format != other.kafka_options.message_format
or self.kafka_options.topic != other.kafka_options.topic
- or self.kafka_options.watermark != other.kafka_options.watermark
+ or self.kafka_options.watermark_delay_threshold
+ != other.kafka_options.watermark_delay_threshold
):
return False

@@ -502,22 +516,23 @@ def __hash__(self):

@staticmethod
def from_proto(data_source: DataSourceProto):
- watermark = None
- if data_source.kafka_options.watermark:
-     watermark = (
+ watermark_delay_threshold = None
+ if data_source.kafka_options.watermark_delay_threshold:
+     watermark_delay_threshold = (
timedelta(days=0)
- if data_source.kafka_options.watermark.ToNanoseconds() == 0
- else data_source.kafka_options.watermark.ToTimedelta()
+ if data_source.kafka_options.watermark_delay_threshold.ToNanoseconds()
+ == 0
+ else data_source.kafka_options.watermark_delay_threshold.ToTimedelta()
)
return KafkaSource(
name=data_source.name,
event_timestamp_column=data_source.timestamp_field,
field_mapping=dict(data_source.field_mapping),
- bootstrap_servers=data_source.kafka_options.bootstrap_servers,
+ kafka_bootstrap_servers=data_source.kafka_options.kafka_bootstrap_servers,
message_format=StreamFormat.from_proto(
data_source.kafka_options.message_format
),
- watermark=watermark,
+ watermark_delay_threshold=watermark_delay_threshold,
topic=data_source.kafka_options.topic,
created_timestamp_column=data_source.created_timestamp_column,
timestamp_field=data_source.timestamp_field,
@@ -548,6 +563,14 @@ def to_proto(self) -> DataSourceProto:
data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto())
return data_source_proto

+ def validate(self, config: RepoConfig):
+     pass
+
+ def get_table_column_names_and_types(
+     self, config: RepoConfig
+ ) -> Iterable[Tuple[str, str]]:
+     pass

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
return type_map.redshift_to_feast_value_type
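
During the migration window the deprecated keyword is mapped onto the renamed option, as the logic above shows; a sketch of the fallback behavior (imports assumed as in the earlier examples):

import warnings

from feast import KafkaSource
from feast.data_format import AvroFormat

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    source = KafkaSource(
        name="drivers",
        timestamp_field="event_timestamp",
        bootstrap_servers="localhost:9092",  # deprecated spelling
        message_format=AvroFormat(""),
        topic="drivers",
    )

# The old value lands on the renamed option, and a DeprecationWarning fires.
assert source.kafka_options.kafka_bootstrap_servers == "localhost:9092"
assert any(issubclass(w.category, DeprecationWarning) for w in caught)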
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -77,7 +77,7 @@ def _ingest_stream_data(self) -> StreamTable:
self.spark.readStream.format("kafka")
.option(
"kafka.bootstrap.servers",
- self.data_source.kafka_options.bootstrap_servers,
+ self.data_source.kafka_options.kafka_bootstrap_servers,
)
.option("subscribe", self.data_source.kafka_options.topic)
.option("startingOffsets", "latest") # Query start
@@ -100,7 +100,7 @@ def _ingest_stream_data(self) -> StreamTable:
self.spark.readStream.format("kafka")
.option(
"kafka.bootstrap.servers",
- self.data_source.kafka_options.bootstrap_servers,
+ self.data_source.kafka_options.kafka_bootstrap_servers,
)
.option("subscribe", self.data_source.kafka_options.topic)
.option("startingOffsets", "latest") # Query start
3 changes: 2 additions & 1 deletion sdk/python/feast/infra/offline_stores/bigquery_source.py
@@ -37,7 +37,8 @@ def __init__(
Args:
table (optional): The BigQuery table where features can be found.
- event_timestamp_column: (Deprecated) Event timestamp column used for point in time joins of feature values.
+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
+ timestamp column used for point in time joins of feature values.
created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
or view. Only used for feature columns, not entities or timestamp columns.
3 changes: 2 additions & 1 deletion sdk/python/feast/infra/offline_stores/file_source.py
@@ -44,7 +44,8 @@ def __init__(
path: File path to file containing feature data. Must contain an event_timestamp column, entity columns and
feature columns.
- event_timestamp_column(optional): (Deprecated) Event timestamp column used for point in time joins of feature values.
+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
+ timestamp column used for point in time joins of feature values.
created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
file_format (optional): Explicitly set the file format. Allows Feast to bypass inferring the file format.
field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift_source.py
@@ -39,8 +39,8 @@ def __init__(
Creates a RedshiftSource object.
Args:
- event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in
- time joins of feature values.
+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
+ timestamp column used for point in time joins of feature values.
table (optional): Redshift table where the features are stored.
schema (optional): Redshift schema in which the table is located.
created_timestamp_column (optional): Timestamp column indicating when the
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
@@ -43,8 +43,8 @@ def __init__(
warehouse (optional): Snowflake warehouse where the database is stored.
schema (optional): Snowflake schema in which the table is located.
table (optional): Snowflake table where the features are stored.
- event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in
- time joins of feature values.
+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
+ timestamp column used for point in time joins of feature values.
query (optional): The query to be executed to obtain the features.
created_timestamp_column (optional): Timestamp column indicating when the
row was created, used for deduplicating rows.
4 changes: 2 additions & 2 deletions sdk/python/tests/integration/registration/test_registry.py
@@ -315,11 +315,11 @@ def simple_udf(x: int):
stream_source = KafkaSource(
name="kafka",
timestamp_field="event_timestamp",
- bootstrap_servers="",
+ kafka_bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(path="some path"),
- watermark=timedelta(days=1),
+ watermark_delay_threshold=timedelta(days=1),
)

sfv = StreamFeatureView(