diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 215c810b08..e966315853 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -8,6 +8,7 @@ import pyarrow as pa from pydantic import StrictStr from pydantic.typing import Literal +from pytz import utc from feast import OnDemandFeatureView, RedshiftSource from feast.data_source import DataSource @@ -82,6 +83,9 @@ def pull_latest_from_table_or_query( ) s3_resource = aws_utils.get_s3_resource(config.offline_store.region) + start_date = start_date.astimezone(tz=utc) + end_date = end_date.astimezone(tz=utc) + query = f""" SELECT {field_string} diff --git a/sdk/python/tests/integration/e2e/test_universal_e2e.py b/sdk/python/tests/integration/e2e/test_universal_e2e.py index 7e72e49d9c..fbbdd14f23 100644 --- a/sdk/python/tests/integration/e2e/test_universal_e2e.py +++ b/sdk/python/tests/integration/e2e/test_universal_e2e.py @@ -16,14 +16,21 @@ @pytest.mark.parametrize("infer_features", [True, False]) def test_e2e_consistency(environment, e2e_data_sources, infer_features): fs = environment.feature_store - fs.config.project = fs.config.project + str(infer_features) df, data_source = e2e_data_sources - fv = driver_feature_view(data_source=data_source, infer_features=infer_features) + fv = driver_feature_view( + name=f"test_consistency_{'with_inference' if infer_features else ''}", + data_source=data_source, + infer_features=infer_features, + ) entity = driver() fs.apply([fv, entity]) - run_offline_online_store_consistency_test(fs, fv) + # materialization is run in two steps and + # we use timestamp from generated dataframe as a split point + split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1) + + run_offline_online_store_consistency_test(fs, fv, split_dt) def check_offline_and_online_features( @@ -80,7 +87,7 @@ def check_offline_and_online_features( def run_offline_online_store_consistency_test( - fs: FeatureStore, fv: FeatureView + fs: FeatureStore, fv: FeatureView, split_dt: datetime ) -> None: now = datetime.utcnow() @@ -90,7 +97,7 @@ def run_offline_online_store_consistency_test( # Run materialize() # use both tz-naive & tz-aware timestamps to test that they're both correctly handled start_date = (now - timedelta(hours=5)).replace(tzinfo=utc) - end_date = now - timedelta(hours=2) + end_date = split_dt fs.materialize(feature_views=[fv.name], start_date=start_date, end_date=end_date) # check result of materialize()