Add final_output_feature_names in Query context to avoid SELECT * EXCEPT #1911

Merged
10 changes: 7 additions & 3 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -164,6 +164,7 @@ def query_generator() -> Iterator[str]:
query_context,
left_table_query_string=table_reference,
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
entity_df_columns=entity_schema.keys(),
query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN,
full_feature_names=full_feature_names,
)
@@ -517,14 +518,17 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
Thus we only need to compute the latest timestamp of each feature.
*/
{{ featureview.name }}__latest AS (
SELECT * EXCEPT(row_number)
SELECT
event_timestamp,
{% if featureview.created_timestamp_column %}created_timestamp,{% endif %}
{{featureview.name}}__entity_row_unique_id
FROM
(
SELECT *,
ROW_NUMBER() OVER(
PARTITION BY {{featureview.name}}__entity_row_unique_id
ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %},created_timestamp DESC{% endif %}
) AS row_number,
) AS row_number
FROM {{ featureview.name }}__base
{% if featureview.created_timestamp_column %}
INNER JOIN {{ featureview.name }}__dedup
@@ -558,7 +562,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
The entity_dataframe dataset being our source of truth here.
*/

SELECT * EXCEPT(entity_timestamp, {% for featureview in featureviews %} {{featureview.name}}__entity_row_unique_id{% if loop.last %}{% else %},{% endif %}{% endfor %})
SELECT {{ final_output_feature_names | join(', ')}}
FROM entity_dataframe
{% for featureview in featureviews %}
LEFT JOIN (
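For intuition, the `{{ featureview.name }}__latest` CTE now lists its columns explicitly but keeps the same dedup semantics: for each `<featureview>__entity_row_unique_id` it retains only the row with the greatest `event_timestamp`, breaking ties on `created_timestamp` when the feature view has one. A minimal pandas sketch of that logic, using a hypothetical `driver_stats` feature view (the names are illustrative, not from this PR):

```python
import pandas as pd

# Hypothetical stand-in for {{ featureview.name }}__base, here "driver_stats__base":
# one candidate feature row per entity row and event.
base = pd.DataFrame(
    {
        "driver_stats__entity_row_unique_id": ["a", "a", "b"],
        "event_timestamp": pd.to_datetime(
            ["2021-08-01 10:00", "2021-08-01 12:00", "2021-08-01 09:00"]
        ),
        "created_timestamp": pd.to_datetime(
            ["2021-08-01 10:05", "2021-08-01 12:05", "2021-08-01 09:05"]
        ),
    }
)

# Equivalent of ROW_NUMBER() OVER (PARTITION BY driver_stats__entity_row_unique_id
# ORDER BY event_timestamp DESC, created_timestamp DESC) ... WHERE row_number = 1:
# keep only the newest row per entity row.
latest = (
    base.sort_values(["event_timestamp", "created_timestamp"], ascending=False)
    .groupby("driver_stats__entity_row_unique_id")
    .head(1)
)
print(latest)
```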
13 changes: 12 additions & 1 deletion sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -2,7 +2,7 @@
import uuid
from dataclasses import asdict, dataclass
from datetime import timedelta
from typing import Any, Dict, List, Optional, Set, Tuple
from typing import Any, Dict, KeysView, List, Optional, Set, Tuple

import numpy as np
import pandas as pd
@@ -153,12 +153,22 @@ def build_point_in_time_query(
feature_view_query_contexts: List[FeatureViewQueryContext],
left_table_query_string: str,
entity_df_event_timestamp_col: str,
entity_df_columns: KeysView[str],
query_template: str,
full_feature_names: bool = False,
) -> str:
"""Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift"""
template = Environment(loader=BaseLoader()).from_string(source=query_template)

final_output_feature_names = list(entity_df_columns)
final_output_feature_names.extend(
[
(f"{fv.name}__{feature}" if full_feature_names else feature)
for fv in feature_view_query_contexts
for feature in fv.features
]
)

# Add additional fields to dict
template_context = {
"left_table_query_string": left_table_query_string,
@@ -168,6 +178,7 @@
),
"featureviews": [asdict(context) for context in feature_view_query_contexts],
"full_feature_names": full_feature_names,
"final_output_feature_names": final_output_feature_names,
}

query = template.render(template_context)
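For context, a minimal sketch of what the new plumbing produces, assuming a toy entity dataframe and a single feature view; the names below are illustrative, not taken from this PR:

```python
from jinja2 import BaseLoader, Environment

# Hypothetical inputs: entity dataframe columns plus one feature view with two features.
entity_df_columns = ["driver_id", "event_timestamp", "label"]
feature_view_name = "driver_stats"
features = ["conv_rate", "acc_rate"]
full_feature_names = True

# Mirrors build_point_in_time_query: entity dataframe columns first, then the feature
# columns, optionally prefixed with "<feature view name>__".
final_output_feature_names = list(entity_df_columns)
final_output_feature_names.extend(
    f"{feature_view_name}__{feature}" if full_feature_names else feature
    for feature in features
)

# The templates now render an explicit column list instead of SELECT * EXCEPT(...).
template = Environment(loader=BaseLoader()).from_string(
    "SELECT {{ final_output_feature_names | join(', ') }} FROM entity_dataframe"
)
print(template.render(final_output_feature_names=final_output_feature_names))
# SELECT driver_id, event_timestamp, label, driver_stats__conv_rate,
# driver_stats__acc_rate FROM entity_dataframe
```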
44 changes: 17 additions & 27 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -149,6 +149,7 @@ def query_generator() -> Iterator[str]:
query_context,
left_table_query_string=table_name,
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
entity_df_columns=entity_schema.keys(),
query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN,
full_feature_names=full_feature_names,
)
@@ -174,11 +175,6 @@ def query_generator() -> Iterator[str]:
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
feature_refs, project, registry
),
drop_columns=["entity_timestamp"]
+ [
f"{feature_view.projection.name_to_use()}__entity_row_unique_id"
for feature_view in feature_views
],
)


@@ -191,7 +187,6 @@ def __init__(
config: RepoConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]],
drop_columns: Optional[List[str]] = None,
):
"""Initialize RedshiftRetrievalJob object.

@@ -202,8 +197,6 @@ def __init__(
config: Feast repo config
full_feature_names: Whether to add the feature view prefixes to the feature names
on_demand_feature_views: A list of on demand transforms to apply at retrieval time
drop_columns: Optionally a list of columns to drop before unloading to S3.
This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift.
Collaborator Author

I'll be honest with you: this concept of dropping columns is bad. If some SQL query engines do not support SELECT * EXCEPT, then let's not support it!

It's very weird to add this implementation outside of the SQL template. It makes things very different between BigQuery and the other offline query engines.

Thoughts about keeping it?

Collaborator Author

I could be missing something, so feel free to change my mind 🙂

Collaborator

+1 to this change. Pretty sure it's just BQ that supports EXCEPT in this way.

Collaborator

It also makes the query output clearer to understand.

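To make the portability point concrete: BigQuery can exclude columns directly with SELECT * EXCEPT(...), while Redshift has no such form, which is what the removed drop_columns / ALTER TABLE ... DROP COLUMN workaround was compensating for. A minimal sketch of the engine-neutral alternative this PR adopts, computing the projection in Python instead (column names are illustrative):

```python
from typing import Iterable, List


def columns_except(all_columns: Iterable[str], excluded: Iterable[str]) -> List[str]:
    """Engine-neutral stand-in for BigQuery's SELECT * EXCEPT(...): keep order, drop names."""
    excluded_set = set(excluded)
    return [column for column in all_columns if column not in excluded_set]


# BigQuery-only form the templates used to rely on:
#   SELECT * EXCEPT(entity_timestamp, driver_stats__entity_row_unique_id) FROM joined
# Portable form: enumerate what to keep and interpolate it into the query text.
keep = columns_except(
    all_columns=[
        "driver_id",
        "event_timestamp",
        "entity_timestamp",
        "driver_stats__entity_row_unique_id",
        "driver_stats__conv_rate",
    ],
    excluded=["entity_timestamp", "driver_stats__entity_row_unique_id"],
)
query = f"SELECT {', '.join(keep)} FROM joined"
# SELECT driver_id, event_timestamp, driver_stats__conv_rate FROM joined
```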
"""
if not isinstance(query, str):
self._query_generator = query
@@ -225,7 +218,6 @@ def query_generator() -> Iterator[str]:
)
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views
self._drop_columns = drop_columns

@property
def full_feature_names(self) -> bool:
@@ -246,7 +238,6 @@ def _to_df_internal(self) -> pd.DataFrame:
self._s3_path,
self._config.offline_store.iam_role,
query,
self._drop_columns,
)

def _to_arrow_internal(self) -> pa.Table:
@@ -260,7 +251,6 @@ def _to_arrow_internal(self) -> pa.Table:
self._s3_path,
self._config.offline_store.iam_role,
query,
self._drop_columns,
)

def to_s3(self) -> str:
@@ -279,7 +269,6 @@ def to_s3(self) -> str:
self._s3_path,
self._config.offline_store.iam_role,
query,
self._drop_columns,
)
return self._s3_path

@@ -302,9 +291,6 @@ def to_redshift(self, table_name: str) -> None:

with self._query_generator() as query:
query = f'CREATE TABLE "{table_name}" AS ({query});\n'
if self._drop_columns is not None:
for column in self._drop_columns:
query += f"ALTER TABLE {table_name} DROP COLUMN {column};\n"

aws_utils.execute_redshift_statement(
self._redshift_client,
@@ -479,19 +465,23 @@ def _upload_entity_df_and_get_entity_schema(
*/
{{ featureview.name }}__latest AS (
SELECT
{{featureview.name}}__entity_row_unique_id,
MAX(event_timestamp) AS event_timestamp
event_timestamp,
{% if featureview.created_timestamp_column %}created_timestamp,{% endif %}
{{featureview.name}}__entity_row_unique_id
FROM
(
SELECT *,
ROW_NUMBER() OVER(
PARTITION BY {{featureview.name}}__entity_row_unique_id
ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %},created_timestamp DESC{% endif %}
) AS row_number
FROM {{ featureview.name }}__base
{% if featureview.created_timestamp_column %}
,MAX(created_timestamp) AS created_timestamp
INNER JOIN {{ featureview.name }}__dedup
USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
{% endif %}

FROM {{ featureview.name }}__base
{% if featureview.created_timestamp_column %}
INNER JOIN {{ featureview.name }}__dedup
USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
{% endif %}

GROUP BY {{featureview.name}}__entity_row_unique_id
)
WHERE row_number = 1
),

/*
@@ -518,7 +508,7 @@ def _upload_entity_df_and_get_entity_schema(
The entity_dataframe dataset being our source of truth here.
*/

SELECT *
SELECT {{ final_output_feature_names | join(', ')}}
FROM entity_dataframe
{% for featureview in featureviews %}
LEFT JOIN (
20 changes: 2 additions & 18 deletions sdk/python/feast/infra/utils/aws_utils.py
@@ -2,7 +2,7 @@
import os
import tempfile
import uuid
from typing import Dict, Iterator, List, Optional, Tuple
from typing import Dict, Iterator, Optional, Tuple

import pandas as pd
import pyarrow as pa
@@ -324,7 +324,6 @@ def execute_redshift_query_and_unload_to_s3(
s3_path: str,
iam_role: str,
query: str,
drop_columns: Optional[List[str]] = None,
) -> None:
"""Unload Redshift Query results to S3

@@ -337,16 +336,11 @@
iam_role: IAM Role for Redshift to assume during the UNLOAD command.
The role must grant permission to write to the S3 location.
query: The SQL query to execute
drop_columns: Optionally a list of columns to drop before unloading to S3.
This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift.

"""
# Run the query, unload the results to S3
unique_table_name = "_" + str(uuid.uuid4()).replace("-", "")
query = f"CREATE TEMPORARY TABLE {unique_table_name} AS ({query});\n"
if drop_columns is not None:
for column in drop_columns:
query += f"ALTER TABLE {unique_table_name} DROP COLUMN {column};\n"
query += f"UNLOAD ('SELECT * FROM {unique_table_name}') TO '{s3_path}/' IAM_ROLE '{iam_role}' PARQUET"
execute_redshift_statement(redshift_data_client, cluster_id, database, user, query)

@@ -360,20 +354,12 @@ def unload_redshift_query_to_pa(
s3_path: str,
iam_role: str,
query: str,
drop_columns: Optional[List[str]] = None,
) -> pa.Table:
""" Unload Redshift Query results to S3 and get the results in PyArrow Table format """
bucket, key = get_bucket_and_key(s3_path)

execute_redshift_query_and_unload_to_s3(
redshift_data_client,
cluster_id,
database,
user,
s3_path,
iam_role,
query,
drop_columns,
redshift_data_client, cluster_id, database, user, s3_path, iam_role, query,
)

with tempfile.TemporaryDirectory() as temp_dir:
@@ -391,7 +377,6 @@ def unload_redshift_query_to_df(
s3_path: str,
iam_role: str,
query: str,
drop_columns: Optional[List[str]] = None,
) -> pd.DataFrame:
""" Unload Redshift Query results to S3 and get the results in Pandas DataFrame format """
table = unload_redshift_query_to_pa(
@@ -403,7 +388,6 @@
s3_path,
iam_role,
query,
drop_columns,
)
return table.to_pandas()

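With drop_columns gone, the query handed to execute_redshift_query_and_unload_to_s3 already projects exactly the final output columns, so the staging table no longer needs any ALTER TABLE ... DROP COLUMN fix-ups before unloading. A rough sketch of the SQL the helper assembles (the real function then runs it through the Redshift Data API and callers read the Parquet files back from S3); the bucket and IAM role below are placeholders:

```python
import uuid


def build_unload_script(query: str, s3_path: str, iam_role: str) -> str:
    """Stage the query results in a temporary table, then UNLOAD them to S3 as Parquet.

    SELECT * in the UNLOAD is safe here because the inner query already enumerates
    exactly the columns that should appear in the output.
    """
    unique_table_name = "_" + str(uuid.uuid4()).replace("-", "")
    script = f"CREATE TEMPORARY TABLE {unique_table_name} AS ({query});\n"
    script += (
        f"UNLOAD ('SELECT * FROM {unique_table_name}') "
        f"TO '{s3_path}/' IAM_ROLE '{iam_role}' PARQUET"
    )
    return script


print(
    build_unload_script(
        query="SELECT driver_id, event_timestamp, driver_stats__conv_rate FROM joined",
        s3_path="s3://my-bucket/staging",  # placeholder bucket
        iam_role="arn:aws:iam::123456789012:role/redshift-unload",  # placeholder role
    )
)
```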