From 00ed65a77177cfe04877e9550d1c8c1e903dadf8 Mon Sep 17 00:00:00 2001
From: Felix Wang
Date: Sat, 23 Apr 2022 20:51:52 -0700
Subject: [PATCH] fix: Fix broken proto conversion methods for data sources (#2603)

* Fix Snowflake proto conversion and add test
Signed-off-by: Felix Wang
* Add proto conversion test for FileSource
Signed-off-by: Felix Wang
* Fix Redshift proto conversion and add test
Signed-off-by: Felix Wang
* Add proto conversion test for BigQuerySource
Signed-off-by: Felix Wang
* Fix tests to use DataSource.from_proto
Signed-off-by: Felix Wang
* Add proto conversion test for KafkaSource
Signed-off-by: Felix Wang
* Add proto conversion test for KinesisSource
Signed-off-by: Felix Wang
* Add proto conversion test for PushSource
Signed-off-by: Felix Wang
* Add proto conversion test for PushSource
Signed-off-by: Felix Wang
* Add name and other fixes
Signed-off-by: Felix Wang
* Fix proto conversion tests
Signed-off-by: Felix Wang
* Add tags to test
Signed-off-by: Felix Wang
* Fix BigQuerySource bug
Signed-off-by: Felix Wang
* Fix bug in RedshiftSource and TrinoSource
Signed-off-by: Felix Wang
* Remove references to event_timestamp_column
Signed-off-by: Felix Wang

---
 go/cmd/server/logging/feature_repo/example.py | 2 +-
 sdk/python/feast/data_source.py | 23 ++--
 sdk/python/feast/inference.py | 2 +-
 .../feast/infra/offline_stores/bigquery.py | 22 ++--
 .../infra/offline_stores/bigquery_source.py | 56 ++-------
 .../postgres_offline_store/postgres.py | 22 ++--
 .../contrib/spark_offline_store/spark.py | 26 ++---
 .../contrib/trino_offline_store/trino.py | 22 ++--
 .../trino_offline_store/trino_source.py | 2 +-
 sdk/python/feast/infra/offline_stores/file.py | 88 ++++++--------
 .../feast/infra/offline_stores/file_source.py | 70 ++---------
 .../infra/offline_stores/offline_store.py | 12 +-
 .../infra/offline_stores/offline_utils.py | 4 +-
 .../feast/infra/offline_stores/redshift.py | 22 ++--
 .../infra/offline_stores/redshift_source.py | 67 ++---------
 .../feast/infra/offline_stores/snowflake.py | 24 ++--
 .../infra/offline_stores/snowflake_source.py | 83 +++----------
 .../feast/infra/passthrough_provider.py | 6 +-
 sdk/python/feast/infra/provider.py | 14 +--
 .../universal/data_source_creator.py | 2 +-
 .../universal/data_sources/trino.py | 2 +-
 .../online_store/test_universal_online.py | 4 +-
 .../registration/test_feature_store.py | 8 +-
 .../registration/test_inference.py | 4 +-
 .../tests/unit/diff/test_registry_diff.py | 8 +-
 sdk/python/tests/unit/test_data_sources.py | 110 +++++++++++++++++-
 sdk/python/tests/unit/test_feature_views.py | 4 +-
 sdk/python/tests/utils/data_source_utils.py | 16 ++-
 28 files changed, 313 insertions(+), 412 deletions(-)

diff --git a/go/cmd/server/logging/feature_repo/example.py b/go/cmd/server/logging/feature_repo/example.py
index f3ca612308..f78470efd5 100644
--- a/go/cmd/server/logging/feature_repo/example.py
+++ b/go/cmd/server/logging/feature_repo/example.py
@@ -9,7 +9,7 @@
 # for more info.
driver_hourly_stats = FileSource( path="driver_stats.parquet", - event_timestamp_column="event_timestamp", + timestamp_field="event_timestamp", created_timestamp_column="created", ) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 79c6cbdf51..6a2b9a0d14 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -409,7 +409,7 @@ def __init__( if _message_format is None: raise ValueError("Message format must be specified for Kafka source") - print("Asdfasdf") + super().__init__( event_timestamp_column=_event_timestamp_column, created_timestamp_column=created_timestamp_column, @@ -467,7 +467,9 @@ def from_proto(data_source: DataSourceProto): description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, - batch_source=DataSource.from_proto(data_source.batch_source), + batch_source=DataSource.from_proto(data_source.batch_source) + if data_source.batch_source + else None, ) def to_proto(self) -> DataSourceProto: @@ -500,17 +502,20 @@ class RequestSource(DataSource): """ RequestSource that can be used to provide input features for on demand transforms - Args: + Attributes: name: Name of the request data source - schema Union[Dict[str, ValueType], List[Field]]: Schema mapping from the input feature name to a ValueType - description (optional): A human-readable description. - tags (optional): A dictionary of key-value pairs to store arbitrary metadata. - owner (optional): The owner of the request data source, typically the email of the primary + schema: Schema mapping from the input feature name to a ValueType + description: A human-readable description. + tags: A dictionary of key-value pairs to store arbitrary metadata. + owner: The owner of the request data source, typically the email of the primary maintainer. 
""" name: str schema: List[Field] + description: str + tags: Dict[str, str] + owner: str def __init__( self, @@ -697,7 +702,9 @@ def from_proto(data_source: DataSourceProto): description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, - batch_source=DataSource.from_proto(data_source.batch_source), + batch_source=DataSource.from_proto(data_source.batch_source) + if data_source.batch_source + else None, ) @staticmethod diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 711b37c0b4..5c0f0e1d28 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -71,7 +71,7 @@ def update_entities_with_inferred_types_from_feature_views( def update_data_sources_with_inferred_event_timestamp_col( data_sources: List[DataSource], config: RepoConfig ) -> None: - ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" + ERROR_MSG_PREFIX = "Unable to infer DataSource timestamp_field" for data_source in data_sources: if isinstance(data_source, RequestSource): diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 1e27fc326b..29d0e029d9 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -83,7 +83,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -96,7 +96,7 @@ def pull_latest_from_table_or_query( partition_by_join_key_string = ( "PARTITION BY " + partition_by_join_key_string ) - timestamps = [event_timestamp_column] + timestamps = [timestamp_field] if created_timestamp_column: timestamps.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" @@ -114,7 +114,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') + WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') ) WHERE _feast_row = 1 """ @@ -131,7 +131,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: @@ -143,12 +143,12 @@ def pull_all_from_table_or_query( location=config.offline_store.location, ) field_string = ", ".join( - join_key_columns + feature_name_columns + [event_timestamp_column] + join_key_columns + feature_name_columns + [timestamp_field] ) query = f""" SELECT {field_string} FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') + WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') """ return BigQueryRetrievalJob( query=query, client=client, config=config, full_feature_names=False, @@ -583,9 +583,9 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] 1. We first join the current feature_view to the entity dataframe that has been passed. 
This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -596,16 +596,16 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] {{ featureview.name }}__subquery AS ( SELECT - {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.timestamp_field }} as event_timestamp, {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' + WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' {% if featureview.ttl == 0 %}{% else %} - AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}' + AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index cb4cd1b5be..001576c98f 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -99,15 +99,9 @@ def __eq__(self, other): ) return ( - self.name == other.name - and self.bigquery_options.table == other.bigquery_options.table - and self.bigquery_options.query == other.bigquery_options.query - and self.timestamp_field == other.timestamp_field - and self.created_timestamp_column == other.created_timestamp_column - and self.field_mapping == other.field_mapping - and self.description == other.description - and self.tags == other.tags - and self.owner == other.owner + super().__eq__(other) + and self.table == other.table + and self.query == other.query ) @property @@ -120,7 +114,6 @@ def query(self): @staticmethod def from_proto(data_source: DataSourceProto): - assert data_source.HasField("bigquery_options") return BigQuerySource( @@ -144,11 +137,10 @@ def to_proto(self) -> DataSourceProto: description=self.description, tags=self.tags, owner=self.owner, + timestamp_field=self.timestamp_field, + created_timestamp_column=self.created_timestamp_column, ) - data_source_proto.timestamp_field = self.timestamp_field - data_source_proto.created_timestamp_column = self.created_timestamp_column - return data_source_proto def validate(self, config: RepoConfig): @@ -179,7 +171,7 @@ def get_table_column_names_and_types( from google.cloud import bigquery client = bigquery.Client() - if self.table is not None: + if self.table: schema = client.get_table(self.table).schema if not 
isinstance(schema[0], bigquery.schema.SchemaField): raise TypeError("Could not parse BigQuery table schema.") @@ -200,42 +192,14 @@ def get_table_column_names_and_types( class BigQueryOptions: """ - DataSource BigQuery options used to source features from BigQuery query + Configuration options for a BigQuery data source. """ def __init__( self, table: Optional[str], query: Optional[str], ): - self._table = table - self._query = query - - @property - def query(self): - """ - Returns the BigQuery SQL query referenced by this source - """ - return self._query - - @query.setter - def query(self, query): - """ - Sets the BigQuery SQL query referenced by this source - """ - self._query = query - - @property - def table(self): - """ - Returns the table ref of this BQ table - """ - return self._table - - @table.setter - def table(self, table): - """ - Sets the table ref of this BQ table - """ - self._table = table + self.table = table or "" + self.query = query or "" @classmethod def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions): @@ -248,7 +212,6 @@ def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions): Returns: Returns a BigQueryOptions object based on the bigquery_options protobuf """ - bigquery_options = cls( table=bigquery_options_proto.table, query=bigquery_options_proto.query, ) @@ -262,7 +225,6 @@ def to_proto(self) -> DataSourceProto.BigQueryOptions: Returns: BigQueryOptionsProto protobuf """ - bigquery_options_proto = DataSourceProto.BigQueryOptions( table=self.table, query=self.query, ) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 5e99addcb4..9a3df40c03 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -55,7 +55,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -68,7 +68,7 @@ def pull_latest_from_table_or_query( partition_by_join_key_string = ( "PARTITION BY " + partition_by_join_key_string ) - timestamps = [event_timestamp_column] + timestamps = [timestamp_field] if created_timestamp_column: timestamps.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(_append_alias(timestamps, "a")) + " DESC" @@ -87,7 +87,7 @@ def pull_latest_from_table_or_query( SELECT {a_field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row FROM ({from_expression}) a - WHERE a."{event_timestamp_column}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz + WHERE a."{timestamp_field}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz ) b WHERE _feast_row = 1 """ @@ -191,7 +191,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: @@ -199,7 +199,7 @@ def pull_all_from_table_or_query( from_expression = data_source.get_table_query_string() field_string = ", ".join( - join_key_columns + feature_name_columns + [event_timestamp_column] + join_key_columns + feature_name_columns + [timestamp_field] ) 
start_date = start_date.astimezone(tz=utc) @@ -208,7 +208,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {field_string} FROM {from_expression} - WHERE "{event_timestamp_column}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz + WHERE "{timestamp_field}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz """ return PostgreSQLRetrievalJob( @@ -415,9 +415,9 @@ def build_point_in_time_query( 1. We first join the current feature_view to the entity dataframe that has been passed. This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -428,16 +428,16 @@ def build_point_in_time_query( "{{ featureview.name }}__subquery" AS ( SELECT - "{{ featureview.event_timestamp_column }}" as event_timestamp, + "{{ featureview.timestamp_field }}" as event_timestamp, {{ '"' ~ featureview.created_timestamp_column ~ '" as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} "{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} AS sub - WHERE "{{ featureview.event_timestamp_column }}" <= (SELECT MAX(entity_timestamp) FROM entity_dataframe) + WHERE "{{ featureview.timestamp_field }}" <= (SELECT MAX(entity_timestamp) FROM entity_dataframe) {% if featureview.ttl == 0 %}{% else %} - AND "{{ featureview.event_timestamp_column }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second + AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 1b977ba622..770bd8adc2 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -50,7 +50,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -76,7 +76,7 @@ def pull_latest_from_table_or_query( partition_by_join_key_string = ( "PARTITION BY " + partition_by_join_key_string ) - timestamps = [event_timestamp_column] + timestamps = [timestamp_field] if created_timestamp_column: timestamps.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" @@ -92,7 +92,7 @@ def pull_latest_from_table_or_query( 
SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS feast_row_ FROM {from_expression} t1 - WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}') + WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}') ) t2 WHERE feast_row_ = 1 """ @@ -190,12 +190,12 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: """ - Note that join_key_columns, feature_name_columns, event_timestamp_column, and + Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function. """ @@ -210,9 +210,7 @@ def pull_all_from_table_or_query( store_config=config.offline_store ) - fields = ", ".join( - join_key_columns + feature_name_columns + [event_timestamp_column] - ) + fields = ", ".join(join_key_columns + feature_name_columns + [timestamp_field]) from_expression = data_source.get_table_query_string() start_date = start_date.astimezone(tz=utc) end_date = end_date.astimezone(tz=utc) @@ -220,7 +218,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {fields} FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' """ return SparkRetrievalJob( @@ -422,9 +420,9 @@ def _format_datetime(t: datetime) -> str: 1. We first join the current feature_view to the entity dataframe that has been passed. 
This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -435,16 +433,16 @@ def _format_datetime(t: datetime) -> str: {{ featureview.name }}__subquery AS ( SELECT - {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.timestamp_field }} as event_timestamp, {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' + WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' {% if featureview.ttl == 0 %}{% else %} - AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}' + AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 442bdf6656..87a99b820e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -153,7 +153,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -177,7 +177,7 @@ def pull_latest_from_table_or_query( partition_by_join_key_string = ( "PARTITION BY " + partition_by_join_key_string ) - timestamps = [event_timestamp_column] + timestamps = [timestamp_field] if created_timestamp_column: timestamps.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" @@ -195,7 +195,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' ) WHERE _feast_row = 1 """ @@ -302,7 +302,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, user: str = "user", @@ -319,12 +319,12 @@ def pull_all_from_table_or_query( config=config, user=user, 
auth=auth, http_scheme=http_scheme ) field_string = ", ".join( - join_key_columns + feature_name_columns + [event_timestamp_column] + join_key_columns + feature_name_columns + [timestamp_field] ) query = f""" SELECT {field_string} FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' """ return TrinoRetrievalJob( query=query, client=client, config=config, full_feature_names=False, @@ -458,9 +458,9 @@ def _get_entity_df_event_timestamp_range( to the provided entity table. 1. We first join the current feature_view to the entity dataframe that has been passed. This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -469,16 +469,16 @@ def _get_entity_df_event_timestamp_range( */ {{ featureview.name }}__subquery AS ( SELECT - {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.timestamp_field }} as event_timestamp, {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE {{ featureview.event_timestamp_column }} <= from_iso8601_timestamp('{{ featureview.max_event_timestamp }}') + WHERE {{ featureview.timestamp_field }} <= from_iso8601_timestamp('{{ featureview.max_event_timestamp }}') {% if featureview.ttl == 0 %}{% else %} - AND {{ featureview.event_timestamp_column }} >= from_iso8601_timestamp('{{ featureview.min_event_timestamp }}') + AND {{ featureview.timestamp_field }} >= from_iso8601_timestamp('{{ featureview.min_event_timestamp }}') {% endif %} ), {{ featureview.name }}__base AS ( diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py index b8fddee89f..b559d0e59e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py @@ -205,7 +205,7 @@ def get_table_column_names_and_types( host=config.offline_store.host, port=config.offline_store.port, ) - if self.table is not None: + if self.table: table_schema = client.execute_query( f"SELECT * FROM {self.table} LIMIT 1" ).schema diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index a7d8b25abf..052d546748 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py 
@@ -186,7 +186,7 @@ def evaluate_historical_retrieval(): # Load feature view data from sources and join them incrementally for feature_view, features in feature_views_to_features.items(): - event_timestamp_column = feature_view.batch_source.timestamp_field + timestamp_field = feature_view.batch_source.timestamp_field created_timestamp_column = ( feature_view.batch_source.created_timestamp_column ) @@ -202,7 +202,7 @@ def evaluate_historical_retrieval(): join_keys.append(join_key) right_entity_key_columns = [ - event_timestamp_column, + timestamp_field, created_timestamp_column, ] + join_keys right_entity_key_columns = [c for c in right_entity_key_columns if c] @@ -211,39 +211,39 @@ def evaluate_historical_retrieval(): df_to_join = _read_datasource(feature_view.batch_source) - df_to_join, event_timestamp_column = _field_mapping( + df_to_join, timestamp_field = _field_mapping( df_to_join, feature_view, features, right_entity_key_columns, entity_df_event_timestamp_col, - event_timestamp_column, + timestamp_field, full_feature_names, ) df_to_join = _merge(entity_df_with_features, df_to_join, join_keys) df_to_join = _normalize_timestamp( - df_to_join, event_timestamp_column, created_timestamp_column + df_to_join, timestamp_field, created_timestamp_column ) df_to_join = _filter_ttl( df_to_join, feature_view, entity_df_event_timestamp_col, - event_timestamp_column, + timestamp_field, ) df_to_join = _drop_duplicates( df_to_join, all_join_keys, - event_timestamp_column, + timestamp_field, created_timestamp_column, entity_df_event_timestamp_col, ) entity_df_with_features = _drop_columns( - df_to_join, event_timestamp_column, created_timestamp_column + df_to_join, timestamp_field, created_timestamp_column ) # Ensure that we delete dataframes to free up memory @@ -273,7 +273,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -285,7 +285,7 @@ def evaluate_offline_job(): source_df = _read_datasource(data_source) source_df = _normalize_timestamp( - source_df, event_timestamp_column, created_timestamp_column + source_df, timestamp_field, created_timestamp_column ) source_columns = set(source_df.columns) @@ -295,9 +295,9 @@ def evaluate_offline_job(): ) ts_columns = ( - [event_timestamp_column, created_timestamp_column] + [timestamp_field, created_timestamp_column] if created_timestamp_column - else [event_timestamp_column] + else [timestamp_field] ) # try-catch block is added to deal with this issue https://github.com/dask/dask/issues/8939. # TODO(kevjumba): remove try catch when fix is merged upstream in Dask. 
@@ -305,7 +305,7 @@ def evaluate_offline_job(): if created_timestamp_column: source_df = source_df.sort_values(by=created_timestamp_column,) - source_df = source_df.sort_values(by=event_timestamp_column) + source_df = source_df.sort_values(by=timestamp_field) except ZeroDivisionError: # Use 1 partition to get around case where everything in timestamp column is the same so the partition algorithm doesn't @@ -315,13 +315,11 @@ def evaluate_offline_job(): by=created_timestamp_column, npartitions=1 ) - source_df = source_df.sort_values( - by=event_timestamp_column, npartitions=1 - ) + source_df = source_df.sort_values(by=timestamp_field, npartitions=1) source_df = source_df[ - (source_df[event_timestamp_column] >= start_date) - & (source_df[event_timestamp_column] < end_date) + (source_df[timestamp_field] >= start_date) + & (source_df[timestamp_field] < end_date) ] source_df = source_df.persist() @@ -353,7 +351,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: @@ -361,9 +359,9 @@ def pull_all_from_table_or_query( config=config, data_source=data_source, join_key_columns=join_key_columns - + [event_timestamp_column], # avoid deduplication + + [timestamp_field], # avoid deduplication feature_name_columns=feature_name_columns, - event_timestamp_column=event_timestamp_column, + timestamp_field=timestamp_field, created_timestamp_column=None, start_date=start_date, end_date=end_date, @@ -410,7 +408,7 @@ def _field_mapping( features: List[str], right_entity_key_columns: List[str], entity_df_event_timestamp_col: str, - event_timestamp_column: str, + timestamp_field: str, full_feature_names: bool, ) -> dd.DataFrame: # Rename columns by the field mapping dictionary if it exists @@ -449,13 +447,13 @@ def _field_mapping( df_to_join = df_to_join.persist() # Make sure to not have duplicated columns - if entity_df_event_timestamp_col == event_timestamp_column: + if entity_df_event_timestamp_col == timestamp_field: df_to_join = _run_dask_field_mapping( - df_to_join, {event_timestamp_column: f"__{event_timestamp_column}"}, + df_to_join, {timestamp_field: f"__{timestamp_field}"}, ) - event_timestamp_column = f"__{event_timestamp_column}" + timestamp_field = f"__{timestamp_field}" - return df_to_join.persist(), event_timestamp_column + return df_to_join.persist(), timestamp_field def _merge( @@ -489,24 +487,19 @@ def _merge( def _normalize_timestamp( - df_to_join: dd.DataFrame, - event_timestamp_column: str, - created_timestamp_column: str, + df_to_join: dd.DataFrame, timestamp_field: str, created_timestamp_column: str, ) -> dd.DataFrame: df_to_join_types = df_to_join.dtypes - event_timestamp_column_type = df_to_join_types[event_timestamp_column] + timestamp_field_type = df_to_join_types[timestamp_field] if created_timestamp_column: created_timestamp_column_type = df_to_join_types[created_timestamp_column] - if ( - not hasattr(event_timestamp_column_type, "tz") - or event_timestamp_column_type.tz != pytz.UTC - ): + if not hasattr(timestamp_field_type, "tz") or timestamp_field_type.tz != pytz.UTC: # Make sure all timestamp fields are tz-aware. 
We default tz-naive fields to UTC - df_to_join[event_timestamp_column] = df_to_join[event_timestamp_column].apply( + df_to_join[timestamp_field] = df_to_join[timestamp_field].apply( lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), + meta=(timestamp_field, "datetime64[ns, UTC]"), ) if created_timestamp_column and ( @@ -517,7 +510,7 @@ def _normalize_timestamp( created_timestamp_column ].apply( lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), + meta=(timestamp_field, "datetime64[ns, UTC]"), ) return df_to_join.persist() @@ -527,19 +520,16 @@ def _filter_ttl( df_to_join: dd.DataFrame, feature_view: FeatureView, entity_df_event_timestamp_col: str, - event_timestamp_column: str, + timestamp_field: str, ) -> dd.DataFrame: # Filter rows by defined timestamp tolerance if feature_view.ttl and feature_view.ttl.total_seconds() != 0: df_to_join = df_to_join[ ( - df_to_join[event_timestamp_column] + df_to_join[timestamp_field] >= df_to_join[entity_df_event_timestamp_col] - feature_view.ttl ) - & ( - df_to_join[event_timestamp_column] - <= df_to_join[entity_df_event_timestamp_col] - ) + & (df_to_join[timestamp_field] <= df_to_join[entity_df_event_timestamp_col]) ] df_to_join = df_to_join.persist() @@ -550,7 +540,7 @@ def _filter_ttl( def _drop_duplicates( df_to_join: dd.DataFrame, all_join_keys: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: str, entity_df_event_timestamp_col: str, ) -> dd.DataFrame: @@ -560,7 +550,7 @@ def _drop_duplicates( ) df_to_join = df_to_join.persist() - df_to_join = df_to_join.sort_values(by=event_timestamp_column, na_position="first") + df_to_join = df_to_join.sort_values(by=timestamp_field, na_position="first") df_to_join = df_to_join.persist() df_to_join = df_to_join.drop_duplicates( @@ -571,13 +561,9 @@ def _drop_duplicates( def _drop_columns( - df_to_join: dd.DataFrame, - event_timestamp_column: str, - created_timestamp_column: str, + df_to_join: dd.DataFrame, timestamp_field: str, created_timestamp_column: str, ) -> dd.DataFrame: - entity_df_with_features = df_to_join.drop( - [event_timestamp_column], axis=1 - ).persist() + entity_df_with_features = df_to_join.drop([timestamp_field], axis=1).persist() if created_timestamp_column: entity_df_with_features = entity_df_with_features.drop( diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index e177642a32..a6fc7a1600 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -116,16 +116,11 @@ def __eq__(self, other): raise TypeError("Comparisons should only involve FileSource class objects.") return ( - self.name == other.name + super().__eq__(other) + and self.path == other.path and self.file_options.file_format == other.file_options.file_format - and self.timestamp_field == other.timestamp_field - and self.created_timestamp_column == other.created_timestamp_column - and self.field_mapping == other.field_mapping and self.file_options.s3_endpoint_override == other.file_options.s3_endpoint_override - and self.description == other.description - and self.tags == other.tags - and self.owner == other.owner ) @property @@ -203,7 +198,7 @@ def get_table_query_string(self) -> str: class FileOptions: """ - DataSource File options used to source features from a file + Configuration options for a file data 
source. """ def __init__( @@ -213,66 +208,23 @@ def __init__( uri: Optional[str], ): """ - FileOptions initialization method + Initializes a FileOptions object. Args: - file_format (FileFormat, optional): file source format eg. parquet - s3_endpoint_override (str, optional): custom s3 endpoint (used only with s3 uri) - uri (str, optional): file source url eg. s3:// or local file - - """ - self._file_format = file_format - self._uri = uri - self._s3_endpoint_override = s3_endpoint_override - - @property - def file_format(self): - """ - Returns the file format of this file - """ - return self._file_format - - @file_format.setter - def file_format(self, file_format): - """ - Sets the file format of this file - """ - self._file_format = file_format - - @property - def uri(self): - """ - Returns the file url of this file + file_format (optional): File source format, e.g. parquet. + s3_endpoint_override (optional): Custom s3 endpoint (used only with s3 uri). + uri (optional): File source url, e.g. s3:// or local file. """ - return self._uri - - @uri.setter - def uri(self, uri): - """ - Sets the file url of this file - """ - self._uri = uri - - @property - def s3_endpoint_override(self): - """ - Returns the s3 endpoint override - """ - return None if self._s3_endpoint_override == "" else self._s3_endpoint_override - - @s3_endpoint_override.setter - def s3_endpoint_override(self, s3_endpoint_override): - """ - Sets the s3 endpoint override - """ - self._s3_endpoint_override = s3_endpoint_override + self.file_format = file_format + self.uri = uri or "" + self.s3_endpoint_override = s3_endpoint_override or "" @classmethod def from_proto(cls, file_options_proto: DataSourceProto.FileOptions): """ Creates a FileOptions from a protobuf representation of a file option - args: + Args: file_options_proto: a protobuf representation of a datasource Returns: diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index e5937712f6..83f20bb3e5 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -173,7 +173,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -185,7 +185,7 @@ def pull_latest_from_table_or_query( FeatureStore.materialize() method. This method pulls data from the offline store, and the FeatureStore class is used to write this data into the online store. - Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column + Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function. @@ -194,7 +194,7 @@ class is used to write this data into the online store. 
data_source: Data source to pull all of the columns from join_key_columns: Columns of the join keys feature_name_columns: Columns of the feature names needed - event_timestamp_column: Timestamp column + timestamp_field: Timestamp column start_date: Starting date of query end_date: Ending date of query """ @@ -220,14 +220,14 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: """ Returns a Retrieval Job for all join key columns, feature name columns, and the event timestamp columns that occur between the start_date and end_date. - Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column + Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function. @@ -236,7 +236,7 @@ def pull_all_from_table_or_query( data_source: Data source to pull all of the columns from join_key_columns: Columns of the join keys feature_name_columns: Columns of the feature names needed - event_timestamp_column: Timestamp column + timestamp_field: Timestamp column start_date: Starting date of query end_date: Ending date of query """ diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index c62d0223a0..b6c3d300d4 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -86,7 +86,7 @@ class FeatureViewQueryContext: entities: List[str] features: List[str] # feature reference format field_mapping: Dict[str, str] - event_timestamp_column: str + timestamp_field: str created_timestamp_column: Optional[str] table_subquery: str entity_selections: List[str] @@ -154,7 +154,7 @@ def get_feature_view_query_context( entities=join_keys, features=features, field_mapping=feature_view.batch_source.field_mapping, - event_timestamp_column=timestamp_field, + timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, # TODO: Make created column optional and not hardcoded table_subquery=feature_view.batch_source.get_table_query_string(), diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index e67cf13f5c..cd309c92b2 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -71,7 +71,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -86,7 +86,7 @@ def pull_latest_from_table_or_query( partition_by_join_key_string = ( "PARTITION BY " + partition_by_join_key_string ) - timestamp_columns = [event_timestamp_column] + timestamp_columns = [timestamp_field] if created_timestamp_column: timestamp_columns.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamp_columns) + " DESC" @@ -110,7 +110,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP 
'{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' ) WHERE _feast_row = 1 """ @@ -130,7 +130,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: @@ -138,7 +138,7 @@ def pull_all_from_table_or_query( from_expression = data_source.get_table_query_string() field_string = ", ".join( - join_key_columns + feature_name_columns + [event_timestamp_column] + join_key_columns + feature_name_columns + [timestamp_field] ) redshift_client = aws_utils.get_redshift_data_client( @@ -152,7 +152,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {field_string} FROM {from_expression} - WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' """ return RedshiftRetrievalJob( @@ -546,9 +546,9 @@ def _get_entity_df_event_timestamp_range( 1. We first join the current feature_view to the entity dataframe that has been passed. This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -559,16 +559,16 @@ def _get_entity_df_event_timestamp_range( {{ featureview.name }}__subquery AS ( SELECT - {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.timestamp_field }} as event_timestamp, {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' + WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' {% if featureview.ttl == 0 %}{% else %} - AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}' + AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index dcfcb50aa6..00af8c1abf 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -106,6 +106,7 @@ def from_proto(data_source: DataSourceProto): A RedshiftSource object based on the data_source protobuf. 
""" return RedshiftSource( + name=data_source.name, field_mapping=dict(data_source.field_mapping), table=data_source.redshift_options.table, schema=data_source.redshift_options.schema, @@ -129,17 +130,11 @@ def __eq__(self, other): ) return ( - self.name == other.name + super().__eq__(other) and self.redshift_options.table == other.redshift_options.table and self.redshift_options.schema == other.redshift_options.schema and self.redshift_options.query == other.redshift_options.query and self.redshift_options.database == other.redshift_options.database - and self.timestamp_field == other.timestamp_field - and self.created_timestamp_column == other.created_timestamp_column - and self.field_mapping == other.field_mapping - and self.description == other.description - and self.tags == other.tags - and self.owner == other.owner ) @property @@ -170,17 +165,17 @@ def to_proto(self) -> DataSourceProto: A DataSourceProto object. """ data_source_proto = DataSourceProto( + name=self.name, type=DataSourceProto.BATCH_REDSHIFT, field_mapping=self.field_mapping, redshift_options=self.redshift_options.to_proto(), description=self.description, tags=self.tags, owner=self.owner, + timestamp_field=self.timestamp_field, + created_timestamp_column=self.created_timestamp_column, ) - data_source_proto.timestamp_field = self.timestamp_field - data_source_proto.created_timestamp_column = self.created_timestamp_column - return data_source_proto def validate(self, config: RepoConfig): @@ -216,7 +211,7 @@ def get_table_column_names_and_types( assert isinstance(config.offline_store, RedshiftOfflineStoreConfig) client = aws_utils.get_redshift_data_client(config.offline_store.region) - if self.table is not None: + if self.table: try: table = client.describe_table( ClusterIdentifier=config.offline_store.cluster_id, @@ -256,7 +251,7 @@ def get_table_column_names_and_types( class RedshiftOptions: """ - DataSource Redshift options used to source features from Redshift query. + Configuration options for a Redshift data source. 
""" def __init__( @@ -266,50 +261,10 @@ def __init__( query: Optional[str], database: Optional[str], ): - self._table = table - self._schema = schema - self._query = query - self._database = database - - @property - def query(self): - """Returns the Redshift SQL query referenced by this source.""" - return self._query - - @query.setter - def query(self, query): - """Sets the Redshift SQL query referenced by this source.""" - self._query = query - - @property - def table(self): - """Returns the table name of this Redshift table.""" - return self._table - - @table.setter - def table(self, table_name): - """Sets the table ref of this Redshift table.""" - self._table = table_name - - @property - def schema(self): - """Returns the schema name of this Redshift table.""" - return self._schema - - @schema.setter - def schema(self, schema): - """Sets the schema of this Redshift table.""" - self._schema = schema - - @property - def database(self): - """Returns the schema name of this Redshift table.""" - return self._database - - @database.setter - def database(self, database): - """Sets the database name of this Redshift table.""" - self._database = database + self.table = table or "" + self.schema = schema or "" + self.query = query or "" + self.database = database or "" @classmethod def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions): diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index c88e1b1844..a07f7a57c6 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -97,7 +97,7 @@ def pull_latest_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, @@ -117,7 +117,7 @@ def pull_latest_from_table_or_query( else: partition_by_join_key_string = "" - timestamp_columns = [event_timestamp_column] + timestamp_columns = [timestamp_field] if created_timestamp_column: timestamp_columns.append(created_timestamp_column) @@ -141,7 +141,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS "_feast_row" FROM {from_expression} - WHERE "{event_timestamp_column}" BETWEEN TO_TIMESTAMP_NTZ({start_date.timestamp()}) AND TO_TIMESTAMP_NTZ({end_date.timestamp()}) + WHERE "{timestamp_field}" BETWEEN TO_TIMESTAMP_NTZ({start_date.timestamp()}) AND TO_TIMESTAMP_NTZ({end_date.timestamp()}) ) WHERE "_feast_row" = 1 """ @@ -161,7 +161,7 @@ def pull_all_from_table_or_query( data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], - event_timestamp_column: str, + timestamp_field: str, start_date: datetime, end_date: datetime, ) -> RetrievalJob: @@ -170,9 +170,7 @@ def pull_all_from_table_or_query( field_string = ( '"' - + '", "'.join( - join_key_columns + feature_name_columns + [event_timestamp_column] - ) + + '", "'.join(join_key_columns + feature_name_columns + [timestamp_field]) + '"' ) @@ -187,7 +185,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {field_string} FROM {from_expression} - WHERE "{event_timestamp_column}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE "{timestamp_field}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' """ return SnowflakeRetrievalJob( @@ -512,9 +510,9 @@ def 
_get_entity_df_event_timestamp_range( 1. We first join the current feature_view to the entity dataframe that has been passed. This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + - For each row of the entity dataframe, only keep the rows where the `timestamp_field` is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` is higher the the one provided minus the TTL - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been computed previously @@ -525,16 +523,16 @@ def _get_entity_df_event_timestamp_range( "{{ featureview.name }}__subquery" AS ( SELECT - "{{ featureview.event_timestamp_column }}" as "event_timestamp", + "{{ featureview.timestamp_field }}" as "event_timestamp", {{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }} {{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} "{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE "{{ featureview.event_timestamp_column }}" <= '{{ featureview.max_event_timestamp }}' + WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}' {% if featureview.ttl == 0 %}{% else %} - AND "{{ featureview.event_timestamp_column }}" >= '{{ featureview.min_event_timestamp }}' + AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}' {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 8f3f2f0bb5..904fc48043 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -55,6 +55,7 @@ def __init__( """ if table is None and query is None: raise ValueError('No "table" argument provided.') + # The default Snowflake schema is named "PUBLIC". _schema = "PUBLIC" if (database and table and not schema) else schema @@ -112,6 +113,7 @@ def from_proto(data_source: DataSourceProto): A SnowflakeSource object based on the data_source protobuf. 
""" return SnowflakeSource( + name=data_source.name, field_mapping=dict(data_source.field_mapping), database=data_source.snowflake_options.database, schema=data_source.snowflake_options.schema, @@ -136,18 +138,12 @@ def __eq__(self, other): ) return ( - self.name == other.name - and self.snowflake_options.database == other.snowflake_options.database - and self.snowflake_options.schema == other.snowflake_options.schema - and self.snowflake_options.table == other.snowflake_options.table - and self.snowflake_options.query == other.snowflake_options.query - and self.snowflake_options.warehouse == other.snowflake_options.warehouse - and self.timestamp_field == other.timestamp_field - and self.created_timestamp_column == other.created_timestamp_column - and self.field_mapping == other.field_mapping - and self.description == other.description - and self.tags == other.tags - and self.owner == other.owner + super().__eq__(other) + and self.database == other.database + and self.schema == other.schema + and self.table == other.table + and self.query == other.query + and self.warehouse == other.warehouse ) @property @@ -183,6 +179,7 @@ def to_proto(self) -> DataSourceProto: A DataSourceProto object. """ data_source_proto = DataSourceProto( + name=self.name, type=DataSourceProto.BATCH_SNOWFLAKE, field_mapping=self.field_mapping, snowflake_options=self.snowflake_options.to_proto(), @@ -252,7 +249,7 @@ def get_table_column_names_and_types( class SnowflakeOptions: """ - DataSource snowflake options used to source features from snowflake query. + Configuration options for a Snowflake data source. """ def __init__( @@ -263,61 +260,11 @@ def __init__( query: Optional[str], warehouse: Optional[str], ): - self._database = database - self._schema = schema - self._table = table - self._query = query - self._warehouse = warehouse - - @property - def query(self): - """Returns the snowflake SQL query referenced by this source.""" - return self._query - - @query.setter - def query(self, query): - """Sets the snowflake SQL query referenced by this source.""" - self._query = query - - @property - def database(self): - """Returns the database name of this snowflake table.""" - return self._database - - @database.setter - def database(self, database): - """Sets the database ref of this snowflake table.""" - self._database = database - - @property - def schema(self): - """Returns the schema name of this snowflake table.""" - return self._schema - - @schema.setter - def schema(self, schema): - """Sets the schema of this snowflake table.""" - self._schema = schema - - @property - def table(self): - """Returns the table name of this snowflake table.""" - return self._table - - @table.setter - def table(self, table): - """Sets the table ref of this snowflake table.""" - self._table = table - - @property - def warehouse(self): - """Returns the warehouse name of this snowflake table.""" - return self._warehouse - - @warehouse.setter - def warehouse(self, warehouse): - """Sets the warehouse name of this snowflake table.""" - self._warehouse = warehouse + self.database = database or "" + self.schema = schema or "" + self.table = table or "" + self.query = query or "" + self.warehouse = warehouse or "" @classmethod def from_proto(cls, snowflake_options_proto: DataSourceProto.SnowflakeOptions): diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 3468b9dc92..09ca98d86d 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ 
b/sdk/python/feast/infra/passthrough_provider.py @@ -136,7 +136,7 @@ def materialize_single_feature_view( ( join_key_columns, feature_name_columns, - event_timestamp_column, + timestamp_field, created_timestamp_column, ) = _get_column_names(feature_view, entities) @@ -145,7 +145,7 @@ def materialize_single_feature_view( data_source=feature_view.batch_source, join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, - event_timestamp_column=event_timestamp_column, + timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, start_date=start_date, end_date=end_date, @@ -210,7 +210,7 @@ def retrieve_saved_dataset( data_source=dataset.storage.to_data_source(), join_key_columns=dataset.join_keys, feature_name_columns=feature_name_columns, - event_timestamp_column=event_ts_column, + timestamp_field=event_ts_column, start_date=make_tzaware(dataset.min_event_timestamp), # type: ignore end_date=make_tzaware(dataset.max_event_timestamp + timedelta(seconds=1)), # type: ignore ) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index b379193ba3..a71bd6d2d0 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -258,7 +258,7 @@ def _get_column_names( the query to the offline store. """ # if we have mapped fields, use the original field names in the call to the offline store - event_timestamp_column = feature_view.batch_source.timestamp_field + timestamp_field = feature_view.batch_source.timestamp_field feature_names = [feature.name for feature in feature_view.features] created_timestamp_column = feature_view.batch_source.created_timestamp_column join_keys = [ @@ -268,10 +268,10 @@ def _get_column_names( reverse_field_mapping = { v: k for k, v in feature_view.batch_source.field_mapping.items() } - event_timestamp_column = ( - reverse_field_mapping[event_timestamp_column] - if event_timestamp_column in reverse_field_mapping.keys() - else event_timestamp_column + timestamp_field = ( + reverse_field_mapping[timestamp_field] + if timestamp_field in reverse_field_mapping.keys() + else timestamp_field ) created_timestamp_column = ( reverse_field_mapping[created_timestamp_column] @@ -294,13 +294,13 @@ def _get_column_names( name for name in feature_names if name not in join_keys - and name != event_timestamp_column + and name != timestamp_field and name != created_timestamp_column ] return ( join_keys, feature_names, - event_timestamp_column, + timestamp_field, created_timestamp_column, ) diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py index e2a700d067..ba36f8e89b 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -34,7 +34,7 @@ def create_data_source( event_timestamp_column: (Deprecated) Pass through for the underlying data source. created_timestamp_column: Pass through for the underlying data source. field_mapping: Pass through for the underlying data source. - timestamp_field: (Deprecated) Pass through for the underlying data source. + timestamp_field: Pass through for the underlying data source. 
Returns: diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/trino.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/trino.py index 07ae210b12..ddcfafd31a 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/trino.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/trino.py @@ -92,7 +92,7 @@ def create_data_source( return TrinoSource( name="ci_trino_offline_store", table=destination_name, - event_timestamp_column=timestamp_field, + timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, query=f"SELECT * FROM {destination_name}", field_mapping=field_mapping or {"ts_1": "ts"}, diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 774c3f9a42..f4440dbfbc 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -114,9 +114,7 @@ def test_write_to_online_store_event_check(local_redis_environment): "ts_1": [hour_ago, now, now], } dataframe_source = pd.DataFrame(data) - with prep_file_source( - df=dataframe_source, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: e = Entity(name="id", value_type=ValueType.STRING) # Create Feature View diff --git a/sdk/python/tests/integration/registration/test_feature_store.py b/sdk/python/tests/integration/registration/test_feature_store.py index ca61734c78..db4c6700ce 100644 --- a/sdk/python/tests/integration/registration/test_feature_store.py +++ b/sdk/python/tests/integration/registration/test_feature_store.py @@ -216,9 +216,7 @@ def test_apply_feature_view_success(test_feature_store): ) @pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")]) def test_feature_view_inference_success(test_feature_store, dataframe_source): - with prep_file_source( - df=dataframe_source, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: entity = Entity( name="id", join_keys=["id_join_key"], value_type=ValueType.INT64 ) @@ -434,9 +432,7 @@ def test_apply_remote_repo(): ) @pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")]) def test_reapply_feature_view_success(test_feature_store, dataframe_source): - with prep_file_source( - df=dataframe_source, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: e = Entity(name="id", join_keys=["id_join_key"], value_type=ValueType.STRING) diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index c6819ac3c0..8b719eb733 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -42,9 +42,9 @@ def test_update_entities_with_inferred_types_from_feature_views( simple_dataset_1, simple_dataset_2 ): with prep_file_source( - df=simple_dataset_1, event_timestamp_column="ts_1" + df=simple_dataset_1, timestamp_field="ts_1" ) as file_source, prep_file_source( - df=simple_dataset_2, event_timestamp_column="ts_1" + df=simple_dataset_2, timestamp_field="ts_1" ) as file_source_2: fv1 = FeatureView( diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py 
b/sdk/python/tests/unit/diff/test_registry_diff.py index 0322ab47ab..483dae73e2 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -7,9 +7,7 @@ def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): - with prep_file_source( - df=simple_dataset_1, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: to_delete = FeatureView( name="to_delete", entities=["id"], batch_source=file_source, ttl=None, ) @@ -53,9 +51,7 @@ def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): def test_diff_registry_objects_feature_views(simple_dataset_1): - with prep_file_source( - df=simple_dataset_1, event_timestamp_column="ts_1" - ) as file_source: + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: pre_changed = FeatureView( name="fv2", entities=["id"], diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 6bd4baf4fa..7f288d36db 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -3,6 +3,7 @@ from feast import ValueType from feast.data_format import ProtoFormat from feast.data_source import ( + DataSource, KafkaSource, KinesisSource, PushSource, @@ -11,6 +12,9 @@ ) from feast.field import Field from feast.infra.offline_stores.bigquery_source import BigQuerySource +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.offline_stores.redshift_source import RedshiftSource +from feast.infra.offline_stores.snowflake_source import SnowflakeSource from feast.types import Bool, Float32, Int64 @@ -140,8 +144,112 @@ def test_default_data_source_kw_arg_warning(): # No name warning for DataSource with pytest.warns(UserWarning): source = KafkaSource( - event_timestamp_column="column", + timestamp_field="column", bootstrap_servers="bootstrap_servers", message_format=ProtoFormat("class_path"), topic="topic", ) + + +def test_proto_conversion(): + bigquery_source = BigQuerySource( + name="test_source", + table="test_table", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + file_source = FileSource( + name="test_source", + path="test_path", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + redshift_source = RedshiftSource( + name="test_source", + database="test_database", + schema="test_schema", + table="test_table", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + snowflake_source = SnowflakeSource( + name="test_source", + database="test_database", + warehouse="test_warehouse", + schema="test_schema", + table="test_table", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + kafka_source = KafkaSource( + name="test_source", + bootstrap_servers="test_servers", + message_format=ProtoFormat("class_path"), + topic="test_topic", + timestamp_field="event_timestamp", + 
created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + batch_source=file_source, + ) + + kinesis_source = KinesisSource( + name="test_source", + region="test_region", + record_format=ProtoFormat("class_path"), + stream_name="test_stream", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + batch_source=file_source, + ) + + push_source = PushSource( + name="test_source", + batch_source=file_source, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + request_source = RequestSource( + name="test_source", + schema=[Field(name="test1", dtype=Float32), Field(name="test1", dtype=Int64)], + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + ) + + assert DataSource.from_proto(bigquery_source.to_proto()) == bigquery_source + assert DataSource.from_proto(file_source.to_proto()) == file_source + assert DataSource.from_proto(redshift_source.to_proto()) == redshift_source + assert DataSource.from_proto(snowflake_source.to_proto()) == snowflake_source + assert DataSource.from_proto(kafka_source.to_proto()) == kafka_source + assert DataSource.from_proto(kinesis_source.to_proto()) == kinesis_source + assert DataSource.from_proto(push_source.to_proto()) == push_source + assert DataSource.from_proto(request_source.to_proto()) == request_source diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index d78788f3ae..8708a983c4 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -25,7 +25,7 @@ def test_create_batch_feature_view(): stream_source = KafkaSource( name="kafka", - event_timestamp_column="", + timestamp_field="", bootstrap_servers="", message_format=AvroFormat(""), topic="topic", @@ -43,7 +43,7 @@ def test_create_batch_feature_view(): def test_create_stream_feature_view(): stream_source = KafkaSource( name="kafka", - event_timestamp_column="", + timestamp_field="", bootstrap_servers="", message_format=AvroFormat(""), topic="topic", diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py index 5bb5a622d6..d5f45964ca 100644 --- a/sdk/python/tests/utils/data_source_utils.py +++ b/sdk/python/tests/utils/data_source_utils.py @@ -11,19 +11,17 @@ @contextlib.contextmanager -def prep_file_source(df, event_timestamp_column=None) -> Iterator[FileSource]: +def prep_file_source(df, timestamp_field=None) -> Iterator[FileSource]: with tempfile.NamedTemporaryFile(suffix=".parquet") as f: f.close() df.to_parquet(f.name) file_source = FileSource( - file_format=ParquetFormat(), - path=f.name, - timestamp_field=event_timestamp_column, + file_format=ParquetFormat(), path=f.name, timestamp_field=timestamp_field, ) yield file_source -def simple_bq_source_using_table_arg(df, event_timestamp_column=None) -> BigQuerySource: +def simple_bq_source_using_table_arg(df, timestamp_field=None) -> BigQuerySource: client = bigquery.Client() gcp_project = client.project bigquery_dataset = f"ds_{time.time_ns()}" @@ -40,13 +38,13 @@ def simple_bq_source_using_table_arg(df, event_timestamp_column=None) -> BigQuer job = client.load_table_from_dataframe(df, table) job.result() - return BigQuerySource(table=table, timestamp_field=event_timestamp_column,) + 
return BigQuerySource(table=table, timestamp_field=timestamp_field,) -def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuerySource: - bq_source_using_table = simple_bq_source_using_table_arg(df, event_timestamp_column) +def simple_bq_source_using_query_arg(df, timestamp_field=None) -> BigQuerySource: + bq_source_using_table = simple_bq_source_using_table_arg(df, timestamp_field) return BigQuerySource( name=bq_source_using_table.table, query=f"SELECT * FROM {bq_source_using_table.table}", - timestamp_field=event_timestamp_column, + timestamp_field=timestamp_field, )
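
Note on the SnowflakeOptions change above: with the property/setter boilerplate removed, the options fields are plain attributes and None values are normalized to empty strings. A minimal sketch with hypothetical values, not part of the patch:

    from feast.infra.offline_stores.snowflake_source import SnowflakeOptions

    # Options are now simple attributes; None inputs are stored as "".
    options = SnowflakeOptions(
        database="TEST_DB", schema=None, table="TEST_TABLE", query=None, warehouse="TEST_WH",
    )
    assert options.schema == ""                   # None is normalized to an empty string
    options.query = "SELECT * FROM TEST_TABLE"    # attributes remain directly writable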
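
The reverse field-mapping step in _get_column_names can be reduced to a few lines. The sketch below uses illustrative values (mirroring the "ts_1" -> "ts" mapping in the Trino test fixture) and is not part of the patch; it shows why the offline store is queried with the raw column name rather than the mapped one.

    # field_mapping maps a raw source column to the name used on the feature view.
    field_mapping = {"ts_1": "ts"}
    reverse_field_mapping = {v: k for k, v in field_mapping.items()}

    # The timestamp field as seen on the feature view...
    timestamp_field = "ts"
    # ...is translated back to the raw column name before the offline store is queried.
    timestamp_field = reverse_field_mapping.get(timestamp_field, timestamp_field)
    assert timestamp_field == "ts_1"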
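
For reference, the round-trip property that test_proto_conversion exercises, reduced to a single source. This is a sketch, not part of the patch, and all field values are illustrative: serializing a data source to its protobuf form and parsing it back should yield an equal object.

    from feast.data_source import DataSource
    from feast.infra.offline_stores.snowflake_source import SnowflakeSource

    # Build a source, round-trip it through its proto representation, and compare.
    source = SnowflakeSource(
        name="test_source",
        database="test_database",
        schema="test_schema",
        table="test_table",
        warehouse="test_warehouse",
        timestamp_field="event_timestamp",
    )
    assert DataSource.from_proto(source.to_proto()) == source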