From 41b0a35cc199d38a5d848696b2862e76439df1ab Mon Sep 17 00:00:00 2001 From: John Lemmon <137814163+JohnLemmonMedely@users.noreply.github.com> Date: Thu, 25 Jan 2024 14:27:04 -0600 Subject: [PATCH] feat: Add support for arrays in snowflake (#3769) Adds support for arrays in snowflake Signed-off-by: john.lemmon Signed-off-by: Attila Toth --- .../feast/infra/offline_stores/snowflake.py | 31 ++++ .../infra/offline_stores/snowflake_source.py | 6 +- .../snowflake_python_udfs_creation.sql | 56 ++++++ .../snowflake/snowpark/snowflake_udfs.py | 175 ++++++++++++++++++ sdk/python/feast/type_map.py | 8 + sdk/python/tests/data/data_creator.py | 1 + .../feature_repos/repo_configuration.py | 4 +- .../universal/data_sources/snowflake.py | 4 +- .../materialization/test_snowflake.py | 84 +++++++-- 9 files changed, 350 insertions(+), 19 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 38568ce79b..4f11b1ac42 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -1,4 +1,5 @@ import contextlib +import json import os import uuid import warnings @@ -51,6 +52,17 @@ ) from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage +from feast.types import ( + Array, + Bool, + Bytes, + Float32, + Float64, + Int32, + Int64, + String, + UnixTimestamp, +) from feast.usage import log_exceptions_and_usage try: @@ -320,6 +332,7 @@ def query_generator() -> Iterator[str]: on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs( feature_refs, project, registry ), + feature_views=feature_views, metadata=RetrievalMetadata( features=feature_refs, keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}), @@ -398,9 +411,12 @@ def __init__( config: RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None, + feature_views: Optional[List[FeatureView]] = None, metadata: Optional[RetrievalMetadata] = None, ): + if feature_views is None: + feature_views = [] if not isinstance(query, str): self._query_generator = query else: @@ -416,6 +432,7 @@ def query_generator() -> Iterator[str]: self.config = config self._full_feature_names = full_feature_names self._on_demand_feature_views = on_demand_feature_views or [] + self._feature_views = feature_views self._metadata = metadata self.export_path: Optional[str] if self.config.offline_store.blob_export_location: @@ -436,6 +453,20 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: self.snowflake_conn, self.to_sql() ).fetch_pandas_all() + for feature_view in self._feature_views: + for feature in feature_view.features: + if feature.dtype in [ + Array(String), + Array(Bytes), + Array(Int32), + Array(Int64), + Array(UnixTimestamp), + Array(Float64), + Array(Float32), + Array(Bool), + ]: + df[feature.name] = [json.loads(x) for x in df[feature.name]] + return df def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 95bd46f1ec..0cbf82dd1c 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -279,12 +279,12 @@ def get_table_column_names_and_types( else: row["snowflake_type"] = "NUMBERwSCALE" - elif row["type_code"] in [5, 9, 10, 12]: + elif row["type_code"] in [5, 9, 12]: error = snowflake_unsupported_map[row["type_code"]] raise NotImplementedError( f"The following Snowflake Data Type is not supported: {error}" ) - elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 11, 13]: + elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 10, 11, 13]: row["snowflake_type"] = snowflake_type_code_map[row["type_code"]] else: raise NotImplementedError( @@ -305,6 +305,7 @@ def get_table_column_names_and_types( 6: "TIMESTAMP_LTZ", 7: "TIMESTAMP_TZ", 8: "TIMESTAMP_NTZ", + 10: "ARRAY", 11: "BINARY", 13: "BOOLEAN", } @@ -312,7 +313,6 @@ def get_table_column_names_and_types( snowflake_unsupported_map = { 5: "VARIANT -- Try converting to VARCHAR", 9: "OBJECT -- Try converting to VARCHAR", - 10: "ARRAY -- Try converting to VARCHAR", 12: "TIME -- Try converting to VARCHAR", } diff --git a/sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_python_udfs_creation.sql b/sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_python_udfs_creation.sql index a197a3ee4c..a444c0b7c5 100644 --- a/sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_python_udfs_creation.sql +++ b/sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_python_udfs_creation.sql @@ -14,6 +14,62 @@ CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_varchar_to_string_pro HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_varchar_to_string_proto' IMPORTS = ('@STAGE_HOLDER/feast.zip'); +CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_bytes_to_list_bytes_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto' + IMPORTS = ('@STAGE_HOLDER/feast.zip'); + +CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_varchar_to_list_string_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto' + IMPORTS = ('@STAGE_HOLDER/feast.zip'); + +CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int32_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto' + IMPORTS = ('@STAGE_HOLDER/feast.zip'); + +CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int64_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto' + IMPORTS = ('@STAGE_HOLDER/feast.zip'); + +CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_float_to_list_double_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto' + IMPORTS = ('@STAGE_HOLDER/feast.zip'); + +CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_boolean_to_list_bool_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto' + IMPORTS = ('@STAGE_HOLDER/feast.zip'); + +CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto' + IMPORTS = ('@STAGE_HOLDER/feast.zip'); + CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_number_to_int32_proto(df NUMBER) RETURNS BINARY LANGUAGE PYTHON diff --git a/sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_udfs.py b/sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_udfs.py index 02311ca55d..f5d5f10631 100644 --- a/sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_udfs.py +++ b/sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_udfs.py @@ -1,6 +1,7 @@ import sys from binascii import unhexlify +import numpy as np import pandas from _snowflake import vectorized @@ -59,6 +60,180 @@ def feast_snowflake_varchar_to_string_proto(df): return df +""" +CREATE OR REPLACE FUNCTION feast_snowflake_array_bytes_to_list_bytes_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto' + IMPORTS = ('@feast_stage/feast.zip'); +""" +# ValueType.STRING_LIST = 12 +@vectorized(input=pandas.DataFrame) +def feast_snowflake_array_bytes_to_list_bytes_proto(df): + sys._xoptions["snowflake_partner_attribution"].append("feast") + + # Sometimes bytes come in as strings so we need to convert back to float + numpy_arrays = np.asarray(df[0].to_list()).astype(bytes) + + df = list( + map( + ValueProto.SerializeToString, + python_values_to_proto_values(numpy_arrays, ValueType.BYTES_LIST), + ) + ) + return df + + +""" +CREATE OR REPLACE FUNCTION feast_snowflake_array_varchar_to_list_string_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto' + IMPORTS = ('@feast_stage/feast.zip'); +""" + + +@vectorized(input=pandas.DataFrame) +def feast_snowflake_array_varchar_to_list_string_proto(df): + sys._xoptions["snowflake_partner_attribution"].append("feast") + + df = list( + map( + ValueProto.SerializeToString, + python_values_to_proto_values(df[0].to_numpy(), ValueType.STRING_LIST), + ) + ) + return df + + +""" +CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int32_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto' + IMPORTS = ('@feast_stage/feast.zip'); +""" + + +@vectorized(input=pandas.DataFrame) +def feast_snowflake_array_number_to_list_int32_proto(df): + sys._xoptions["snowflake_partner_attribution"].append("feast") + + df = list( + map( + ValueProto.SerializeToString, + python_values_to_proto_values(df[0].to_numpy(), ValueType.INT32_LIST), + ) + ) + return df + + +""" +CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int64_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto' + IMPORTS = ('@feast_stage/feast.zip'); +""" + + +@vectorized(input=pandas.DataFrame) +def feast_snowflake_array_number_to_list_int64_proto(df): + sys._xoptions["snowflake_partner_attribution"].append("feast") + + df = list( + map( + ValueProto.SerializeToString, + python_values_to_proto_values(df[0].to_numpy(), ValueType.INT64_LIST), + ) + ) + return df + + +""" +CREATE OR REPLACE FUNCTION feast_snowflake_array_float_to_list_double_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto' + IMPORTS = ('@feast_stage/feast.zip'); +""" + + +@vectorized(input=pandas.DataFrame) +def feast_snowflake_array_float_to_list_double_proto(df): + sys._xoptions["snowflake_partner_attribution"].append("feast") + + numpy_arrays = np.asarray(df[0].to_list()).astype(float) + + df = list( + map( + ValueProto.SerializeToString, + python_values_to_proto_values(numpy_arrays, ValueType.DOUBLE_LIST), + ) + ) + return df + + +""" +CREATE OR REPLACE FUNCTION feast_snowflake_array_boolean_to_list_bool_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto' + IMPORTS = ('@feast_stage/feast.zip'); +""" + + +@vectorized(input=pandas.DataFrame) +def feast_snowflake_array_boolean_to_list_bool_proto(df): + sys._xoptions["snowflake_partner_attribution"].append("feast") + + df = list( + map( + ValueProto.SerializeToString, + python_values_to_proto_values(df[0].to_numpy(), ValueType.BOOL_LIST), + ) + ) + return df + + +""" +CREATE OR REPLACE FUNCTION feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY) + RETURNS BINARY + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('protobuf', 'pandas') + HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto' + IMPORTS = ('@feast_stage/feast.zip'); +""" + + +@vectorized(input=pandas.DataFrame) +def feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df): + sys._xoptions["snowflake_partner_attribution"].append("feast") + + numpy_arrays = np.asarray(df[0].to_list()).astype(np.datetime64) + + df = list( + map( + ValueProto.SerializeToString, + python_values_to_proto_values(numpy_arrays, ValueType.UNIX_TIMESTAMP_LIST), + ) + ) + return df + + """ CREATE OR REPLACE FUNCTION feast_snowflake_number_to_int32_proto(df NUMBER) RETURNS BINARY diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 9dbbb5a64c..e51e1e743b 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -680,6 +680,14 @@ def _convert_value_name_to_snowflake_udf(value_name: str, project_name: str) -> "FLOAT": f"feast_{project_name}_snowflake_float_to_double_proto", "BOOL": f"feast_{project_name}_snowflake_boolean_to_bool_proto", "UNIX_TIMESTAMP": f"feast_{project_name}_snowflake_timestamp_to_unix_timestamp_proto", + "BYTES_LIST": f"feast_{project_name}_snowflake_array_bytes_to_list_bytes_proto", + "STRING_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", + "INT32_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int32_proto", + "INT64_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int64_proto", + "DOUBLE_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto", + "FLOAT_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto", + "BOOL_LIST": f"feast_{project_name}_snowflake_array_boolean_to_list_bool_proto", + "UNIX_TIMESTAMP_LIST": f"feast_{project_name}_snowflake_array_timestamp_to_list_unix_timestamp_proto", } return name_map[value_name].upper() diff --git a/sdk/python/tests/data/data_creator.py b/sdk/python/tests/data/data_creator.py index 2155468445..8d5b1979fa 100644 --- a/sdk/python/tests/data/data_creator.py +++ b/sdk/python/tests/data/data_creator.py @@ -59,6 +59,7 @@ def get_feature_values_for_dtype( "int64": [1, 2, 3, 4, 5], "float": [1.0, None, 3.0, 4.0, 5.0], "string": ["1", None, "3", "4", "5"], + "bytes": [b"1", None, b"3", b"4", b"5"], "bool": [True, None, False, True, False], "datetime": [ datetime(1980, 1, 1), diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index fda5b3c11d..027dea2c58 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -83,8 +83,8 @@ "password": os.getenv("SNOWFLAKE_CI_PASSWORD", ""), "role": os.getenv("SNOWFLAKE_CI_ROLE", ""), "warehouse": os.getenv("SNOWFLAKE_CI_WAREHOUSE", ""), - "database": "FEAST", - "schema": "ONLINE", + "database": os.getenv("SNOWFLAKE_CI_DATABASE", "FEAST"), + "schema": os.getenv("SNOWFLAKE_CI_SCHEMA_ONLINE", "ONLINE"), } BIGTABLE_CONFIG = { diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py index c7e5961a88..c14780da97 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py @@ -36,8 +36,8 @@ def __init__(self, project_name: str, *args, **kwargs): password=os.environ["SNOWFLAKE_CI_PASSWORD"], role=os.environ["SNOWFLAKE_CI_ROLE"], warehouse=os.environ["SNOWFLAKE_CI_WAREHOUSE"], - database="FEAST", - schema="OFFLINE", + database=os.environ.get("SNOWFLAKE_CI_DATABASE", "FEAST"), + schema=os.environ.get("SNOWFLAKE_CI_SCHEMA_OFFLINE", "OFFLINE"), storage_integration_name=os.getenv("BLOB_EXPORT_STORAGE_NAME", "FEAST_S3"), blob_export_location=os.getenv( "BLOB_EXPORT_URI", "s3://feast-snowflake-offload/export" diff --git a/sdk/python/tests/integration/materialization/test_snowflake.py b/sdk/python/tests/integration/materialization/test_snowflake.py index 0cf1471dfe..daa96a87c9 100644 --- a/sdk/python/tests/integration/materialization/test_snowflake.py +++ b/sdk/python/tests/integration/materialization/test_snowflake.py @@ -1,10 +1,13 @@ import os -from datetime import timedelta +from datetime import datetime, timedelta import pytest +from pytz import utc +from feast import Field from feast.entity import Entity from feast.feature_view import FeatureView +from feast.types import Array, Bool, Bytes, Float64, Int32, Int64, String, UnixTimestamp from tests.data.data_creator import create_basic_driver_dataset from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, @@ -24,8 +27,8 @@ "password": os.getenv("SNOWFLAKE_CI_PASSWORD", ""), "role": os.getenv("SNOWFLAKE_CI_ROLE", ""), "warehouse": os.getenv("SNOWFLAKE_CI_WAREHOUSE", ""), - "database": "FEAST", - "schema": "MATERIALIZATION", + "database": os.getenv("SNOWFLAKE_CI_DATABASE", "FEAST"), + "schema": os.getenv("SNOWFLAKE_CI_SCHEMA_MATERIALIZATION", "MATERIALIZATION"), } SNOWFLAKE_ONLINE_CONFIG = { @@ -35,15 +38,16 @@ "password": os.getenv("SNOWFLAKE_CI_PASSWORD", ""), "role": os.getenv("SNOWFLAKE_CI_ROLE", ""), "warehouse": os.getenv("SNOWFLAKE_CI_WAREHOUSE", ""), - "database": "FEAST", - "schema": "ONLINE", + "database": os.getenv("SNOWFLAKE_CI_DATABASE", "FEAST"), + "schema": os.getenv("SNOWFLAKE_CI_SCHEMA_ONLINE", "ONLINE"), } +@pytest.mark.parametrize("online_store", [SNOWFLAKE_ONLINE_CONFIG, "sqlite"]) @pytest.mark.integration -def test_snowflake_materialization_consistency_internal(): +def test_snowflake_materialization_consistency(online_store): snowflake_config = IntegrationTestRepoConfig( - online_store=SNOWFLAKE_ONLINE_CONFIG, + online_store=online_store, offline_store_creator=SnowflakeDataSourceCreator, batch_engine=SNOWFLAKE_ENGINE_CONFIG, ) @@ -84,15 +88,32 @@ def test_snowflake_materialization_consistency_internal(): snowflake_environment.data_source_creator.teardown() +@pytest.mark.parametrize( + "feature_dtype, feast_dtype", + [ + ("string", Array(String)), + ("bytes", Array(Bytes)), + ("int32", Array(Int32)), + ("int64", Array(Int64)), + ("float", Array(Float64)), + ("bool", Array(Bool)), + ("datetime", Array(UnixTimestamp)), + ], +) +@pytest.mark.parametrize("feature_is_empty_list", [False]) +@pytest.mark.parametrize("online_store", [SNOWFLAKE_ONLINE_CONFIG, "sqlite"]) @pytest.mark.integration -def test_snowflake_materialization_consistency_external(): +def test_snowflake_materialization_consistency_internal_with_lists( + feature_dtype, feast_dtype, feature_is_empty_list, online_store +): snowflake_config = IntegrationTestRepoConfig( + online_store=online_store, offline_store_creator=SnowflakeDataSourceCreator, batch_engine=SNOWFLAKE_ENGINE_CONFIG, ) snowflake_environment = construct_test_environment(snowflake_config, None) - df = create_basic_driver_dataset() + df = create_basic_driver_dataset(Int32, feature_dtype, True, feature_is_empty_list) ds = snowflake_environment.data_source_creator.create_data_source( df, snowflake_environment.feature_store.project, @@ -105,23 +126,62 @@ def test_snowflake_materialization_consistency_external(): join_keys=["driver_id"], ) + schema = [ + Field(name="driver_id", dtype=Int32), + Field(name="value", dtype=feast_dtype), + ] driver_stats_fv = FeatureView( name="driver_hourly_stats", entities=[driver], ttl=timedelta(weeks=52), + schema=schema, source=ds, ) try: fs.apply([driver, driver_stats_fv]) - # materialization is run in two steps and - # we use timestamp from generated dataframe as a split point split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1) print(f"Split datetime: {split_dt}") + now = datetime.utcnow() + + full_feature_names = True + start_date = (now - timedelta(hours=5)).replace(tzinfo=utc) + end_date = split_dt + fs.materialize( + feature_views=[driver_stats_fv.name], + start_date=start_date, + end_date=end_date, + ) + + expected_values = { + "int32": [3] * 2, + "int64": [3] * 2, + "float": [3.0] * 2, + "string": ["3"] * 2, + "bytes": [b"3"] * 2, + "bool": [False] * 2, + "datetime": [datetime(1981, 1, 1, tzinfo=utc)] * 2, + } + expected_value = [] if feature_is_empty_list else expected_values[feature_dtype] + + response_dict = fs.get_online_features( + [f"{driver_stats_fv.name}:value"], + [{"driver_id": 1}], + full_feature_names=full_feature_names, + ).to_dict() + + actual_value = response_dict[f"{driver_stats_fv.name}__value"][0] + assert actual_value is not None, f"Response: {response_dict}" + if feature_dtype == "float": + for actual_num, expected_num in zip(actual_value, expected_value): + assert ( + abs(actual_num - expected_num) < 1e-6 + ), f"Response: {response_dict}, Expected: {expected_value}" + else: + assert actual_value == expected_value - validate_offline_online_store_consistency(fs, driver_stats_fv, split_dt) finally: fs.teardown() snowflake_environment.data_source_creator.teardown()