Skip to content

Commit

Permalink
Avoid skewed join between entity_df and feature views (#1712)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Delacour <[email protected]>
  • Loading branch information
MattDelac authored Jul 19, 2021
1 parent a7f88c5 commit 8cfe914
Showing 1 changed file with 33 additions and 22 deletions.
55 changes: 33 additions & 22 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,19 +557,30 @@ def _get_bigquery_client(project: Optional[str] = None):
all the logic as the field to GROUP BY the data
*/
WITH entity_dataframe AS (
SELECT
*,
CONCAT(
{% for entity_key in unique_entity_keys %}
CAST({{entity_key}} AS STRING),
{% endfor %}
CAST({{entity_df_event_timestamp_col}} AS STRING)
) AS entity_row_unique_id
SELECT *,
{{entity_df_event_timestamp_col}} AS entity_timestamp,
{% for featureview in featureviews %}
CONCAT(
{% for entity in featureview.entities %}
CAST({{entity}} AS STRING),
{% endfor %}
CAST({{entity_df_event_timestamp_col}} AS STRING)
) AS {{featureview.name}}__entity_row_unique_id,
{% endfor %}
FROM {{ left_table_query_string }}
),
{% for featureview in featureviews %}
/*
 Per-feature-view slice of entity_dataframe.
 Keeps only the entity key columns of this feature view, the entity_timestamp,
 and this feature view's entity_row_unique_id, and collapses them with GROUP BY
 so that each distinct combination appears exactly once.
 The __base CTE of this feature view joins against this reduced table (aliased
 as entity_dataframe there) rather than against the full entity_dataframe.
*/
{{ featureview.name }}__entity_dataframe AS (
SELECT
{{ featureview.entities | join(', ')}},
entity_timestamp,
{{featureview.name}}__entity_row_unique_id
FROM entity_dataframe
GROUP BY {{ featureview.entities | join(', ')}}, entity_timestamp, {{featureview.name}}__entity_row_unique_id
),
/*
This query template performs the point-in-time correctness join for a single feature set table
to the provided entity table.
Expand Down Expand Up @@ -605,15 +616,15 @@ def _get_bigquery_client(project: Optional[str] = None):
{{ featureview.name }}__base AS (
SELECT
subquery.*,
entity_dataframe.{{entity_df_event_timestamp_col}} AS entity_timestamp,
entity_dataframe.entity_row_unique_id
entity_dataframe.entity_timestamp,
entity_dataframe.{{featureview.name}}__entity_row_unique_id
FROM {{ featureview.name }}__subquery AS subquery
INNER JOIN entity_dataframe
INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe
ON TRUE
AND subquery.event_timestamp <= entity_dataframe.{{entity_df_event_timestamp_col}}
AND subquery.event_timestamp <= entity_dataframe.entity_timestamp
{% if featureview.ttl == 0 %}{% else %}
AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.{{entity_df_event_timestamp_col}}, interval {{ featureview.ttl }} second)
AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.entity_timestamp, interval {{ featureview.ttl }} second)
{% endif %}
{% for entity in featureview.entities %}
Expand All @@ -630,11 +641,11 @@ def _get_bigquery_client(project: Optional[str] = None):
{% if featureview.created_timestamp_column %}
{{ featureview.name }}__dedup AS (
SELECT
entity_row_unique_id,
{{featureview.name}}__entity_row_unique_id,
event_timestamp,
MAX(created_timestamp) as created_timestamp,
FROM {{ featureview.name }}__base
GROUP BY entity_row_unique_id, event_timestamp
GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp
),
{% endif %}
Expand All @@ -644,7 +655,7 @@ def _get_bigquery_client(project: Optional[str] = None):
*/
{{ featureview.name }}__latest AS (
SELECT
entity_row_unique_id,
{{featureview.name}}__entity_row_unique_id,
MAX(event_timestamp) AS event_timestamp
{% if featureview.created_timestamp_column %}
,ANY_VALUE(created_timestamp) AS created_timestamp
Expand All @@ -653,10 +664,10 @@ def _get_bigquery_client(project: Optional[str] = None):
FROM {{ featureview.name }}__base
{% if featureview.created_timestamp_column %}
INNER JOIN {{ featureview.name }}__dedup
USING (entity_row_unique_id, event_timestamp, created_timestamp)
USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
{% endif %}
GROUP BY entity_row_unique_id
GROUP BY {{featureview.name}}__entity_row_unique_id
),
/*
Expand All @@ -668,7 +679,7 @@ def _get_bigquery_client(project: Optional[str] = None):
FROM {{ featureview.name }}__base as base
INNER JOIN {{ featureview.name }}__latest
USING(
entity_row_unique_id,
{{featureview.name}}__entity_row_unique_id,
event_timestamp
{% if featureview.created_timestamp_column %}
,created_timestamp
Expand All @@ -683,17 +694,17 @@ def _get_bigquery_client(project: Optional[str] = None):
The entity_dataframe dataset is our source of truth here.
*/
SELECT * EXCEPT (entity_row_unique_id)
SELECT * EXCEPT(entity_timestamp, {% for featureview in featureviews %} {{featureview.name}}__entity_row_unique_id{% if loop.last %}{% else %},{% endif %}{% endfor %})
FROM entity_dataframe
{% for featureview in featureviews %}
LEFT JOIN (
SELECT
entity_row_unique_id,
{{featureview.name}}__entity_row_unique_id,
{% for feature in featureview.features %}
{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %},
{% endfor %}
FROM {{ featureview.name }}__cleaned
) USING (entity_row_unique_id)
) USING ({{featureview.name}}__entity_row_unique_id)
{% endfor %}
"""

Expand Down

0 comments on commit 8cfe914

Please sign in to comment.