From 39aeea3fa77c3b3a789556a1e0fa22ecedcae4ea Mon Sep 17 00:00:00 2001 From: sfc-gh-madkins <82121043+sfc-gh-madkins@users.noreply.github.com> Date: Mon, 12 Sep 2022 12:34:29 -0500 Subject: [PATCH] feat: Add tag kwarg to set Snowflake online store table path (#3176) Signed-off-by: Miles Adkins Signed-off-by: Miles Adkins --- docs/reference/online-stores/snowflake.md | 16 +++++++++++-- .../infra/materialization/snowflake_engine.py | 10 +++++--- .../feast/infra/online_stores/snowflake.py | 24 +++++++++---------- .../infra/utils/snowflake/snowflake_utils.py | 16 +++++++++++++ 4 files changed, 49 insertions(+), 17 deletions(-) diff --git a/docs/reference/online-stores/snowflake.md b/docs/reference/online-stores/snowflake.md index 7db45dd7bd..1f79bbfdeb 100644 --- a/docs/reference/online-stores/snowflake.md +++ b/docs/reference/online-stores/snowflake.md @@ -17,7 +17,6 @@ The data model for using a Snowflake Transient Table as an online store follows (This model may be subject to change when Snowflake Hybrid Tables are released) ## Example - {% code title="feature_store.yaml" %} ```yaml project: my_feature_repo @@ -34,6 +33,19 @@ online_store: ``` {% endcode %} +## Tags KWARGs Actions: + +"ONLINE_PATH": Adding the "snowflake-online-store/online_path" key to a FeatureView tags parameter allows you to choose the online table path for the online serving table (ex. "{database}"."{schema}"). + +{% code title="example_config.py" %} +```python +driver_stats_fv = FeatureView( + ... + tags={"snowflake-online-store/online_path": '"FEAST"."ONLINE"'}, +) +``` +{% endcode %} + The full set of configuration options is available in [SnowflakeOnlineStoreConfig](https://rtd.feast.dev/en/latest/#feast.infra.online_stores.snowflake.SnowflakeOnlineStoreConfig). ## Functionality Matrix @@ -41,7 +53,7 @@ The full set of configuration options is available in [SnowflakeOnlineStoreConfi The set of functionality supported by online stores is described in detail [here](overview.md#functionality). 
Below is a matrix indicating which functionality is supported by the Snowflake online store. -| | Snowflake | +| | Snowflake | | :-------------------------------------------------------- | :-- | | write feature values to the online store | yes | | read feature values from the online store | yes | diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index de498fe4e1..0219a7923f 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -28,6 +28,7 @@ assert_snowflake_feature_names, execute_snowflake_statement, get_snowflake_conn, + get_snowflake_online_store_path, package_snowpark_zip, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -370,8 +371,6 @@ def materialize_to_snowflake_online_store( ) -> None: assert_snowflake_feature_names(feature_view) - online_table = f"""{repo_config .online_store.database}"."{repo_config.online_store.schema_}"."[online-transient] {project}_{feature_view.name}""" - feature_names_str = '", "'.join( [feature.name for feature in feature_view.features] ) @@ -381,8 +380,13 @@ def materialize_to_snowflake_online_store( else: fv_created_str = None + online_path = get_snowflake_online_store_path(repo_config, feature_view) + online_table = ( + f'{online_path}."[online-transient] {project}_{feature_view.name}"' + ) + query = f""" - MERGE INTO "{online_table}" online_table + MERGE INTO {online_table} online_table USING ( SELECT "entity_key" || TO_BINARY("feature_name", 'UTF-8') AS "entity_feature_key", diff --git a/sdk/python/feast/infra/online_stores/snowflake.py b/sdk/python/feast/infra/online_stores/snowflake.py index a5c0308503..c4474dff38 100644 --- a/sdk/python/feast/infra/online_stores/snowflake.py +++ b/sdk/python/feast/infra/online_stores/snowflake.py @@ -15,6 +15,7 @@ from feast.infra.utils.snowflake.snowflake_utils import ( 
execute_snowflake_statement, get_snowflake_conn, + get_snowflake_online_store_path, write_pandas_binary, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -112,9 +113,7 @@ def online_write_batch( agg_df = pd.concat(dfs) # This combines both the data upload plus the overwrite in the same transaction - table_path = ( - f'"{config.online_store.database}"."{config.online_store.schema_}"' - ) + online_path = get_snowflake_online_store_path(config, table) with get_snowflake_conn(config.online_store, autocommit=False) as conn: write_pandas_binary( conn, @@ -125,7 +124,7 @@ def online_write_batch( ) # special function for writing binary to snowflake query = f""" - INSERT OVERWRITE INTO {table_path}."[online-transient] {config.project}_{table.name}" + INSERT OVERWRITE INTO {online_path}."[online-transient] {config.project}_{table.name}" SELECT "entity_feature_key", "entity_key", @@ -138,7 +137,7 @@ def online_write_batch( *, ROW_NUMBER() OVER(PARTITION BY "entity_key","feature_name" ORDER BY "event_ts" DESC, "created_ts" DESC) AS "_feast_row" FROM - {table_path}."[online-transient] {config.project}_{table.name}") + {online_path}."[online-transient] {config.project}_{table.name}") WHERE "_feast_row" = 1; """ @@ -178,13 +177,13 @@ def online_read( ] ) - table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"' + online_path = get_snowflake_online_store_path(config, table) with get_snowflake_conn(config.online_store) as conn: query = f""" SELECT "entity_key", "feature_name", "value", "event_ts" FROM - {table_path}."[online-transient] {config.project}_{table.name}" + {online_path}."[online-transient] {config.project}_{table.name}" WHERE "entity_feature_key" IN ({entity_fetch_str}) """ @@ -221,11 +220,11 @@ def update( ): assert isinstance(config.online_store, SnowflakeOnlineStoreConfig) - table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"' with get_snowflake_conn(config.online_store) as conn: 
for table in tables_to_keep: + online_path = get_snowflake_online_store_path(config, table) query = f""" - CREATE TRANSIENT TABLE IF NOT EXISTS {table_path}."[online-transient] {config.project}_{table.name}" ( + CREATE TRANSIENT TABLE IF NOT EXISTS {online_path}."[online-transient] {config.project}_{table.name}" ( "entity_feature_key" BINARY, "entity_key" BINARY, "feature_name" VARCHAR, @@ -237,7 +236,8 @@ def update( execute_snowflake_statement(conn, query) for table in tables_to_delete: - query = f'DROP TABLE IF EXISTS {table_path}."[online-transient] {config.project}_{table.name}"' + online_path = get_snowflake_online_store_path(config, table) + query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"' execute_snowflake_statement(conn, query) def teardown( @@ -248,8 +248,8 @@ def teardown( ): assert isinstance(config.online_store, SnowflakeOnlineStoreConfig) - table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"' with get_snowflake_conn(config.online_store) as conn: for table in tables: - query = f'DROP TABLE IF EXISTS {table_path}."[online-transient] {config.project}_{table.name}"' + online_path = get_snowflake_online_store_path(config, table) + query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"' execute_snowflake_statement(conn, query) diff --git a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py index c7b27d8331..a5d2b05d45 100644 --- a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py +++ b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py @@ -22,6 +22,7 @@ import feast from feast.errors import SnowflakeIncompleteConfig, SnowflakeQueryUnknownError from feast.feature_view import FeatureView +from feast.repo_config import RepoConfig try: import snowflake.connector @@ -104,6 +105,21 @@ def get_snowflake_conn(config, autocommit=True) -> SnowflakeConnection: raise 
SnowflakeIncompleteConfig(e) +def get_snowflake_online_store_path( + config: RepoConfig, + feature_view: FeatureView, +) -> str: + path_tag = "snowflake-online-store/online_path" + if path_tag in feature_view.tags: + online_path = feature_view.tags[path_tag] + else: + online_path = ( + f'"{config.online_store.database}"."{config.online_store.schema_}"' + ) + + return online_path + + def package_snowpark_zip(project_name) -> Tuple[str, str]: path = os.path.dirname(feast.__file__) copy_path = path + f"/snowflake_feast_{project_name}"