From 45e58d15ecfaacd937c19da3a33be0ebe22dbbc0 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Tue, 28 Sep 2021 12:57:16 +0200 Subject: [PATCH 1/8] Add final_output_feature_names in Query context to avoid SELECT * EXCEPT at the end Signed-off-by: Matt Delacour --- .../feast/infra/offline_stores/bigquery.py | 10 +++++-- .../infra/offline_stores/offline_utils.py | 13 ++++++++- .../feast/infra/offline_stores/redshift.py | 29 +++++++++++-------- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 6753223f2e..9e4d3efc2e 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -164,6 +164,7 @@ def query_generator() -> Iterator[str]: query_context, left_table_query_string=table_reference, entity_df_event_timestamp_col=entity_df_event_timestamp_col, + entity_df_columns=entity_schema.keys(), query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, full_feature_names=full_feature_names, ) @@ -517,14 +518,17 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] Thus we only need to compute the latest timestamp of each feature. */ {{ featureview.name }}__latest AS ( - SELECT * EXCEPT(row_number) + SELECT + event_timestamp, + {% if featureview.created_timestamp_column %}created_timestamp,{% endif %} + {{featureview.name}}__entity_row_unique_id FROM ( SELECT *, ROW_NUMBER() OVER( PARTITION BY {{featureview.name}}__entity_row_unique_id ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %},created_timestamp DESC{% endif %} - ) AS row_number, + ) AS row_number FROM {{ featureview.name }}__base {% if featureview.created_timestamp_column %} INNER JOIN {{ featureview.name }}__dedup @@ -558,7 +562,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] The entity_dataframe dataset being our source of truth here. 
*/ -SELECT * EXCEPT(entity_timestamp, {% for featureview in featureviews %} {{featureview.name}}__entity_row_unique_id{% if loop.last %}{% else %},{% endif %}{% endfor %}) +SELECT {{ final_output_feature_names | join(', ')}} FROM entity_dataframe {% for featureview in featureviews %} LEFT JOIN ( diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 1e23ca54fb..10c0de2f9b 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -2,7 +2,7 @@ import uuid from dataclasses import asdict, dataclass from datetime import timedelta -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, KeysView, List, Optional, Set, Tuple import numpy as np import pandas as pd @@ -153,12 +153,22 @@ def build_point_in_time_query( feature_view_query_contexts: List[FeatureViewQueryContext], left_table_query_string: str, entity_df_event_timestamp_col: str, + entity_df_columns: KeysView[str], query_template: str, full_feature_names: bool = False, ) -> str: """Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift""" template = Environment(loader=BaseLoader()).from_string(source=query_template) + final_output_feature_names = list(entity_df_columns) + final_output_feature_names.extend( + [ + (f"{fv.name}__{feature}" if full_feature_names else feature) + for fv in feature_view_query_contexts + for feature in fv.features + ] + ) + # Add additional fields to dict template_context = { "left_table_query_string": left_table_query_string, @@ -168,6 +178,7 @@ def build_point_in_time_query( ), "featureviews": [asdict(context) for context in feature_view_query_contexts], "full_feature_names": full_feature_names, + "final_output_feature_names": final_output_feature_names, } query = template.render(template_context) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index c3cd461ce1..fa5061ad22 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -149,6 +149,7 @@ def query_generator() -> Iterator[str]: query_context, left_table_query_string=table_name, entity_df_event_timestamp_col=entity_df_event_timestamp_col, + entity_df_columns=entity_schema.keys(), query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, full_feature_names=full_feature_names, ) @@ -479,19 +480,23 @@ def _upload_entity_df_and_get_entity_schema( */ {{ featureview.name }}__latest AS ( SELECT - {{featureview.name}}__entity_row_unique_id, - MAX(event_timestamp) AS event_timestamp + event_timestamp, + {% if featureview.created_timestamp_column %}created_timestamp,{% endif %} + {{featureview.name}}__entity_row_unique_id + FROM + ( + SELECT *, + ROW_NUMBER() OVER( + PARTITION BY {{featureview.name}}__entity_row_unique_id + ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %},created_timestamp DESC{% endif %} + ) AS row_number + FROM {{ featureview.name }}__base {% if featureview.created_timestamp_column %} - ,MAX(created_timestamp) AS created_timestamp + INNER JOIN {{ featureview.name }}__dedup + USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp) {% endif %} - - FROM {{ featureview.name }}__base - {% if featureview.created_timestamp_column %} - INNER JOIN {{ featureview.name }}__dedup - USING ({{featureview.name}}__entity_row_unique_id, 
event_timestamp, created_timestamp) - {% endif %} - - GROUP BY {{featureview.name}}__entity_row_unique_id + ) + WHERE row_number = 1 ), /* @@ -518,7 +523,7 @@ def _upload_entity_df_and_get_entity_schema( The entity_dataframe dataset being our source of truth here. */ -SELECT * +SELECT {{ final_output_feature_names | join(', ')}} FROM entity_dataframe {% for featureview in featureviews %} LEFT JOIN ( From 4b69ef8600e77273310c63c4f961ecb52379b760 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Thu, 30 Sep 2021 18:48:42 +0200 Subject: [PATCH 2/8] Remove the drop_columns concept for AWS Redshift Signed-off-by: Matt Delacour --- .../feast/infra/offline_stores/redshift.py | 15 ---------- sdk/python/feast/infra/utils/aws_utils.py | 29 ++++++++++--------- 2 files changed, 15 insertions(+), 29 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index fa5061ad22..8c14a7d237 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -175,11 +175,6 @@ def query_generator() -> Iterator[str]: on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs( feature_refs, project, registry ), - drop_columns=["entity_timestamp"] - + [ - f"{feature_view.projection.name_to_use()}__entity_row_unique_id" - for feature_view in feature_views - ], ) @@ -192,7 +187,6 @@ def __init__( config: RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[OnDemandFeatureView]], - drop_columns: Optional[List[str]] = None, ): """Initialize RedshiftRetrievalJob object. @@ -203,8 +197,6 @@ def __init__( config: Feast repo config full_feature_names: Whether to add the feature view prefixes to the feature names on_demand_feature_views: A list of on demand transforms to apply at retrieval time - drop_columns: Optionally a list of columns to drop before unloading to S3. - This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift. 
""" if not isinstance(query, str): self._query_generator = query @@ -226,7 +218,6 @@ def query_generator() -> Iterator[str]: ) self._full_feature_names = full_feature_names self._on_demand_feature_views = on_demand_feature_views - self._drop_columns = drop_columns @property def full_feature_names(self) -> bool: @@ -247,7 +238,6 @@ def _to_df_internal(self) -> pd.DataFrame: self._s3_path, self._config.offline_store.iam_role, query, - self._drop_columns, ) def _to_arrow_internal(self) -> pa.Table: @@ -261,7 +251,6 @@ def _to_arrow_internal(self) -> pa.Table: self._s3_path, self._config.offline_store.iam_role, query, - self._drop_columns, ) def to_s3(self) -> str: @@ -280,7 +269,6 @@ def to_s3(self) -> str: self._s3_path, self._config.offline_store.iam_role, query, - self._drop_columns, ) return self._s3_path @@ -303,9 +291,6 @@ def to_redshift(self, table_name: str) -> None: with self._query_generator() as query: query = f'CREATE TABLE "{table_name}" AS ({query});\n' - if self._drop_columns is not None: - for column in self._drop_columns: - query += f"ALTER TABLE {table_name} DROP COLUMN {column};\n" aws_utils.execute_redshift_statement( self._redshift_client, diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index 68e5be2739..87bf175450 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -2,7 +2,7 @@ import os import tempfile import uuid -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, Optional, Tuple import pandas as pd import pyarrow as pa @@ -80,7 +80,10 @@ def execute_redshift_statement_async( """ try: return redshift_data_client.execute_statement( - ClusterIdentifier=cluster_id, Database=database, DbUser=user, Sql=query, + ClusterIdentifier=cluster_id, + Database=database, + DbUser=user, + Sql=query, ) except ClientError as e: if e.response["Error"]["Code"] == "ValidationException": @@ -148,7 +151,11 @@ def get_redshift_statement_result(redshift_data_client, statement_id: str) -> di return redshift_data_client.get_statement_result(Id=statement_id) -def upload_df_to_s3(s3_resource, s3_path: str, df: pd.DataFrame,) -> None: +def upload_df_to_s3( + s3_resource, + s3_path: str, + df: pd.DataFrame, +) -> None: """Uploads a Pandas DataFrame to S3 as a parquet file Args: @@ -291,7 +298,11 @@ def temporarily_upload_df_to_redshift( # Clean up the uploaded Redshift table execute_redshift_statement( - redshift_data_client, cluster_id, database, user, f"DROP TABLE {table_name}", + redshift_data_client, + cluster_id, + database, + user, + f"DROP TABLE {table_name}", ) @@ -324,7 +335,6 @@ def execute_redshift_query_and_unload_to_s3( s3_path: str, iam_role: str, query: str, - drop_columns: Optional[List[str]] = None, ) -> None: """Unload Redshift Query results to S3 @@ -337,16 +347,11 @@ def execute_redshift_query_and_unload_to_s3( iam_role: IAM Role for Redshift to assume during the UNLOAD command. The role must grant permission to write to the S3 location. query: The SQL query to execute - drop_columns: Optionally a list of columns to drop before unloading to S3. - This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift. 
""" # Run the query, unload the results to S3 unique_table_name = "_" + str(uuid.uuid4()).replace("-", "") query = f"CREATE TEMPORARY TABLE {unique_table_name} AS ({query});\n" - if drop_columns is not None: - for column in drop_columns: - query += f"ALTER TABLE {unique_table_name} DROP COLUMN {column};\n" query += f"UNLOAD ('SELECT * FROM {unique_table_name}') TO '{s3_path}/' IAM_ROLE '{iam_role}' PARQUET" execute_redshift_statement(redshift_data_client, cluster_id, database, user, query) @@ -360,7 +365,6 @@ def unload_redshift_query_to_pa( s3_path: str, iam_role: str, query: str, - drop_columns: Optional[List[str]] = None, ) -> pa.Table: """ Unload Redshift Query results to S3 and get the results in PyArrow Table format """ bucket, key = get_bucket_and_key(s3_path) @@ -373,7 +377,6 @@ def unload_redshift_query_to_pa( s3_path, iam_role, query, - drop_columns, ) with tempfile.TemporaryDirectory() as temp_dir: @@ -391,7 +394,6 @@ def unload_redshift_query_to_df( s3_path: str, iam_role: str, query: str, - drop_columns: Optional[List[str]] = None, ) -> pd.DataFrame: """ Unload Redshift Query results to S3 and get the results in Pandas DataFrame format """ table = unload_redshift_query_to_pa( @@ -403,7 +405,6 @@ def unload_redshift_query_to_df( s3_path, iam_role, query, - drop_columns, ) return table.to_pandas() From 023fcb91ed4bb5cf4b91e0833f3ddf6d17ffa320 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Mon, 4 Oct 2021 12:15:38 +0200 Subject: [PATCH 3/8] Format files Signed-off-by: Matt Delacour --- sdk/python/feast/infra/utils/aws_utils.py | 25 ++++------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index 87bf175450..28d48a489e 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -80,10 +80,7 @@ def execute_redshift_statement_async( """ try: return redshift_data_client.execute_statement( - ClusterIdentifier=cluster_id, - Database=database, - DbUser=user, - Sql=query, + ClusterIdentifier=cluster_id, Database=database, DbUser=user, Sql=query, ) except ClientError as e: if e.response["Error"]["Code"] == "ValidationException": @@ -151,11 +148,7 @@ def get_redshift_statement_result(redshift_data_client, statement_id: str) -> di return redshift_data_client.get_statement_result(Id=statement_id) -def upload_df_to_s3( - s3_resource, - s3_path: str, - df: pd.DataFrame, -) -> None: +def upload_df_to_s3(s3_resource, s3_path: str, df: pd.DataFrame,) -> None: """Uploads a Pandas DataFrame to S3 as a parquet file Args: @@ -298,11 +291,7 @@ def temporarily_upload_df_to_redshift( # Clean up the uploaded Redshift table execute_redshift_statement( - redshift_data_client, - cluster_id, - database, - user, - f"DROP TABLE {table_name}", + redshift_data_client, cluster_id, database, user, f"DROP TABLE {table_name}", ) @@ -370,13 +359,7 @@ def unload_redshift_query_to_pa( bucket, key = get_bucket_and_key(s3_path) execute_redshift_query_and_unload_to_s3( - redshift_data_client, - cluster_id, - database, - user, - s3_path, - iam_role, - query, + redshift_data_client, cluster_id, database, user, s3_path, iam_role, query, ) with tempfile.TemporaryDirectory() as temp_dir: From e82106f37265c4acebbc7d0b85169c4d10518974 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Tue, 12 Oct 2021 17:44:06 -0400 Subject: [PATCH 4/8] Add again integration tests about backfill rows Signed-off-by: Matt Delacour --- .../test_universal_historical_retrieval.py | 114 
+++++++++++++++++- 1 file changed, 113 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 2f88f2194c..59153dec45 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -1,4 +1,6 @@ -from datetime import datetime +import random +import time +from datetime import datetime, timedelta from typing import Any, Dict, List, Optional import numpy as np @@ -8,15 +10,18 @@ from pytz import utc from feast import utils +from feast.entity import Entity from feast.errors import ( FeatureNameCollisionError, RequestDataNotFoundInEntityDfException, ) +from feast.feature import Feature from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_utils import ( DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, ) +from feast.value_type import ValueType from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, table_name_from_data_source, @@ -480,6 +485,113 @@ def test_historical_features(environment, universal_data_sources, full_feature_n ) +@pytest.mark.integration +def test_historical_features_from_bigquery_sources_containing_backfills(environment): + store = environment.feature_store + + now = datetime.now().replace(microsecond=0, second=0, minute=0) + tomorrow = now + timedelta(days=1) + + entity_df = pd.DataFrame( + data=[ + {"driver_id": 1001, "event_timestamp": now + timedelta(days=2)}, + {"driver_id": 1002, "event_timestamp": now + timedelta(days=2)}, + ] + ) + + driver_stats_df = pd.DataFrame( + data=[ + # Duplicated rows simple case + { + "driver_id": 1001, + "avg_daily_trips": 10, + "event_timestamp": now, + "created": now, + }, + { + "driver_id": 1001, + "avg_daily_trips": 20, + "event_timestamp": now, + "created": tomorrow, + }, + # Duplicated rows after a backfill + { + "driver_id": 1002, + "avg_daily_trips": 30, + "event_timestamp": now, + "created": tomorrow, + }, + { + "driver_id": 1002, + "avg_daily_trips": 40, + "event_timestamp": tomorrow, + "created": now, + }, + ] + ) + + expected_df = pd.DataFrame( + data=[ + { + "driver_id": 1001, + "event_timestamp": now + timedelta(days=2), + "avg_daily_trips": 20, + }, + { + "driver_id": 1002, + "event_timestamp": now + timedelta(days=2), + "avg_daily_trips": 40, + }, + ] + ) + + driver_stats_data_source = environment.data_source_creator.create_data_source( + df=driver_stats_df, + destination_name=f"test_driver_stats_{int(time.time_ns())}_{random.randint(1000, 9999)}", + event_timestamp_column="event_timestamp", + created_timestamp_column="created", + ) + + driver = Entity(name="driver", join_key="driver_id", value_type=ValueType.INT64) + driver_fv = FeatureView( + name="driver_stats", + entities=["driver"], + features=[Feature(name="avg_daily_trips", dtype=ValueType.INT32)], + batch_source=driver_stats_data_source, + ttl=None, + ) + + store.apply([driver, driver_fv]) + + try: + offline_job = store.get_historical_features( + entity_df=entity_df, + features=["driver_stats:avg_daily_trips"], + full_feature_names=False, + ) + + start_time = datetime.utcnow() + actual_df = offline_job.to_df() + + print(f"actual_df shape: {actual_df.shape}") + end_time = datetime.utcnow() + print( + str(f"Time to execute job_from_df.to_df() = '{(end_time - start_time)}'\n") + ) + + 
assert sorted(expected_df.columns) == sorted(actual_df.columns) + assert_frame_equal( + expected_df.sort_values(by=["driver_id"]).reset_index(drop=True), + actual_df[expected_df.columns] + .sort_values(by=["driver_id"]) + .reset_index(drop=True), + check_dtype=False, + ) + + finally: + store.teardown() + + def response_feature_name(feature: str, full_feature_names: bool) -> str: if feature in {"conv_rate", "avg_daily_trips"} and full_feature_names: return f"driver_stats__{feature}" From 92819dd647bdb284abc361c1c608707bae90b6d3 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Thu, 14 Oct 2021 11:44:53 -0400 Subject: [PATCH 5/8] Add teardown to datasource creator Signed-off-by: Matt Delacour --- .../offline_store/test_universal_historical_retrieval.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 59153dec45..cde95f0e07 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -590,6 +590,7 @@ def test_historical_features_from_bigquery_sources_containing_backfills(environm finally: store.teardown() + environment.data_source_creator.teardown() def response_feature_name(feature: str, full_feature_names: bool) -> str: From 5fea569f669c7abe5f44d6c711f9acb70a70b5a1 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Thu, 14 Oct 2021 14:55:02 -0400 Subject: [PATCH 6/8] Remove teardown logic in tests as it s part of conftest Signed-off-by: Matt Delacour --- .../test_universal_historical_retrieval.py | 45 +++++++++---------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index cde95f0e07..4b17095131 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -563,34 +563,29 @@ def test_historical_features_from_bigquery_sources_containing_backfills(environm store.apply([driver, driver_fv]) - try: - offline_job = store.get_historical_features( - entity_df=entity_df, - features=["driver_stats:avg_daily_trips"], - full_feature_names=False, - ) - - start_time = datetime.utcnow() - actual_df = offline_job.to_df() + offline_job = store.get_historical_features( + entity_df=entity_df, + features=["driver_stats:avg_daily_trips"], + full_feature_names=False, + ) - print(f"actual_df shape: {actual_df.shape}") - end_time = datetime.utcnow() - print( - str(f"Time to execute job_from_df.to_df() = '{(end_time - start_time)}'\n") - ) + start_time = datetime.utcnow() + actual_df = offline_job.to_df() - assert sorted(expected_df.columns) == sorted(actual_df.columns) - assert_frame_equal( - expected_df.sort_values(by=["driver_id"]).reset_index(drop=True), - actual_df[expected_df.columns] - .sort_values(by=["driver_id"]) - .reset_index(drop=True), - check_dtype=False, - ) + print(f"actual_df shape: {actual_df.shape}") + end_time = datetime.utcnow() + print( + str(f"Time to execute job_from_df.to_df() = '{(end_time - start_time)}'\n") + ) - finally: - store.teardown() - environment.data_source_creator.teardown() + assert sorted(expected_df.columns) == sorted(actual_df.columns) + assert_frame_equal( + 
expected_df.sort_values(by=["driver_id"]).reset_index(drop=True), + actual_df[expected_df.columns] + .sort_values(by=["driver_id"]) + .reset_index(drop=True), + check_dtype=False, + ) def response_feature_name(feature: str, full_feature_names: bool) -> str: From 5247764c75f8c2b92dc3bd9c29e07d9aa90fdc70 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Thu, 14 Oct 2021 18:16:55 -0400 Subject: [PATCH 7/8] Fix linter Signed-off-by: Matt Delacour --- .../offline_store/test_universal_historical_retrieval.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 4b17095131..e3e1b293e6 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -574,9 +574,7 @@ def test_historical_features_from_bigquery_sources_containing_backfills(environm print(f"actual_df shape: {actual_df.shape}") end_time = datetime.utcnow() - print( - str(f"Time to execute job_from_df.to_df() = '{(end_time - start_time)}'\n") - ) + print(str(f"Time to execute job_from_df.to_df() = '{(end_time - start_time)}'\n")) assert sorted(expected_df.columns) == sorted(actual_df.columns) assert_frame_equal( From 6baf214b6235c72e561ca8646bc791335dee80cd Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Fri, 15 Oct 2021 13:04:08 -0400 Subject: [PATCH 8/8] Add pytest.mark.universal to new test Signed-off-by: Matt Delacour --- .../offline_store/test_universal_historical_retrieval.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index e3e1b293e6..e6194cb012 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -486,6 +486,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n @pytest.mark.integration +@pytest.mark.universal def test_historical_features_from_bigquery_sources_containing_backfills(environment): store = environment.feature_store
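
Reviewer note: the heart of patch 1 is that build_point_in_time_query now precomputes the exact output column list (final_output_feature_names) and passes it to the Jinja templates, so the final SELECT can enumerate its columns explicitly instead of relying on BigQuery-only "SELECT * EXCEPT(...)" or, on Redshift, post-hoc "ALTER TABLE ... DROP COLUMN" (which patch 2 then removes along with the drop_columns plumbing). Below is a minimal, hedged sketch of that column-list construction; the FeatureViewQueryContext class here is a hypothetical stand-in that models only the two fields the patch actually reads (name, features), not Feast's full dataclass.

from dataclasses import dataclass
from typing import Iterable, List

# Hypothetical stand-in for feast.infra.offline_stores.offline_utils.FeatureViewQueryContext;
# only the fields used by the new final_output_feature_names logic are modelled.
@dataclass
class FeatureViewQueryContext:
    name: str
    features: List[str]


def final_output_feature_names(
    entity_df_columns: Iterable[str],
    feature_view_query_contexts: List[FeatureViewQueryContext],
    full_feature_names: bool = False,
) -> List[str]:
    """Mirrors the list built inside build_point_in_time_query in the patch:
    entity dataframe columns first, then one column per feature, optionally
    prefixed with the feature view name when full_feature_names is set."""
    names = list(entity_df_columns)
    names.extend(
        (f"{fv.name}__{feature}" if full_feature_names else feature)
        for fv in feature_view_query_contexts
        for feature in fv.features
    )
    return names


if __name__ == "__main__":
    contexts = [
        FeatureViewQueryContext(name="driver_stats", features=["conv_rate", "avg_daily_trips"])
    ]
    entity_cols = {"driver_id": None, "event_timestamp": None}.keys()
    print(final_output_feature_names(entity_cols, contexts, full_feature_names=True))
    # ['driver_id', 'event_timestamp', 'driver_stats__conv_rate', 'driver_stats__avg_daily_trips']

The resulting list is what the templates render as "SELECT {{ final_output_feature_names | join(', ') }}", which is why the entity_timestamp and per-feature-view __entity_row_unique_id helper columns no longer need to be dropped afterwards.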