Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix created timestamp related errors for BigQuery source #1474

Merged
merged 2 commits into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def pull_latest_from_table_or_query(
"PARTITION BY " + partition_by_join_key_string
)
timestamps = [event_timestamp_column]
if created_timestamp_column is not None:
if created_timestamp_column:
timestamps.append(created_timestamp_column)
timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)
Expand Down Expand Up @@ -129,7 +129,7 @@ class FeatureViewQueryContext:
features: List[str] # feature reference format
table_ref: str
event_timestamp_column: str
created_timestamp_column: str
created_timestamp_column: Optional[str]
query: str
table_subquery: str
entity_selections: List[str]
Expand Down Expand Up @@ -270,7 +270,7 @@ def build_point_in_time_query(
-- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
NULL as {{ featureview.name }}_feature_timestamp,
-- created timestamp of the feature at the corresponding feature_timestamp
NULL as created_timestamp,
{{ 'NULL as created_timestamp,' if featureview.created_timestamp_column else '' }}
-- select only entities belonging to this feature set
{{ featureview.entities | join(', ')}},
-- boolean for filtering the dataset later
Expand All @@ -281,7 +281,7 @@ def build_point_in_time_query(
NULL as row_number,
{{ featureview.event_timestamp_column }} as event_timestamp,
{{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
{{ featureview.created_timestamp_column }} as created_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}},
false AS is_entity_table
FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
Expand Down Expand Up @@ -309,11 +309,11 @@ def build_point_in_time_query(
row_number,
event_timestamp,
{{ featureview.entities | join(', ')}},
FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,
{{ 'FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,' if featureview.created_timestamp_column else '' }}
FIRST_VALUE({{ featureview.name }}_feature_timestamp IGNORE NULLS) over w AS {{ featureview.name }}_feature_timestamp,
is_entity_table
FROM {{ featureview.name }}__union_features
WINDOW w AS (PARTITION BY {{ featureview.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC, created_timestamp DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
WINDOW w AS (PARTITION BY {{ featureview.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC{{', created_timestamp DESC' if featureview.created_timestamp_column else ''}} ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
)
/*
3. Select only the rows from the entity table, and join the features from the original feature set table
Expand All @@ -322,14 +322,14 @@ def build_point_in_time_query(
LEFT JOIN (
SELECT
{{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
{{ featureview.created_timestamp_column }} as created_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}},
{% for feature in featureview.features %}
{{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
) USING ({{ featureview.name }}_feature_timestamp, created_timestamp, {{ featureview.entities | join(', ')}})
) USING ({{ featureview.name }}_feature_timestamp,{{ ' created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entities | join(', ')}})
WHERE is_entity_table
),
/*
Expand Down
14 changes: 11 additions & 3 deletions sdk/python/tests/test_historical_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ def stage_driver_hourly_stats_parquet_source(directory, df):
# Write to disk
driver_stats_path = os.path.join(directory, "driver_stats.parquet")
df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)
return FileSource(path=driver_stats_path, event_timestamp_column="datetime")
return FileSource(
path=driver_stats_path,
event_timestamp_column="datetime",
created_timestamp_column="",
)


def stage_driver_hourly_stats_bigquery_source(df, table_id):
Expand Down Expand Up @@ -72,7 +76,11 @@ def create_driver_hourly_stats_feature_view(source):
def stage_customer_daily_profile_parquet_source(directory, df):
customer_profile_path = os.path.join(directory, "customer_profile.parquet")
df.to_parquet(path=customer_profile_path, allow_truncated_timestamps=True)
return FileSource(path=customer_profile_path, event_timestamp_column="datetime")
return FileSource(
path=customer_profile_path,
event_timestamp_column="datetime",
created_timestamp_column="created",
)


def stage_customer_daily_profile_bigquery_source(df, table_id):
Expand Down Expand Up @@ -325,7 +333,7 @@ def test_historical_features_from_bigquery_sources(provider_type):
customer_source = BigQuerySource(
table_ref=customer_table_id,
event_timestamp_column="datetime",
created_timestamp_column="created",
created_timestamp_column="",
)
customer_fv = create_customer_daily_profile_feature_view(customer_source)

Expand Down