diff --git a/README.md b/README.md index 649bb909fa..7ede0c612a 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,7 @@ The list below contains the functionality that contributors are planning to deve * Want to speak to a Feast contributor? We are more than happy to jump on a call. Please schedule a time using [Calendly](https://calendly.com/d/x2ry-g5bb/meet-with-feast-team). * **Data Sources** + * [x] [Snowflake source](https://docs.feast.dev/reference/data-sources/snowflake) * [x] [Redshift source](https://docs.feast.dev/reference/data-sources/redshift) * [x] [BigQuery source](https://docs.feast.dev/reference/data-sources/bigquery) * [x] [Parquet file source](https://docs.feast.dev/reference/data-sources/file) @@ -143,9 +144,9 @@ The list below contains the functionality that contributors are planning to deve * [x] [Hive (community plugin)](https://github.com/baineng/feast-hive) * [x] [Postgres (community plugin)](https://github.com/nossrannug/feast-postgres) * [x] Kafka source (with [push support into the online store](reference/alpha-stream-ingestion.md)) - * [x] [Snowflake source (community plugin)](https://github.com/sfc-gh-madkins/feast-snowflake) * [ ] HTTP source * **Offline Stores** + * [x] [Snowflake](https://docs.feast.dev/reference/offline-stores/snowflake) * [x] [Redshift](https://docs.feast.dev/reference/offline-stores/redshift) * [x] [BigQuery](https://docs.feast.dev/reference/offline-stores/bigquery) * [x] [Synapse (community plugin)](https://github.com/Azure/feast-azure) @@ -153,7 +154,6 @@ The list below contains the functionality that contributors are planning to deve * [x] [Postgres (community plugin)](https://github.com/nossrannug/feast-postgres) * [x] [In-memory / Pandas](https://docs.feast.dev/reference/offline-stores/file) * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/adding-a-new-offline-store) - * [x] [Snowflake (community plugin)](https://github.com/sfc-gh-madkins/feast-snowflake) * [x] [Trino (communiuty plugin)](https://github.com/Shopify/feast-trino) * **Online Stores** * [x] [DynamoDB](https://docs.feast.dev/reference/online-stores/dynamodb) @@ -208,7 +208,7 @@ The list below contains the functionality that contributors are planning to deve Please refer to the official documentation at [Documentation](https://docs.feast.dev/) * [Quickstart](https://docs.feast.dev/getting-started/quickstart) * [Tutorials](https://docs.feast.dev/tutorials/tutorials-overview) - * [Running Feast with GCP/AWS](https://docs.feast.dev/how-to-guides/feast-gcp-aws) + * [Running Feast with Snowflake/GCP/AWS](https://docs.feast.dev/how-to-guides/feast-snowflake-gcp-aws) * [Change Log](https://github.com/feast-dev/feast/blob/master/CHANGELOG.md) * [Slack (#Feast)](https://slack.feast.dev/) @@ -224,4 +224,4 @@ Thanks goes to these incredible people: - \ No newline at end of file + diff --git a/docs/README.md b/docs/README.md index 1a76adbde3..d5c5177a18 100644 --- a/docs/README.md +++ b/docs/README.md @@ -52,6 +52,6 @@ Explore the following resources to get started with Feast: * [Concepts](getting-started/concepts/) describes all important Feast API concepts * [Architecture](getting-started/architecture-and-components/) describes Feast's overall architecture. * [Tutorials](tutorials/tutorials-overview.md) shows full examples of using Feast in machine learning applications. -* [Running Feast with GCP/AWS](how-to-guides/feast-gcp-aws/) provides a more in-depth guide to using Feast. 
+* [Running Feast with Snowflake/GCP/AWS](how-to-guides/feast-snowflake-gcp-aws/) provides a more in-depth guide to using Feast. * [Reference](reference/feast-cli-commands.md) contains detailed API and design documents. * [Contributing](project/contributing.md) contains resources for anyone who wants to contribute to Feast. diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index ae23cd5d40..e1343ec485 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -33,10 +33,11 @@ * [Driver ranking](tutorials/driver-ranking-with-feast.md) * [Fraud detection on GCP](tutorials/fraud-detection.md) * [Real-time credit scoring on AWS](tutorials/real-time-credit-scoring-on-aws.md) +* [Driver Stats using Snowflake](tutorials/driver-stats-using-snowflake.md) ## How-to Guides -* [Running Feast with GCP/AWS](how-to-guides/feast-gcp-aws/README.md) +* [Running Feast with Snowflake/GCP/AWS](how-to-guides/feast-snowflake-gcp-aws/README.md) * [Install Feast](how-to-guides/feast-gcp-aws/install-feast.md) * [Create a feature repository](how-to-guides/feast-gcp-aws/create-a-feature-repository.md) * [Deploy a feature store](how-to-guides/feast-gcp-aws/deploy-a-feature-store.md) @@ -54,10 +55,12 @@ * [Data sources](reference/data-sources/README.md) * [File](reference/data-sources/file.md) + * [Snowflake](reference/data-sources/snowflake.md) * [BigQuery](reference/data-sources/bigquery.md) * [Redshift](reference/data-sources/redshift.md) * [Offline stores](reference/offline-stores/README.md) * [File](reference/offline-stores/file.md) + * [Snowflake](reference/offline-stores/snowflake.md) * [BigQuery](reference/offline-stores/bigquery.md) * [Redshift](reference/offline-stores/redshift.md) * [Online stores](reference/online-stores/README.md) diff --git a/docs/getting-started/third-party-integrations.md b/docs/getting-started/third-party-integrations.md index 31b6acdc88..a3a41bb836 100644 --- a/docs/getting-started/third-party-integrations.md +++ b/docs/getting-started/third-party-integrations.md @@ -13,6 +13,7 @@ Don't see your offline store or online store of choice here? Check out our guide ### **Data Sources** +* [x] [Snowflake source](https://docs.feast.dev/reference/data-sources/snowflake) * [x] [Redshift source](https://docs.feast.dev/reference/data-sources/redshift) * [x] [BigQuery source](https://docs.feast.dev/reference/data-sources/bigquery) * [x] [Parquet file source](https://docs.feast.dev/reference/data-sources/file) @@ -20,11 +21,11 @@ Don't see your offline store or online store of choice here? Check out our guide * [x] [Hive (community plugin)](https://github.com/baineng/feast-hive) * [x] [Postgres (community plugin)](https://github.com/nossrannug/feast-postgres) * [x] Kafka source (with [push support into the online store](https://docs.feast.dev/reference/alpha-stream-ingestion)) -* [x] [Snowflake source (community plugin)](https://github.com/sfc-gh-madkins/feast-snowflake) * [ ] HTTP source ### Offline Stores +* [x] [Snowflake](https://docs.feast.dev/reference/offline-stores/snowflake) * [x] [Redshift](https://docs.feast.dev/reference/offline-stores/redshift) * [x] [BigQuery](https://docs.feast.dev/reference/offline-stores/bigquery) * [x] [Synapse (community plugin)](https://github.com/Azure/feast-azure) @@ -32,7 +33,6 @@ Don't see your offline store or online store of choice here? 
Check out our guide * [x] [Postgres (community plugin)](https://github.com/nossrannug/feast-postgres) * [x] [In-memory / Pandas](https://docs.feast.dev/reference/offline-stores/file) * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/adding-a-new-offline-store) -* [x] [Snowflake source (community plugin)](https://github.com/sfc-gh-madkins/feast-snowflake) * [x] [Trino (communiuty plugin)](https://github.com/Shopify/feast-trino) ### Online Stores diff --git a/docs/how-to-guides/feast-gcp-aws/README.md b/docs/how-to-guides/feast-snowflake-gcp-aws/README.md similarity index 100% rename from docs/how-to-guides/feast-gcp-aws/README.md rename to docs/how-to-guides/feast-snowflake-gcp-aws/README.md diff --git a/docs/how-to-guides/feast-gcp-aws/build-a-training-dataset.md b/docs/how-to-guides/feast-snowflake-gcp-aws/build-a-training-dataset.md similarity index 100% rename from docs/how-to-guides/feast-gcp-aws/build-a-training-dataset.md rename to docs/how-to-guides/feast-snowflake-gcp-aws/build-a-training-dataset.md diff --git a/docs/how-to-guides/feast-gcp-aws/create-a-feature-repository.md b/docs/how-to-guides/feast-snowflake-gcp-aws/create-a-feature-repository.md similarity index 84% rename from docs/how-to-guides/feast-gcp-aws/create-a-feature-repository.md rename to docs/how-to-guides/feast-snowflake-gcp-aws/create-a-feature-repository.md index 1add0a92e8..8754bc051a 100644 --- a/docs/how-to-guides/feast-gcp-aws/create-a-feature-repository.md +++ b/docs/how-to-guides/feast-snowflake-gcp-aws/create-a-feature-repository.md @@ -13,6 +13,21 @@ Creating a new Feast repository in /<...>/tiny_pika. ``` {% endtab %} +{% tabs %} +{% tab title="Snowflake template" %} +```bash +feast init -t snowflake +Snowflake Deployment URL: ... +Snowflake User Name: ... +Snowflake Password: ... +Snowflake Role Name: ... +Snowflake Warehouse Name: ... +Snowflake Database Name: ... + +Creating a new Feast repository in /<...>/tiny_pika. +``` +{% endtab %} + {% tab title="GCP template" %} ```text feast init -t gcp @@ -30,7 +45,7 @@ Redshift Database Name: ... Redshift User Name: ... Redshift S3 Staging Location (s3://*): ... Redshift IAM Role for S3 (arn:aws:iam::*:role/*): ... -Should I upload example data to Redshift (overwriting 'feast_driver_hourly_stats' table)? (Y/n): +Should I upload example data to Redshift (overwriting 'feast_driver_hourly_stats' table)? (Y/n): Creating a new Feast repository in /<...>/tiny_pika. ``` @@ -63,4 +78,3 @@ You can now use this feature repository for development. You can try the followi * Run `feast apply` to apply these definitions to Feast. * Edit the example feature definitions in `example.py` and run `feast apply` again to change feature definitions. * Initialize a git repository in the same directory and checking the feature repository into version control. 
-
diff --git a/docs/how-to-guides/feast-gcp-aws/deploy-a-feature-store.md b/docs/how-to-guides/feast-snowflake-gcp-aws/deploy-a-feature-store.md
similarity index 100%
rename from docs/how-to-guides/feast-gcp-aws/deploy-a-feature-store.md
rename to docs/how-to-guides/feast-snowflake-gcp-aws/deploy-a-feature-store.md
diff --git a/docs/how-to-guides/feast-gcp-aws/install-feast.md b/docs/how-to-guides/feast-snowflake-gcp-aws/install-feast.md
similarity index 80%
rename from docs/how-to-guides/feast-gcp-aws/install-feast.md
rename to docs/how-to-guides/feast-snowflake-gcp-aws/install-feast.md
index 019231be09..26d95c6117 100644
--- a/docs/how-to-guides/feast-gcp-aws/install-feast.md
+++ b/docs/how-to-guides/feast-snowflake-gcp-aws/install-feast.md
@@ -6,6 +6,12 @@ Install Feast using [pip](https://pip.pypa.io):
 pip install feast
 ```
 
+Install Feast with Snowflake dependencies (required when using Snowflake):
+
+```
+pip install 'feast[snowflake]'
+```
+
 Install Feast with GCP dependencies (required when using BigQuery or Firestore):
 
 ```
diff --git a/docs/how-to-guides/feast-gcp-aws/load-data-into-the-online-store.md b/docs/how-to-guides/feast-snowflake-gcp-aws/load-data-into-the-online-store.md
similarity index 100%
rename from docs/how-to-guides/feast-gcp-aws/load-data-into-the-online-store.md
rename to docs/how-to-guides/feast-snowflake-gcp-aws/load-data-into-the-online-store.md
diff --git a/docs/how-to-guides/feast-gcp-aws/read-features-from-the-online-store.md b/docs/how-to-guides/feast-snowflake-gcp-aws/read-features-from-the-online-store.md
similarity index 100%
rename from docs/how-to-guides/feast-gcp-aws/read-features-from-the-online-store.md
rename to docs/how-to-guides/feast-snowflake-gcp-aws/read-features-from-the-online-store.md
diff --git a/docs/reference/data-sources/README.md b/docs/reference/data-sources/README.md
index 6732fc16a0..fc6e136a9c 100644
--- a/docs/reference/data-sources/README.md
+++ b/docs/reference/data-sources/README.md
@@ -4,7 +4,8 @@ Please see [Data Source](../../getting-started/concepts/feature-view.md#data-sou
 
 {% page-ref page="file.md" %}
 
+{% page-ref page="snowflake.md" %}
+
 {% page-ref page="bigquery.md" %}
 
 {% page-ref page="redshift.md" %}
-
diff --git a/docs/reference/data-sources/snowflake.md b/docs/reference/data-sources/snowflake.md
new file mode 100644
index 0000000000..0f5304b6cd
--- /dev/null
+++ b/docs/reference/data-sources/snowflake.md
@@ -0,0 +1,44 @@
+# Snowflake
+
+## Description
+
+Snowflake data sources allow for the retrieval of historical feature values from Snowflake for building training datasets as well as materializing features into an online store.
+
+* Either a table reference or a SQL query can be provided.
+
+## Examples
+
+Using a table reference:
+
+```python
+from feast import SnowflakeSource
+
+my_snowflake_source = SnowflakeSource(
+    database="FEAST",
+    schema="PUBLIC",
+    table="FEATURE_TABLE",
+)
+```
+
+Using a query:
+
+```python
+from feast import SnowflakeSource
+
+my_snowflake_source = SnowflakeSource(
+    query="""
+    SELECT
+        timestamp_column AS "ts",
+        "created",
+        "f1",
+        "f2"
+    FROM
+        "FEAST"."PUBLIC"."FEATURE_TABLE"
+    """,
+)
+```
+
+One thing to remember is how Snowflake handles table and column name conventions.
+You can read more about quoted identifiers [here](https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html).
+
+Configuration options are available [here](https://rtd.feast.dev/en/latest/index.html#feast.data_source.SnowflakeSource).
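+
+## Using the source in a feature view
+
+For illustration, a feature view can then reference this source as its batch source. The sketch below borrows the entity, feature names, and TTL from the standard driver-stats example; adjust them to match your own table or query:
+
+```python
+from datetime import timedelta
+
+from feast import Entity, Feature, FeatureView, ValueType
+
+driver = Entity(name="driver_id", value_type=ValueType.INT64)
+
+driver_hourly_stats = FeatureView(
+    name="driver_hourly_stats",
+    entities=["driver_id"],
+    ttl=timedelta(days=1),
+    features=[
+        Feature(name="conv_rate", dtype=ValueType.FLOAT),
+        Feature(name="acc_rate", dtype=ValueType.FLOAT),
+    ],
+    # my_snowflake_source is the table- or query-based source defined above
+    batch_source=my_snowflake_source,
+)
+```
+
+Running `feast apply` registers the source and feature view; `get_historical_features()` and `materialize()` will then read from Snowflake.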
diff --git a/docs/reference/offline-stores/README.md b/docs/reference/offline-stores/README.md index 1260fe8b29..141a34d03b 100644 --- a/docs/reference/offline-stores/README.md +++ b/docs/reference/offline-stores/README.md @@ -4,7 +4,8 @@ Please see [Offline Store](../../getting-started/architecture-and-components/off {% page-ref page="file.md" %} +{% page-ref page="snowflake.md" %} + {% page-ref page="bigquery.md" %} {% page-ref page="redshift.md" %} - diff --git a/docs/reference/offline-stores/snowflake.md b/docs/reference/offline-stores/snowflake.md new file mode 100644 index 0000000000..fcf9a7a6fd --- /dev/null +++ b/docs/reference/offline-stores/snowflake.md @@ -0,0 +1,30 @@ +# Snowflake + +## Description + +The Snowflake offline store provides support for reading [SnowflakeSources](../data-sources/snowflake.md). + +* Snowflake tables and views are allowed as sources. +* All joins happen within Snowflake. +* Entity dataframes can be provided as a SQL query or can be provided as a Pandas dataframe. Pandas dataframes will be uploaded to Snowflake in order to complete join operations. +* A [SnowflakeRetrievalJob](https://github.com/feast-dev/feast/blob/bf557bcb72c7878a16dccb48443bbbe9dc3efa49/sdk/python/feast/infra/offline_stores/snowflake.py#L185) is returned when calling `get_historical_features()`. + +## Example + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +offline_store: + type: snowflake.offline + account: snowflake_deployment.us-east-1 + user: user_login + password: user_password + role: sysadmin + warehouse: demo_wh + database: FEAST +``` +{% endcode %} + +Configuration options are available [here](https://github.com/feast-dev/feast/blob/bf557bcb72c7878a16dccb48443bbbe9dc3efa49/sdk/python/feast/infra/offline_stores/snowflake.py#L39). diff --git a/docs/reference/offline-stores/untitled.md b/docs/reference/offline-stores/untitled.md deleted file mode 100644 index 8ffa566a70..0000000000 --- a/docs/reference/offline-stores/untitled.md +++ /dev/null @@ -1,26 +0,0 @@ -# BigQuery - -### Description - -The BigQuery offline store provides support for reading [BigQuerySources](../data-sources/bigquery.md). - -* BigQuery tables and views are allowed as sources. -* All joins happen within BigQuery. -* Entity dataframes can be provided as a SQL query or can be provided as a Pandas dataframe. Pandas dataframes will be uploaded to BigQuery in order to complete join operations. -* A [BigQueryRetrievalJob](https://github.com/feast-dev/feast/blob/c50a36ec1ad5b8d81c6f773c23204db7c7a7d218/sdk/python/feast/infra/offline_stores/bigquery.py#L210) is returned when calling `get_historical_features()`. - -### Example - -{% code title="feature\_store.yaml" %} -```yaml -project: my_feature_repo -registry: gs://my-bucket/data/registry.db -provider: gcp -offline_store: - type: bigquery - dataset: feast_bq_dataset -``` -{% endcode %} - -Configuration options are available [here](https://rtd.feast.dev/en/latest/#feast.repo_config.BigQueryOfflineStoreConfig). 
- diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index aadcc0eb65..2c2902bc57 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -9,4 +9,3 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli {% page-ref page="datastore.md" %} {% page-ref page="dynamodb.md" %} - diff --git a/docs/reference/providers/README.md b/docs/reference/providers/README.md index 7eb992d5ac..dc52d92726 100644 --- a/docs/reference/providers/README.md +++ b/docs/reference/providers/README.md @@ -7,4 +7,3 @@ Please see [Provider](../../getting-started/architecture-and-components/provider {% page-ref page="google-cloud-platform.md" %} {% page-ref page="amazon-web-services.md" %} - diff --git a/docs/roadmap.md b/docs/roadmap.md index 723bfba82a..42da01fcba 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -8,6 +8,7 @@ The list below contains the functionality that contributors are planning to deve * Want to speak to a Feast contributor? We are more than happy to jump on a call. Please schedule a time using [Calendly](https://calendly.com/d/x2ry-g5bb/meet-with-feast-team). * **Data Sources** + * [x] [Snowflake source](https://docs.feast.dev/reference/data-sources/snowflake) * [x] [Redshift source](https://docs.feast.dev/reference/data-sources/redshift) * [x] [BigQuery source](https://docs.feast.dev/reference/data-sources/bigquery) * [x] [Parquet file source](https://docs.feast.dev/reference/data-sources/file) @@ -18,6 +19,7 @@ The list below contains the functionality that contributors are planning to deve * [x] [Snowflake source (community plugin)](https://github.com/sfc-gh-madkins/feast-snowflake) * [ ] HTTP source * **Offline Stores** + * [x] [Snowflake](https://docs.feast.dev/reference/offline-stores/snowflake) * [x] [Redshift](https://docs.feast.dev/reference/offline-stores/redshift) * [x] [BigQuery](https://docs.feast.dev/reference/offline-stores/bigquery) * [x] [Synapse (community plugin)](https://github.com/Azure/feast-azure) diff --git a/docs/specs/offline_store_format.md b/docs/specs/offline_store_format.md index 6826c50190..ac829dd52f 100644 --- a/docs/specs/offline_store_format.md +++ b/docs/specs/offline_store_format.md @@ -7,8 +7,8 @@ One of the design goals of Feast is being able to plug seamlessly into existing Feast provides first class support for the following data warehouses (DWH) to store feature data offline out of the box: * [BigQuery](https://cloud.google.com/bigquery) -* [Snowflake](https://www.snowflake.com/) (Coming Soon) -* [Redshift](https://aws.amazon.com/redshift/) (Coming Soon) +* [Snowflake](https://www.snowflake.com/) +* [Redshift](https://aws.amazon.com/redshift/) The integration between Feast and the DWH is highly configurable, but at the same time there are some non-configurable implications and assumptions that Feast imposes on table schemas and mapping between database-native types and Feast type system. This is what this document is about. @@ -28,14 +28,14 @@ Feature data is stored in tables in the DWH. 
There is one DWH table per Feast Fe
 ## Type mappings
 
 #### Pandas types
-Here's how Feast types map to Pandas types for Feast APIs that take in or return a Pandas dataframe: 
+Here's how Feast types map to Pandas types for Feast APIs that take in or return a Pandas dataframe:
 
 | Feast Type | Pandas Type |
 |-------------|--|
 | Event Timestamp | `datetime64[ns]` |
 | BYTES | `bytes` |
 | STRING | `str` , `category`|
-| INT32 | `int32`, `uint32` |
+| INT32 | `int16`, `uint16`, `int32`, `uint32` |
 | INT64 | `int64`, `uint64` |
 | UNIX_TIMESTAMP | `datetime64[ns]`, `datetime64[ns, tz]` |
 | DOUBLE | `float64` |
@@ -80,3 +80,17 @@ Here's how Feast types map to BigQuery types when using BigQuery for offline sto
 | BOOL\_LIST | `ARRAY`|
 
 Values that are not specified by the table above will cause an error on conversion.
+
+#### Snowflake Types
+Here's how Feast types map to Snowflake types when using Snowflake for offline storage.
+See the source of this mapping here:
+https://docs.snowflake.com/en/user-guide/python-connector-pandas.html#snowflake-to-pandas-data-mapping
+
+| Feast Type | Snowflake Python Type |
+|-------------|--|
+| Event Timestamp | `DATETIME64[NS]` |
+| UNIX_TIMESTAMP | `DATETIME64[NS]` |
+| STRING | `STR` |
+| INT32 | `INT8 / UINT8 / INT16 / UINT16 / INT32 / UINT32` |
+| INT64 | `INT64 / UINT64` |
+| DOUBLE | `FLOAT64` |
diff --git a/docs/tutorials/driver-stats-using-snowflake.md b/docs/tutorials/driver-stats-using-snowflake.md
new file mode 100644
index 0000000000..c51fc9b1ce
--- /dev/null
+++ b/docs/tutorials/driver-stats-using-snowflake.md
@@ -0,0 +1,140 @@
+---
+description: >-
+  Initial demonstration of using Snowflake with Feast as both an Offline & Online store,
+  using the snowflake demo template.
+---
+
+# Driver Stats using Snowflake
+
+In the following steps, we will set up a sample Feast project that leverages Snowflake
+as an Offline Store.
+
+Starting with data in a Snowflake table, we will register that table to the feature store and
+define features associated with the columns in that table. From there, we will generate historical
+training data based on those feature definitions. We will then materialize the latest feature values
+given our feature definitions into our online feature store. Lastly, we will retrieve and serve
+those latest feature values.
+
+The template you will use generates new data related to driver statistics.
+From there, we will show code snippets that call the offline store to generate
+training datasets, and then the code that calls the online store to serve the
+latest feature values to models in production.
+
+## Snowflake Offline/Online Store Example
+
+#### Install Feast with Snowflake dependencies
+
+```shell
+pip install 'feast[snowflake]'
+```
+
+#### Get a Snowflake Trial Account (Optional)
+
+[Snowflake Trial Account](https://trial.snowflake.com)
+
+#### Create a feature repository
+
+```shell
+feast init -t snowflake {feature_repo_name}
+Snowflake Deployment URL (exclude .snowflakecomputing.com):
+Snowflake User Name::
+Snowflake Password::
+Snowflake Role Name (Case Sensitive)::
+Snowflake Warehouse Name (Case Sensitive)::
+Snowflake Database Name (Case Sensitive)::
+Should I upload example data to Snowflake (overwrite table)? 
[Y/n]: Y +cd {feature_repo_name} +``` + +The following files will automatically be created in your project folder: + +* feature_store.yaml -- This is your main configuration file +* driver_repo.py -- This is your main feature definition file +* test.py -- This is a file to test your feature store configuration + +#### Inspect `feature_store.yaml` + +Here you will see the information that you entered. This template will look to use +Snowflake as both an Offline & Online store. The main thing to remember is by default, +Snowflake Objects have ALL CAPS names unless lower case was specified. + +{% code title="feature_store.yaml" %} +```yaml +project: ... +registry: ... +provider: local +offline_store: + type: snowflake.offline + account: SNOWFLAKE_DEPLOYMENT_URL #drop .snowflakecomputing.com + user: USERNAME + password: PASSWORD + role: ROLE_NAME #case sensitive + warehouse: WAREHOUSE_NAME #case sensitive + database: DATABASE_NAME #case cap sensitive +``` +{% endcode %} + +#### Run our test python script `test.py` + +```shell +python test.py +``` + +## What we did in `test.py` + +#### Initialize our Feature Store +{% code title="test.py" %} +```python +from datetime import datetime, timedelta + +import pandas as pd +from driver_repo import driver, driver_stats_fv + +from feast import FeatureStore + +fs = FeatureStore(repo_path=".") + +fs.apply([driver, driver_stats_fv]) +``` +{% endcode %} + +#### Create a dummy training dataframe, then call our Offline store to add additional columns +{% code title="test.py" %} +```python +entity_df = pd.DataFrame( + { + "event_timestamp": [ + pd.Timestamp(dt, unit="ms", tz="UTC").round("ms") + for dt in pd.date_range( + start=datetime.now() - timedelta(days=3), + end=datetime.now(), + periods=3, + ) + ], + "driver_id": [1001, 1002, 1003], + } +) + +features = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"] + +training_df = fs.get_historical_features( + features=features, entity_df=entity_df +).to_df() +``` +{% endcode %} + +#### Materialize the latest feature values into our Online store +{% code title="test.py" %} +```python +fs.materialize_incremental(end_date=datetime.now()) +``` +{% endcode %} + +#### Retrieve the latest values from our Online store based on our Entity Key +{% code title="test.py" %} +```python +online_features = fs.get_online_features( + features=features, entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}], +).to_dict() +``` +{% endcode %} diff --git a/docs/tutorials/tutorials-overview.md b/docs/tutorials/tutorials-overview.md index a523f9b38e..86a8c25371 100644 --- a/docs/tutorials/tutorials-overview.md +++ b/docs/tutorials/tutorials-overview.md @@ -8,3 +8,4 @@ These Feast tutorials showcase how to use Feast to simplify end to end model tra {% page-ref page="real-time-credit-scoring-on-aws.md" %} +{% page-ref page="driver-stats-using-snowflake.md" %} diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index ee5c6939d7..41bba6443f 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -32,19 +32,22 @@ message DataSource { reserved 6 to 10; // Type of Data Source. 
+ // Next available id: 9 enum SourceType { INVALID = 0; BATCH_FILE = 1; + BATCH_SNOWFLAKE = 8; BATCH_BIGQUERY = 2; + BATCH_REDSHIFT = 5; STREAM_KAFKA = 3; STREAM_KINESIS = 4; - BATCH_REDSHIFT = 5; CUSTOM_SOURCE = 6; REQUEST_SOURCE = 7; + } SourceType type = 1; - // Defines mapping between fields in the sourced data + // Defines mapping between fields in the sourced data // and fields in parent FeatureTable. map field_mapping = 2; @@ -128,6 +131,22 @@ message DataSource { string schema = 3; } + // Defines options for DataSource that sources features from a Snowflake Query + message SnowflakeOptions { + // Snowflake table name + string table = 1; + + // SQL query that returns a table containing feature data. Must contain an event_timestamp column, and respective + // entity columns + string query = 2; + + // Snowflake schema name + string schema = 3; + + // Snowflake schema name + string database = 4; + } + // Defines configuration for custom third-party data sources. message CustomSourceOptions { // Serialized configuration information for the data source. The implementer of the custom data source is @@ -153,5 +172,6 @@ message DataSource { RedshiftOptions redshift_options = 15; RequestDataOptions request_data_options = 18; CustomSourceOptions custom_options = 16; + SnowflakeOptions snowflake_options = 19; } } diff --git a/protos/feast/core/SavedDataset.proto b/protos/feast/core/SavedDataset.proto index 6ec9df0835..ebd2e56d35 100644 --- a/protos/feast/core/SavedDataset.proto +++ b/protos/feast/core/SavedDataset.proto @@ -53,6 +53,7 @@ message SavedDatasetStorage { DataSource.FileOptions file_storage = 4; DataSource.BigQueryOptions bigquery_storage = 5; DataSource.RedshiftOptions redshift_storage = 6; + DataSource.SnowflakeOptions snowflake_storage = 7; } } diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index eada13f995..9f78f9d98b 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -5,6 +5,7 @@ from feast.infra.offline_stores.bigquery_source import BigQuerySource from feast.infra.offline_stores.file_source import FileSource from feast.infra.offline_stores.redshift_source import RedshiftSource +from feast.infra.offline_stores.snowflake_source import SnowflakeSource from .data_source import KafkaSource, KinesisSource, SourceType from .entity import Entity @@ -43,4 +44,5 @@ "BigQuerySource", "FileSource", "RedshiftSource", + "SnowflakeSource", ] diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 4950977e2a..f6d326410a 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -477,7 +477,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List @click.option( "--template", "-t", - type=click.Choice(["local", "gcp", "aws"], case_sensitive=False), + type=click.Choice(["local", "gcp", "aws", "snowflake"], case_sensitive=False), help="Specify a template for the created project", default="local", ) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index b30340f0d2..94910c6c08 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -360,6 +360,12 @@ def from_proto(data_source: DataSourceProto) -> Any: from feast.infra.offline_stores.redshift_source import RedshiftSource data_source_obj = RedshiftSource.from_proto(data_source) + + elif data_source.snowflake_options.table or data_source.snowflake_options.query: + from feast.infra.offline_stores.snowflake_source import SnowflakeSource + + data_source_obj = 
SnowflakeSource.from_proto(data_source) + elif ( data_source.kafka_options.bootstrap_servers and data_source.kafka_options.topic diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 3fc8c7571e..17147f8a60 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -250,6 +250,16 @@ def __init__(self, table_name: str): ) +class SnowflakeCredentialsError(Exception): + def __init__(self): + super().__init__("Snowflake Connector failed due to incorrect credentials") + + +class SnowflakeQueryError(Exception): + def __init__(self, details): + super().__init__(f"Snowflake SQL Query failed to finish. Details: {details}") + + class EntityTimestampInferenceException(Exception): def __init__(self, expected_column_name: str): super().__init__( @@ -310,3 +320,13 @@ def __init__(self, actual_class: str, expected_class: str): class FeastInvalidInfraObjectType(Exception): def __init__(self): super().__init__("Could not identify the type of the InfraObject.") + + +class SnowflakeIncompleteConfig(Exception): + def __init__(self, e: KeyError): + super().__init__(f"{e} not defined in a config file or feature_store.yaml file") + + +class SnowflakeQueryUnknownError(Exception): + def __init__(self, query: str): + super().__init__(f"Snowflake query failed: {query}") diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 642a3c6442..ce8fa919f1 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -1,7 +1,14 @@ import re from typing import List -from feast import BigQuerySource, Entity, Feature, FileSource, RedshiftSource +from feast import ( + BigQuerySource, + Entity, + Feature, + FileSource, + RedshiftSource, + SnowflakeSource, +) from feast.data_source import DataSource from feast.errors import RegistryInferenceFailure from feast.feature_view import FeatureView @@ -83,6 +90,8 @@ def update_data_sources_with_inferred_event_timestamp_col( ts_column_type_regex_pattern = "TIMESTAMP|DATETIME" elif isinstance(data_source, RedshiftSource): ts_column_type_regex_pattern = "TIMESTAMP[A-Z]*" + elif isinstance(data_source, SnowflakeSource): + ts_column_type_regex_pattern = "TIMESTAMP_[A-Z]*" else: raise RegistryInferenceFailure( "DataSource", @@ -92,8 +101,10 @@ def update_data_sources_with_inferred_event_timestamp_col( """, ) # for informing the type checker - assert isinstance(data_source, FileSource) or isinstance( - data_source, BigQuerySource + assert ( + isinstance(data_source, FileSource) + or isinstance(data_source, BigQuerySource) + or isinstance(data_source, SnowflakeSource) ) # loop through table columns to find singular match diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py new file mode 100644 index 0000000000..ee8cd71ce0 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -0,0 +1,632 @@ +import contextlib +import os +from datetime import datetime +from pathlib import Path +from typing import ( + Callable, + ContextManager, + Dict, + Iterator, + List, + Optional, + Tuple, + Union, + cast, +) + +import numpy as np +import pandas as pd +import pyarrow as pa +from pydantic import Field +from pydantic.typing import Literal +from pytz import utc + +from feast import OnDemandFeatureView +from feast.data_source import DataSource +from feast.errors import InvalidEntityType +from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView +from feast.infra.offline_stores import offline_utils +from 
feast.infra.offline_stores.offline_store import ( + OfflineStore, + RetrievalJob, + RetrievalMetadata, +) +from feast.infra.offline_stores.snowflake_source import ( + SavedDatasetSnowflakeStorage, + SnowflakeSource, +) +from feast.infra.utils.snowflake_utils import ( + execute_snowflake_statement, + get_snowflake_conn, + write_pandas, +) +from feast.registry import Registry +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.saved_dataset import SavedDatasetStorage +from feast.usage import log_exceptions_and_usage + +try: + from snowflake.connector import SnowflakeConnection +except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("snowflake", str(e)) + + +class SnowflakeOfflineStoreConfig(FeastConfigBaseModel): + """ Offline store config for Snowflake """ + + type: Literal["snowflake.offline"] = "snowflake.offline" + """ Offline store type selector""" + + config_path: Optional[str] = ( + Path(os.environ["HOME"]) / ".snowsql/config" + ).__str__() + """ Snowflake config path -- absolute path required (Cant use ~)""" + + account: Optional[str] = None + """ Snowflake deployment identifier -- drop .snowflakecomputing.com""" + + user: Optional[str] = None + """ Snowflake user name """ + + password: Optional[str] = None + """ Snowflake password """ + + role: Optional[str] = None + """ Snowflake role name""" + + warehouse: Optional[str] = None + """ Snowflake warehouse name """ + + database: Optional[str] = None + """ Snowflake database name """ + + schema_: Optional[str] = Field("PUBLIC", alias="schema") + """ Snowflake schema name """ + + class Config: + allow_population_by_field_name = True + + +class SnowflakeOfflineStore(OfflineStore): + @staticmethod + @log_exceptions_and_usage(offline_store="snowflake") + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + event_timestamp_column: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + assert isinstance(data_source, SnowflakeSource) + assert isinstance(config.offline_store, SnowflakeOfflineStoreConfig) + + from_expression = ( + data_source.get_table_query_string() + ) # returns schema.table as a string + + if join_key_columns: + partition_by_join_key_string = '"' + '", "'.join(join_key_columns) + '"' + partition_by_join_key_string = ( + "PARTITION BY " + partition_by_join_key_string + ) + else: + partition_by_join_key_string = "" + + timestamp_columns = [event_timestamp_column] + if created_timestamp_column: + timestamp_columns.append(created_timestamp_column) + + timestamp_desc_string = '"' + '" DESC, "'.join(timestamp_columns) + '" DESC' + field_string = ( + '"' + + '", "'.join(join_key_columns + feature_name_columns + timestamp_columns) + + '"' + ) + + snowflake_conn = get_snowflake_conn(config.offline_store) + + query = f""" + SELECT + {field_string} + {f''', TRIM({repr(DUMMY_ENTITY_VAL)}::VARIANT,'"') AS "{DUMMY_ENTITY_ID}"''' if not join_key_columns else ""} + FROM ( + SELECT {field_string}, + ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS "_feast_row" + FROM {from_expression} + WHERE "{event_timestamp_column}" BETWEEN TO_TIMESTAMP_NTZ({start_date.timestamp()}) AND TO_TIMESTAMP_NTZ({end_date.timestamp()}) + ) + WHERE "_feast_row" = 1 + """ + + return SnowflakeRetrievalJob( + query=query, + snowflake_conn=snowflake_conn, + config=config, + 
full_feature_names=False, + on_demand_feature_views=None, + ) + + @staticmethod + @log_exceptions_and_usage(offline_store="snowflake") + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + event_timestamp_column: str, + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + assert isinstance(data_source, SnowflakeSource) + from_expression = data_source.get_table_query_string() + + field_string = ( + '"' + + '", "'.join( + join_key_columns + feature_name_columns + [event_timestamp_column] + ) + + '"' + ) + + snowflake_conn = get_snowflake_conn(config.offline_store) + + start_date = start_date.astimezone(tz=utc) + end_date = end_date.astimezone(tz=utc) + + query = f""" + SELECT {field_string} + FROM {from_expression} + WHERE "{event_timestamp_column}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + """ + + return SnowflakeRetrievalJob( + query=query, + snowflake_conn=snowflake_conn, + config=config, + full_feature_names=False, + ) + + @staticmethod + @log_exceptions_and_usage(offline_store="snowflake") + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, str], + registry: Registry, + project: str, + full_feature_names: bool = False, + ) -> RetrievalJob: + assert isinstance(config.offline_store, SnowflakeOfflineStoreConfig) + + snowflake_conn = get_snowflake_conn(config.offline_store) + + entity_schema = _get_entity_schema(entity_df, snowflake_conn, config) + + entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df( + entity_schema + ) + + entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( + entity_df, entity_df_event_timestamp_col, snowflake_conn, + ) + + @contextlib.contextmanager + def query_generator() -> Iterator[str]: + + table_name = offline_utils.get_temp_entity_table_name() + + _upload_entity_df(entity_df, snowflake_conn, config, table_name) + + expected_join_keys = offline_utils.get_expected_join_keys( + project, feature_views, registry + ) + + offline_utils.assert_expected_columns_in_entity_df( + entity_schema, expected_join_keys, entity_df_event_timestamp_col + ) + + # Build a query context containing all information required to template the Snowflake SQL query + query_context = offline_utils.get_feature_view_query_context( + feature_refs, + feature_views, + registry, + project, + entity_df_event_timestamp_range, + ) + + query_context = _fix_entity_selections_identifiers(query_context) + + # Generate the Snowflake SQL query from the query context + query = offline_utils.build_point_in_time_query( + query_context, + left_table_query_string=table_name, + entity_df_event_timestamp_col=entity_df_event_timestamp_col, + entity_df_columns=entity_schema.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=full_feature_names, + ) + + yield query + + return SnowflakeRetrievalJob( + query=query_generator, + snowflake_conn=snowflake_conn, + config=config, + full_feature_names=full_feature_names, + on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs( + feature_refs, project, registry + ), + metadata=RetrievalMetadata( + features=feature_refs, + keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}), + min_event_timestamp=entity_df_event_timestamp_range[0], + max_event_timestamp=entity_df_event_timestamp_range[1], + ), + ) + + +class SnowflakeRetrievalJob(RetrievalJob): + 
def __init__( + self, + query: Union[str, Callable[[], ContextManager[str]]], + snowflake_conn: SnowflakeConnection, + config: RepoConfig, + full_feature_names: bool, + on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None, + metadata: Optional[RetrievalMetadata] = None, + ): + + if not isinstance(query, str): + self._query_generator = query + else: + + @contextlib.contextmanager + def query_generator() -> Iterator[str]: + assert isinstance(query, str) + yield query + + self._query_generator = query_generator + + self.snowflake_conn = snowflake_conn + self.config = config + self._full_feature_names = full_feature_names + self._on_demand_feature_views = ( + on_demand_feature_views if on_demand_feature_views else [] + ) + self._metadata = metadata + + @property + def full_feature_names(self) -> bool: + return self._full_feature_names + + @property + def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: + return self._on_demand_feature_views + + def _to_df_internal(self) -> pd.DataFrame: + with self._query_generator() as query: + + df = execute_snowflake_statement( + self.snowflake_conn, query + ).fetch_pandas_all() + + return df + + def _to_arrow_internal(self) -> pa.Table: + with self._query_generator() as query: + + pa_table = execute_snowflake_statement( + self.snowflake_conn, query + ).fetch_arrow_all() + + if pa_table: + + return pa_table + else: + empty_result = execute_snowflake_statement(self.snowflake_conn, query) + + return pa.Table.from_pandas( + pd.DataFrame(columns=[md.name for md in empty_result.description]) + ) + + def to_snowflake(self, table_name: str) -> None: + """ Save dataset as a new Snowflake table """ + if self.on_demand_feature_views is not None: + transformed_df = self.to_df() + + write_pandas( + self.snowflake_conn, transformed_df, table_name, auto_create_table=True + ) + + return None + + with self._query_generator() as query: + query = f'CREATE TABLE IF NOT EXISTS "{table_name}" AS ({query});\n' + + execute_snowflake_statement(self.snowflake_conn, query) + + def to_sql(self) -> str: + """ + Returns the SQL query that will be executed in Snowflake to build the historical feature table. 
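+
+        For example, with a `FeatureStore` configured to use the Snowflake offline
+        store, something like the following can be used to inspect the generated
+        query without executing it:
+
+            job = store.get_historical_features(entity_df=entity_df, features=features)
+            print(job.to_sql())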
+ """ + with self._query_generator() as query: + return query + + def to_arrow_chunks(self, arrow_options: Optional[Dict] = None) -> Optional[List]: + with self._query_generator() as query: + + arrow_batches = execute_snowflake_statement( + self.snowflake_conn, query + ).get_result_batches() + + return arrow_batches + + def persist(self, storage: SavedDatasetStorage): + assert isinstance(storage, SavedDatasetSnowflakeStorage) + self.to_snowflake(table_name=storage.snowflake_options.table) + + @property + def metadata(self) -> Optional[RetrievalMetadata]: + return self._metadata + + +def _get_entity_schema( + entity_df: Union[pd.DataFrame, str], + snowflake_conn: SnowflakeConnection, + config: RepoConfig, +) -> Dict[str, np.dtype]: + + if isinstance(entity_df, pd.DataFrame): + + return dict(zip(entity_df.columns, entity_df.dtypes)) + + else: + + query = f"SELECT * FROM ({entity_df}) LIMIT 1" + limited_entity_df = execute_snowflake_statement( + snowflake_conn, query + ).fetch_pandas_all() + + return dict(zip(limited_entity_df.columns, limited_entity_df.dtypes)) + + +def _upload_entity_df( + entity_df: Union[pd.DataFrame, str], + snowflake_conn: SnowflakeConnection, + config: RepoConfig, + table_name: str, +) -> None: + + if isinstance(entity_df, pd.DataFrame): + # Write the data from the DataFrame to the table + write_pandas( + snowflake_conn, + entity_df, + table_name, + auto_create_table=True, + create_temp_table=True, + ) + + return None + elif isinstance(entity_df, str): + # If the entity_df is a string (SQL query), create a Snowflake table out of it, + query = f'CREATE TEMPORARY TABLE "{table_name}" AS ({entity_df})' + execute_snowflake_statement(snowflake_conn, query) + + return None + else: + raise InvalidEntityType(type(entity_df)) + + +def _fix_entity_selections_identifiers(query_context) -> list: + + for i, qc in enumerate(query_context): + for j, es in enumerate(qc.entity_selections): + query_context[i].entity_selections[j] = f'"{es}"'.replace(" AS ", '" AS "') + + return query_context + + +def _get_entity_df_event_timestamp_range( + entity_df: Union[pd.DataFrame, str], + entity_df_event_timestamp_col: str, + snowflake_conn: SnowflakeConnection, +) -> Tuple[datetime, datetime]: + if isinstance(entity_df, pd.DataFrame): + entity_df_event_timestamp = entity_df.loc[ + :, entity_df_event_timestamp_col + ].infer_objects() + if pd.api.types.is_string_dtype(entity_df_event_timestamp): + entity_df_event_timestamp = pd.to_datetime( + entity_df_event_timestamp, utc=True + ) + entity_df_event_timestamp_range = ( + entity_df_event_timestamp.min().to_pydatetime(), + entity_df_event_timestamp.max().to_pydatetime(), + ) + elif isinstance(entity_df, str): + # If the entity_df is a string (SQL query), determine range + # from table + query = f'SELECT MIN("{entity_df_event_timestamp_col}") AS "min_value", MAX("{entity_df_event_timestamp_col}") AS "max_value" FROM ({entity_df})' + results = execute_snowflake_statement(snowflake_conn, query).fetchall() + + entity_df_event_timestamp_range = cast(Tuple[datetime, datetime], results[0]) + else: + raise InvalidEntityType(type(entity_df)) + + return entity_df_event_timestamp_range + + +MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """ +/* + Compute a deterministic hash for the `left_table_query_string` that will be used throughout + all the logic as the field to GROUP BY the data +*/ +WITH "entity_dataframe" AS ( + SELECT *, + "{{entity_df_event_timestamp_col}}" AS "entity_timestamp" + {% for featureview in featureviews %} + {% if featureview.entities %} + ,( 
+ {% for entity in featureview.entities %} + CAST("{{entity}}" AS VARCHAR) || + {% endfor %} + CAST("{{entity_df_event_timestamp_col}}" AS VARCHAR) + ) AS "{{featureview.name}}__entity_row_unique_id" + {% else %} + ,CAST("{{entity_df_event_timestamp_col}}" AS VARCHAR) AS "{{featureview.name}}__entity_row_unique_id" + {% endif %} + {% endfor %} + FROM "{{ left_table_query_string }}" +), + +{% for featureview in featureviews %} + +"{{ featureview.name }}__entity_dataframe" AS ( + SELECT + {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} + "entity_timestamp", + "{{featureview.name}}__entity_row_unique_id" + FROM "entity_dataframe" + GROUP BY + {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} + "entity_timestamp", + "{{featureview.name}}__entity_row_unique_id" +), + +/* + This query template performs the point-in-time correctness join for a single feature set table + to the provided entity table. + + 1. We first join the current feature_view to the entity dataframe that has been passed. + This JOIN has the following logic: + - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + is less than the one provided in the entity dataframe + - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + is higher the the one provided minus the TTL + - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been + computed previously + + The output of this CTE will contain all the necessary information and already filtered out most + of the data that is not relevant. +*/ + +"{{ featureview.name }}__subquery" AS ( + SELECT + "{{ featureview.event_timestamp_column }}" as "event_timestamp", + {{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }} + {{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} + {% for feature in featureview.features %} + "{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} + {% endfor %} + FROM {{ featureview.table_subquery }} + WHERE "{{ featureview.event_timestamp_column }}" <= '{{ featureview.max_event_timestamp }}' + {% if featureview.ttl == 0 %}{% else %} + AND "{{ featureview.event_timestamp_column }}" >= '{{ featureview.min_event_timestamp }}' + {% endif %} +), + +"{{ featureview.name }}__base" AS ( + SELECT + "subquery".*, + "entity_dataframe"."entity_timestamp", + "entity_dataframe"."{{featureview.name}}__entity_row_unique_id" + FROM "{{ featureview.name }}__subquery" AS "subquery" + INNER JOIN "{{ featureview.name }}__entity_dataframe" AS "entity_dataframe" + ON TRUE + AND "subquery"."event_timestamp" <= "entity_dataframe"."entity_timestamp" + + {% if featureview.ttl == 0 %}{% else %} + AND "subquery"."event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_dataframe"."entity_timestamp") + {% endif %} + + {% for entity in featureview.entities %} + AND "subquery"."{{ entity }}" = "entity_dataframe"."{{ entity }}" + {% endfor %} +), + +/* + 2. If the `created_timestamp_column` has been set, we need to + deduplicate the data first. This is done by calculating the + `MAX(created_at_timestamp)` for each event_timestamp. 
+ We then join the data on the next CTE +*/ +{% if featureview.created_timestamp_column %} +"{{ featureview.name }}__dedup" AS ( + SELECT + "{{featureview.name}}__entity_row_unique_id", + "event_timestamp", + MAX("created_timestamp") AS "created_timestamp" + FROM "{{ featureview.name }}__base" + GROUP BY "{{featureview.name}}__entity_row_unique_id", "event_timestamp" +), +{% endif %} + +/* + 3. The data has been filtered during the first CTE "*__base" + Thus we only need to compute the latest timestamp of each feature. +*/ +"{{ featureview.name }}__latest" AS ( + SELECT + "event_timestamp", + {% if featureview.created_timestamp_column %}"created_timestamp",{% endif %} + "{{featureview.name}}__entity_row_unique_id" + FROM + ( + SELECT *, + ROW_NUMBER() OVER( + PARTITION BY "{{featureview.name}}__entity_row_unique_id" + ORDER BY "event_timestamp" DESC{% if featureview.created_timestamp_column %},"created_timestamp" DESC{% endif %} + ) AS "row_number" + FROM "{{ featureview.name }}__base" + {% if featureview.created_timestamp_column %} + INNER JOIN "{{ featureview.name }}__dedup" + USING ("{{featureview.name}}__entity_row_unique_id", "event_timestamp", "created_timestamp") + {% endif %} + ) + WHERE "row_number" = 1 +), + +/* + 4. Once we know the latest value of each feature for a given timestamp, + we can join again the data back to the original "base" dataset +*/ +"{{ featureview.name }}__cleaned" AS ( + SELECT "base".* + FROM "{{ featureview.name }}__base" AS "base" + INNER JOIN "{{ featureview.name }}__latest" + USING( + "{{featureview.name}}__entity_row_unique_id", + "event_timestamp" + {% if featureview.created_timestamp_column %} + ,"created_timestamp" + {% endif %} + ) +){% if loop.last %}{% else %}, {% endif %} + + +{% endfor %} +/* + Joins the outputs of multiple time travel joins to a single table. + The entity_dataframe dataset being our source of truth here. 
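+ Each feature view is LEFT JOINed back on its "entity_row_unique_id", so entity rows
+ without a matching feature value simply keep NULLs for that feature view's columns.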
+ */ + +SELECT "{{ final_output_feature_names | join('", "')}}" +FROM "entity_dataframe" +{% for featureview in featureviews %} +LEFT JOIN ( + SELECT + "{{featureview.name}}__entity_row_unique_id" + {% for feature in featureview.features %} + ,{% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %} + {% endfor %} + FROM "{{ featureview.name }}__cleaned" +) "{{ featureview.name }}__cleaned" USING ("{{featureview.name}}__entity_row_unique_id") +{% endfor %} +""" diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py new file mode 100644 index 0000000000..b5d50be0f4 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -0,0 +1,315 @@ +from typing import Callable, Dict, Iterable, Optional, Tuple + +from feast import type_map +from feast.data_source import DataSource +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.SavedDataset_pb2 import ( + SavedDatasetStorage as SavedDatasetStorageProto, +) +from feast.repo_config import RepoConfig +from feast.saved_dataset import SavedDatasetStorage +from feast.value_type import ValueType + + +class SnowflakeSource(DataSource): + def __init__( + self, + database: Optional[str] = None, + schema: Optional[str] = None, + table: Optional[str] = None, + query: Optional[str] = None, + event_timestamp_column: Optional[str] = "", + created_timestamp_column: Optional[str] = "", + field_mapping: Optional[Dict[str, str]] = None, + date_partition_column: Optional[str] = "", + ): + """ + Creates a SnowflakeSource object. + + Args: + database (optional): Snowflake database where the features are stored. + schema (optional): Snowflake schema in which the table is located. + table (optional): Snowflake table where the features are stored. + event_timestamp_column (optional): Event timestamp column used for point in + time joins of feature values. + query (optional): The query to be executed to obtain the features. + created_timestamp_column (optional): Timestamp column indicating when the + row was created, used for deduplicating rows. + field_mapping (optional): A dictionary mapping of column names in this data + source to column names in a feature table or view. + date_partition_column (optional): Timestamp column used for partitioning. + + """ + super().__init__( + event_timestamp_column, + created_timestamp_column, + field_mapping, + date_partition_column, + ) + + # The default Snowflake schema is named "PUBLIC". + _schema = "PUBLIC" if (database and table and not schema) else schema + + self._snowflake_options = SnowflakeOptions( + database=database, schema=_schema, table=table, query=query + ) + + @staticmethod + def from_proto(data_source: DataSourceProto): + """ + Creates a SnowflakeSource from a protobuf representation of a SnowflakeSource. + + Args: + data_source: A protobuf representation of a SnowflakeSource + + Returns: + A SnowflakeSource object based on the data_source protobuf. 
+ """ + return SnowflakeSource( + field_mapping=dict(data_source.field_mapping), + database=data_source.snowflake_options.database, + schema=data_source.snowflake_options.schema, + table=data_source.snowflake_options.table, + event_timestamp_column=data_source.event_timestamp_column, + created_timestamp_column=data_source.created_timestamp_column, + date_partition_column=data_source.date_partition_column, + query=data_source.snowflake_options.query, + ) + + def __eq__(self, other): + if not isinstance(other, SnowflakeSource): + raise TypeError( + "Comparisons should only involve SnowflakeSource class objects." + ) + + return ( + self.snowflake_options.database == other.snowflake_options.database + and self.snowflake_options.schema == other.snowflake_options.schema + and self.snowflake_options.table == other.snowflake_options.table + and self.snowflake_options.query == other.snowflake_options.query + and self.event_timestamp_column == other.event_timestamp_column + and self.created_timestamp_column == other.created_timestamp_column + and self.field_mapping == other.field_mapping + ) + + @property + def database(self): + """Returns the database of this snowflake source.""" + return self._snowflake_options.database + + @property + def schema(self): + """Returns the schema of this snowflake source.""" + return self._snowflake_options.schema + + @property + def table(self): + """Returns the table of this snowflake source.""" + return self._snowflake_options.table + + @property + def query(self): + """Returns the snowflake options of this snowflake source.""" + return self._snowflake_options.query + + @property + def snowflake_options(self): + """Returns the snowflake options of this snowflake source.""" + return self._snowflake_options + + @snowflake_options.setter + def snowflake_options(self, _snowflake_options): + """Sets the snowflake options of this snowflake source.""" + self._snowflake_options = _snowflake_options + + def to_proto(self) -> DataSourceProto: + """ + Converts a SnowflakeSource object to its protobuf representation. + + Returns: + A DataSourceProto object. + """ + data_source_proto = DataSourceProto( + type=DataSourceProto.BATCH_SNOWFLAKE, + field_mapping=self.field_mapping, + snowflake_options=self.snowflake_options.to_proto(), + ) + + data_source_proto.event_timestamp_column = self.event_timestamp_column + data_source_proto.created_timestamp_column = self.created_timestamp_column + data_source_proto.date_partition_column = self.date_partition_column + + return data_source_proto + + def validate(self, config: RepoConfig): + # As long as the query gets successfully executed, or the table exists, + # the data source is validated. We don't need the results though. + self.get_table_column_names_and_types(config) + + def get_table_query_string(self) -> str: + """Returns a string that can directly be used to reference this table in SQL.""" + if self.database and self.table: + return f'"{self.database}"."{self.schema}"."{self.table}"' + elif self.table: + return f'"{self.table}"' + else: + return f"({self.query})" + + @staticmethod + def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: + return type_map.snowflake_python_type_to_feast_value_type + + def get_table_column_names_and_types( + self, config: RepoConfig + ) -> Iterable[Tuple[str, str]]: + """ + Returns a mapping of column names to types for this snowflake source. 
+ + Args: + config: A RepoConfig describing the feature repo + """ + + from feast.infra.offline_stores.snowflake import SnowflakeOfflineStoreConfig + from feast.infra.utils.snowflake_utils import ( + execute_snowflake_statement, + get_snowflake_conn, + ) + + assert isinstance(config.offline_store, SnowflakeOfflineStoreConfig) + + snowflake_conn = get_snowflake_conn(config.offline_store) + + if self.database and self.table: + query = f'SELECT * FROM "{self.database}"."{self.schema}"."{self.table}" LIMIT 1' + elif self.table: + query = f'SELECT * FROM "{self.table}" LIMIT 1' + else: + query = f"SELECT * FROM ({self.query}) LIMIT 1" + + result = execute_snowflake_statement(snowflake_conn, query).fetch_pandas_all() + + if not result.empty: + metadata = result.dtypes.apply(str) + return list(zip(metadata.index, metadata)) + else: + raise ValueError("The following source:\n" + query + "\n ... is empty") + + +class SnowflakeOptions: + """ + DataSource snowflake options used to source features from snowflake query. + """ + + def __init__( + self, + database: Optional[str], + schema: Optional[str], + table: Optional[str], + query: Optional[str], + ): + self._database = database + self._schema = schema + self._table = table + self._query = query + + @property + def query(self): + """Returns the snowflake SQL query referenced by this source.""" + return self._query + + @query.setter + def query(self, query): + """Sets the snowflake SQL query referenced by this source.""" + self._query = query + + @property + def database(self): + """Returns the database name of this snowflake table.""" + return self._database + + @database.setter + def database(self, database): + """Sets the database ref of this snowflake table.""" + self._database = database + + @property + def schema(self): + """Returns the schema name of this snowflake table.""" + return self._schema + + @schema.setter + def schema(self, schema): + """Sets the schema of this snowflake table.""" + self._schema = schema + + @property + def table(self): + """Returns the table name of this snowflake table.""" + return self._table + + @table.setter + def table(self, table): + """Sets the table ref of this snowflake table.""" + self._table = table + + @classmethod + def from_proto(cls, snowflake_options_proto: DataSourceProto.SnowflakeOptions): + """ + Creates a SnowflakeOptions from a protobuf representation of a snowflake option. + + Args: + snowflake_options_proto: A protobuf representation of a DataSource + + Returns: + A SnowflakeOptions object based on the snowflake_options protobuf. + """ + snowflake_options = cls( + database=snowflake_options_proto.database, + schema=snowflake_options_proto.schema, + table=snowflake_options_proto.table, + query=snowflake_options_proto.query, + ) + + return snowflake_options + + def to_proto(self) -> DataSourceProto.SnowflakeOptions: + """ + Converts an SnowflakeOptionsProto object to its protobuf representation. + + Returns: + A SnowflakeOptionsProto protobuf. 
+ """ + snowflake_options_proto = DataSourceProto.SnowflakeOptions( + database=self.database, + schema=self.schema, + table=self.table, + query=self.query, + ) + + return snowflake_options_proto + + +class SavedDatasetSnowflakeStorage(SavedDatasetStorage): + _proto_attr_name = "snowflake_storage" + + snowflake_options: SnowflakeOptions + + def __init__(self, table_ref: str): + self.snowflake_options = SnowflakeOptions( + database=None, schema=None, table=table_ref, query=None + ) + + @staticmethod + def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage: + + return SavedDatasetSnowflakeStorage( + table_ref=SnowflakeOptions.from_proto(storage_proto.snowflake_storage).table + ) + + def to_proto(self) -> SavedDatasetStorageProto: + return SavedDatasetStorageProto( + snowflake_storage=self.snowflake_options.to_proto() + ) + + def to_data_source(self) -> DataSource: + return SnowflakeSource(table=self.snowflake_options.table) diff --git a/sdk/python/feast/infra/utils/snowflake_utils.py b/sdk/python/feast/infra/utils/snowflake_utils.py new file mode 100644 index 0000000000..f280cfa218 --- /dev/null +++ b/sdk/python/feast/infra/utils/snowflake_utils.py @@ -0,0 +1,279 @@ +import configparser +import os +import random +import string +from logging import getLogger +from tempfile import TemporaryDirectory +from typing import Dict, Iterator, List, Optional, Tuple, cast + +import pandas as pd +import snowflake.connector +from snowflake.connector import ProgrammingError, SnowflakeConnection +from snowflake.connector.cursor import SnowflakeCursor +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) + +from feast.errors import SnowflakeIncompleteConfig, SnowflakeQueryUnknownError + +getLogger("snowflake.connector.cursor").disabled = True +getLogger("snowflake.connector.connection").disabled = True +getLogger("snowflake.connector.network").disabled = True +logger = getLogger(__name__) + + +def execute_snowflake_statement(conn: SnowflakeConnection, query) -> SnowflakeCursor: + cursor = conn.cursor().execute(query) + if cursor is None: + raise SnowflakeQueryUnknownError(query) + return cursor + + +def get_snowflake_conn(config, autocommit=True) -> SnowflakeConnection: + if config.type == "snowflake.offline": + config_header = "connections.feast_offline_store" + + config = dict(config) + + # read config file + config_reader = configparser.ConfigParser() + config_reader.read([config["config_path"]]) + if config_reader.has_section(config_header): + kwargs = dict(config_reader[config_header]) + else: + kwargs = {} + + kwargs.update((k, v) for k, v in config.items() if v is not None) + + try: + conn = snowflake.connector.connect( + account=kwargs["account"], + user=kwargs["user"], + password=kwargs["password"], + role=f'''"{kwargs['role']}"''', + warehouse=f'''"{kwargs['warehouse']}"''', + database=f'''"{kwargs['database']}"''', + schema=f'''"{kwargs['schema_']}"''', + application="feast", + autocommit=autocommit, + ) + + return conn + except KeyError as e: + raise SnowflakeIncompleteConfig(e) + + +# TO DO -- sfc-gh-madkins +# Remove dependency on write_pandas function by falling back to native snowflake python connector +# Current issue is datetime[ns] types are read incorrectly in Snowflake, need to coerce to datetime[ns, UTC] +def write_pandas( + conn: SnowflakeConnection, + df: pd.DataFrame, + table_name: str, + database: Optional[str] = None, + schema: Optional[str] = None, + chunk_size: Optional[int] = None, + compression: str = 
"gzip", + on_error: str = "abort_statement", + parallel: int = 4, + quote_identifiers: bool = True, + auto_create_table: bool = False, + create_temp_table: bool = False, +): + """Allows users to most efficiently write back a pandas DataFrame to Snowflake. + + It works by dumping the DataFrame into Parquet files, uploading them and finally copying their data into the table. + + Returns whether all files were ingested correctly, number of chunks uploaded, and number of rows ingested + with all of the COPY INTO command's output for debugging purposes. + + Example usage: + import pandas + from snowflake.connector.pandas_tools import write_pandas + + df = pandas.DataFrame([('Mark', 10), ('Luke', 20)], columns=['name', 'balance']) + success, nchunks, nrows, _ = write_pandas(cnx, df, 'customers') + + Args: + conn: Connection to be used to communicate with Snowflake. + df: Dataframe we'd like to write back. + table_name: Table name where we want to insert into. + database: Database schema and table is in, if not provided the default one will be used (Default value = None). + schema: Schema table is in, if not provided the default one will be used (Default value = None). + chunk_size: Number of elements to be inserted once, if not provided all elements will be dumped once + (Default value = None). + compression: The compression used on the Parquet files, can only be gzip, or snappy. Gzip gives supposedly a + better compression, while snappy is faster. Use whichever is more appropriate (Default value = 'gzip'). + on_error: Action to take when COPY INTO statements fail, default follows documentation at: + https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#copy-options-copyoptions + (Default value = 'abort_statement'). + parallel: Number of threads to be used when uploading chunks, default follows documentation at: + https://docs.snowflake.com/en/sql-reference/sql/put.html#optional-parameters (Default value = 4). + quote_identifiers: By default, identifiers, specifically database, schema, table and column names + (from df.columns) will be quoted. If set to False, identifiers are passed on to Snowflake without quoting. + I.e. identifiers will be coerced to uppercase by Snowflake. (Default value = True) + auto_create_table: When true, will automatically create a table with corresponding columns for each column in + the passed in DataFrame. The table will not be created if it already exists + create_temp_table: Will make the auto-created table as a temporary table + """ + if database is not None and schema is None: + raise ProgrammingError( + "Schema has to be provided to write_pandas when a database is provided" + ) + # This dictionary maps the compression algorithm to Snowflake put copy into command type + # https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#type-parquet + compression_map = {"gzip": "auto", "snappy": "snappy"} + if compression not in compression_map.keys(): + raise ProgrammingError( + "Invalid compression '{}', only acceptable values are: {}".format( + compression, compression_map.keys() + ) + ) + if quote_identifiers: + location = ( + (('"' + database + '".') if database else "") + + (('"' + schema + '".') if schema else "") + + ('"' + table_name + '"') + ) + else: + location = ( + (database + "." if database else "") + + (schema + "." 
if schema else "") + + (table_name) + ) + if chunk_size is None: + chunk_size = len(df) + cursor: SnowflakeCursor = conn.cursor() + stage_name = create_temporary_sfc_stage(cursor) + + with TemporaryDirectory() as tmp_folder: + for i, chunk in chunk_helper(df, chunk_size): + chunk_path = os.path.join(tmp_folder, "file{}.txt".format(i)) + # Dump chunk into parquet file + chunk.to_parquet( + chunk_path, + compression=compression, + use_deprecated_int96_timestamps=True, + ) + # Upload parquet file + upload_sql = ( + "PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ " + "'file://{path}' @\"{stage_name}\" PARALLEL={parallel}" + ).format( + path=chunk_path.replace("\\", "\\\\").replace("'", "\\'"), + stage_name=stage_name, + parallel=parallel, + ) + logger.debug(f"uploading files with '{upload_sql}'") + cursor.execute(upload_sql, _is_internal=True) + # Remove chunk file + os.remove(chunk_path) + if quote_identifiers: + columns = '"' + '","'.join(list(df.columns)) + '"' + else: + columns = ",".join(list(df.columns)) + + if auto_create_table: + file_format_name = create_file_format(compression, compression_map, cursor) + infer_schema_sql = f"SELECT COLUMN_NAME, TYPE FROM table(infer_schema(location=>'@\"{stage_name}\"', file_format=>'{file_format_name}'))" + logger.debug(f"inferring schema with '{infer_schema_sql}'") + result_cursor = cursor.execute(infer_schema_sql, _is_internal=True) + if result_cursor is None: + raise SnowflakeQueryUnknownError(infer_schema_sql) + result = cast(List[Tuple[str, str]], result_cursor.fetchall()) + column_type_mapping: Dict[str, str] = dict(result) + # Infer schema can return the columns out of order depending on the chunking we do when uploading + # so we have to iterate through the dataframe columns to make sure we create the table with its + # columns in order + quote = '"' if quote_identifiers else "" + create_table_columns = ", ".join( + [f"{quote}{c}{quote} {column_type_mapping[c]}" for c in df.columns] + ) + create_table_sql = ( + f"CREATE {'TEMP ' if create_temp_table else ''}TABLE IF NOT EXISTS {location} " + f"({create_table_columns})" + f" /* Python:snowflake.connector.pandas_tools.write_pandas() */ " + ) + logger.debug(f"auto creating table with '{create_table_sql}'") + cursor.execute(create_table_sql, _is_internal=True) + drop_file_format_sql = f"DROP FILE FORMAT IF EXISTS {file_format_name}" + logger.debug(f"dropping file format with '{drop_file_format_sql}'") + cursor.execute(drop_file_format_sql, _is_internal=True) + + # in Snowflake, all parquet data is stored in a single column, $1, so we must select columns explicitly + # see (https://docs.snowflake.com/en/user-guide/script-data-load-transform-parquet.html) + if quote_identifiers: + parquet_columns = "$1:" + ",$1:".join(f'"{c}"' for c in df.columns) + else: + parquet_columns = "$1:" + ",$1:".join(df.columns) + copy_into_sql = ( + "COPY INTO {location} /* Python:snowflake.connector.pandas_tools.write_pandas() */ " + "({columns}) " + 'FROM (SELECT {parquet_columns} FROM @"{stage_name}") ' + "FILE_FORMAT=(TYPE=PARQUET COMPRESSION={compression}) " + "PURGE=TRUE ON_ERROR={on_error}" + ).format( + location=location, + columns=columns, + parquet_columns=parquet_columns, + stage_name=stage_name, + compression=compression_map[compression], + on_error=on_error, + ) + logger.debug("copying into with '{}'".format(copy_into_sql)) + # Snowflake returns the original cursor if the query execution succeeded. 
+ result_cursor = cursor.execute(copy_into_sql, _is_internal=True) + if result_cursor is None: + raise SnowflakeQueryUnknownError(copy_into_sql) + result_cursor.close() + + +@retry( + wait=wait_exponential(multiplier=1, max=4), + retry=retry_if_exception_type(ProgrammingError), + stop=stop_after_attempt(5), + reraise=True, +) +def create_file_format( + compression: str, compression_map: Dict[str, str], cursor: SnowflakeCursor +) -> str: + file_format_name = ( + '"' + "".join(random.choice(string.ascii_lowercase) for _ in range(5)) + '"' + ) + file_format_sql = ( + f"CREATE FILE FORMAT {file_format_name} " + f"/* Python:snowflake.connector.pandas_tools.write_pandas() */ " + f"TYPE=PARQUET COMPRESSION={compression_map[compression]}" + ) + logger.debug(f"creating file format with '{file_format_sql}'") + cursor.execute(file_format_sql, _is_internal=True) + return file_format_name + + +@retry( + wait=wait_exponential(multiplier=1, max=4), + retry=retry_if_exception_type(ProgrammingError), + stop=stop_after_attempt(5), + reraise=True, +) +def create_temporary_sfc_stage(cursor: SnowflakeCursor) -> str: + stage_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5)) + create_stage_sql = ( + "create temporary stage /* Python:snowflake.connector.pandas_tools.write_pandas() */ " + '"{stage_name}"' + ).format(stage_name=stage_name) + logger.debug(f"creating stage with '{create_stage_sql}'") + result_cursor = cursor.execute(create_stage_sql, _is_internal=True) + if result_cursor is None: + raise SnowflakeQueryUnknownError(create_stage_sql) + result_cursor.fetchall() + return stage_name + + +def chunk_helper(lst: pd.DataFrame, n: int) -> Iterator[Tuple[int, pd.DataFrame]]: + """Helper generator to chunk a sequence efficiently with current index like if enumerate was called on sequence.""" + for i in range(0, len(lst), n): + yield int(i / n), lst[i : i + n] diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index e8ba180568..3f32d18b80 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -31,12 +31,14 @@ "datastore": "feast.infra.online_stores.datastore.DatastoreOnlineStore", "redis": "feast.infra.online_stores.redis.RedisOnlineStore", "dynamodb": "feast.infra.online_stores.dynamodb.DynamoDBOnlineStore", + "snowflake.online": "feast.infra.online_stores.snowflake.SnowflakeOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { "file": "feast.infra.offline_stores.file.FileOfflineStore", "bigquery": "feast.infra.offline_stores.bigquery.BigQueryOfflineStore", "redshift": "feast.infra.offline_stores.redshift.RedshiftOfflineStore", + "snowflake.offline": "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore", } FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = { diff --git a/sdk/python/feast/templates/snowflake/bootstrap.py b/sdk/python/feast/templates/snowflake/bootstrap.py new file mode 100644 index 0000000000..3712651a5d --- /dev/null +++ b/sdk/python/feast/templates/snowflake/bootstrap.py @@ -0,0 +1,91 @@ +import click +import snowflake.connector + +from feast.infra.utils.snowflake_utils import write_pandas + + +def bootstrap(): + # Bootstrap() will automatically be called from the init_repo() during `feast init` + + import pathlib + from datetime import datetime, timedelta + + from feast.driver_test_data import create_driver_hourly_stats_df + + repo_path = pathlib.Path(__file__).parent.absolute() + config_file = repo_path / "feature_store.yaml" + + project_name = str(repo_path)[str(repo_path).rfind("/") + 1 :] + + end_date = 
datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) + + repo_path = pathlib.Path(__file__).parent.absolute() + data_path = repo_path / "data" + data_path.mkdir(exist_ok=True) + driver_stats_path = data_path / "driver_stats.parquet" + driver_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True) + + snowflake_deployment_url = click.prompt( + "Snowflake Deployment URL (exclude .snowflakecomputing.com):" + ) + snowflake_user = click.prompt("Snowflake User Name:") + snowflake_password = click.prompt("Snowflake Password:", hide_input=True) + snowflake_role = click.prompt("Snowflake Role Name (Case Sensitive):") + snowflake_warehouse = click.prompt("Snowflake Warehouse Name (Case Sensitive):") + snowflake_database = click.prompt("Snowflake Database Name (Case Sensitive):") + + if click.confirm( + f'Should I upload example data to Snowflake (overwriting "{project_name}_feast_driver_hourly_stats" table)?', + default=True, + ): + + conn = snowflake.connector.connect( + account=snowflake_deployment_url, + user=snowflake_user, + password=snowflake_password, + role=snowflake_role, + warehouse=snowflake_warehouse, + application="feast", + ) + + cur = conn.cursor() + cur.execute(f'CREATE DATABASE IF NOT EXISTS "{snowflake_database}"') + cur.execute(f'USE DATABASE "{snowflake_database}"') + cur.execute('CREATE SCHEMA IF NOT EXISTS "PUBLIC"') + cur.execute('USE SCHEMA "PUBLIC"') + cur.execute(f'DROP TABLE IF EXISTS "{project_name}_feast_driver_hourly_stats"') + write_pandas( + conn, + driver_df, + f"{project_name}_feast_driver_hourly_stats", + auto_create_table=True, + ) + conn.close() + + repo_path = pathlib.Path(__file__).parent.absolute() + config_file = repo_path / "feature_store.yaml" + + replace_str_in_file( + config_file, "SNOWFLAKE_DEPLOYMENT_URL", snowflake_deployment_url + ) + replace_str_in_file(config_file, "SNOWFLAKE_USER", snowflake_user) + replace_str_in_file(config_file, "SNOWFLAKE_PASSWORD", snowflake_password) + replace_str_in_file(config_file, "SNOWFLAKE_ROLE", snowflake_role) + replace_str_in_file(config_file, "SNOWFLAKE_WAREHOUSE", snowflake_warehouse) + replace_str_in_file(config_file, "SNOWFLAKE_DATABASE", snowflake_database) + + +def replace_str_in_file(file_path, match_str, sub_str): + with open(file_path, "r") as f: + contents = f.read() + contents = contents.replace(match_str, sub_str) + with open(file_path, "wt") as f: + f.write(contents) + + +if __name__ == "__main__": + bootstrap() diff --git a/sdk/python/feast/templates/snowflake/driver_repo.py b/sdk/python/feast/templates/snowflake/driver_repo.py new file mode 100644 index 0000000000..a63c6cb503 --- /dev/null +++ b/sdk/python/feast/templates/snowflake/driver_repo.py @@ -0,0 +1,64 @@ +from datetime import timedelta + +import yaml + +from feast import Entity, Feature, FeatureView, SnowflakeSource, ValueType + +# Define an entity for the driver. Entities can be thought of as primary keys used to +# retrieve features. Entities are also used to join multiple tables/views during the +# construction of feature vectors +driver = Entity( + # Name of the entity. Must be unique within a project + name="driver_id", + # The join key of an entity describes the storage level field/column on which + # features can be looked up. 
The join key is also used to join feature + # tables/views when building feature vectors + join_key="driver_id", +) + +# Indicates a data source from which feature values can be retrieved. Sources are queried when building training +# datasets or materializing features into an online store. +project_name = yaml.safe_load(open("feature_store.yaml"))["project"] + +driver_stats_source = SnowflakeSource( + # The Snowflake table where features can be found + database=yaml.safe_load(open("feature_store.yaml"))["offline_store"]["database"], + table=f"{project_name}_feast_driver_hourly_stats", + # The event timestamp is used for point-in-time joins and for ensuring only + # features within the TTL are returned + event_timestamp_column="event_timestamp", + # The (optional) created timestamp is used to ensure there are no duplicate + # feature rows in the offline store or when building training datasets + created_timestamp_column="created", +) + +# Feature views are a grouping based on how features are stored in either the +# online or offline store. +driver_stats_fv = FeatureView( + # The unique name of this feature view. Two feature views in a single + # project cannot have the same name + name="driver_hourly_stats", + # The list of entities specifies the keys required for joining or looking + # up features from this feature view. The reference provided in this field + # correspond to the name of a defined entity (or entities) + entities=["driver_id"], + # The timedelta is the maximum age that each feature value may have + # relative to its lookup time. For historical features (used in training), + # TTL is relative to each timestamp provided in the entity dataframe. + # TTL also allows for eviction of keys from online stores and limits the + # amount of historical scanning required for historical feature values + # during retrieval + ttl=timedelta(weeks=52), + # The list of features defined below act as a schema to both define features + # for both materialization of features into a store, and are used as references + # during retrieval for building a training dataset or serving features + features=[ + Feature(name="conv_rate", dtype=ValueType.FLOAT), + Feature(name="acc_rate", dtype=ValueType.FLOAT), + Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ], + # Batch sources are used to find feature values. 
In the case of this feature + # view we will query a source table on Redshift for driver statistics + # features + batch_source=driver_stats_source, +) diff --git a/sdk/python/feast/templates/snowflake/feature_store.yaml b/sdk/python/feast/templates/snowflake/feature_store.yaml new file mode 100644 index 0000000000..9757ea2ead --- /dev/null +++ b/sdk/python/feast/templates/snowflake/feature_store.yaml @@ -0,0 +1,11 @@ +project: my_project +registry: registry.db +provider: local +offline_store: + type: snowflake.offline + account: SNOWFLAKE_DEPLOYMENT_URL + user: SNOWFLAKE_USER + password: SNOWFLAKE_PASSWORD + role: SNOWFLAKE_ROLE + warehouse: SNOWFLAKE_WAREHOUSE + database: SNOWFLAKE_DATABASE diff --git a/sdk/python/feast/templates/snowflake/test.py b/sdk/python/feast/templates/snowflake/test.py new file mode 100644 index 0000000000..32aa6380d5 --- /dev/null +++ b/sdk/python/feast/templates/snowflake/test.py @@ -0,0 +1,65 @@ +from datetime import datetime, timedelta + +import pandas as pd +from driver_repo import driver, driver_stats_fv + +from feast import FeatureStore + + +def main(): + pd.set_option("display.max_columns", None) + pd.set_option("display.width", 1000) + + # Load the feature store from the current path + fs = FeatureStore(repo_path=".") + + # Deploy the feature store to Snowflake + print("Deploying feature store to Snowflake...") + fs.apply([driver, driver_stats_fv]) + + # Select features + features = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"] + + # Create an entity dataframe. This is the dataframe that will be enriched with historical features + entity_df = pd.DataFrame( + { + "event_timestamp": [ + pd.Timestamp(dt, unit="ms", tz="UTC").round("ms") + for dt in pd.date_range( + start=datetime.now() - timedelta(days=3), + end=datetime.now(), + periods=3, + ) + ], + "driver_id": [1001, 1002, 1003], + } + ) + + print("Retrieving training data...") + + # Retrieve historical features by joining the entity dataframe to the Snowflake table source + training_df = fs.get_historical_features( + features=features, entity_df=entity_df + ).to_df() + + print() + print(training_df) + + print() + print("Loading features into the online store...") + fs.materialize_incremental(end_date=datetime.now()) + + print() + print("Retrieving online features...") + + # Retrieve features from the online store + online_features = fs.get_online_features( + features=features, entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}], + ).to_dict() + + print() + print(pd.DataFrame.from_dict(online_features)) + + +if __name__ == "__main__": + main() diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 599be85fdf..e39a4ecb81 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -126,6 +126,8 @@ def python_type_to_feast_value_type( "uint64": ValueType.INT64, "int32": ValueType.INT32, "uint32": ValueType.INT32, + "int16": ValueType.INT32, + "uint16": ValueType.INT32, "uint8": ValueType.INT32, "int8": ValueType.INT32, "bool": ValueType.BOOL, @@ -480,6 +482,28 @@ def redshift_to_feast_value_type(redshift_type_as_str: str) -> ValueType: return type_map[redshift_type_as_str.lower()] +def snowflake_python_type_to_feast_value_type( + snowflake_python_type_as_str: str, +) -> ValueType: + + type_map = { + "str": ValueType.STRING, + "float64": ValueType.DOUBLE, + "int64": ValueType.INT64, + "uint64": ValueType.INT64, + "int32": ValueType.INT32, + "uint32": ValueType.INT32, + "int16": ValueType.INT32, + "uint16": ValueType.INT32, + "uint8": 
ValueType.INT32, + "int8": ValueType.INT32, + "datetime64[ns]": ValueType.UNIX_TIMESTAMP, + "object": ValueType.UNKNOWN, + } + + return type_map[snowflake_python_type_as_str.lower()] + + def pa_to_redshift_value_type(pa_type: pyarrow.DataType) -> str: # PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html # Redshift type: https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html diff --git a/sdk/python/requirements/py3.7-ci-requirements.txt b/sdk/python/requirements/py3.7-ci-requirements.txt index 87ab9f9813..293b44e053 100644 --- a/sdk/python/requirements/py3.7-ci-requirements.txt +++ b/sdk/python/requirements/py3.7-ci-requirements.txt @@ -26,6 +26,10 @@ appdirs==1.4.4 # via black asgiref==3.4.1 # via uvicorn +asn1crypto==1.4.0 + # via + # oscrypto + # snowflake-connector-python assertpy==1.1 # via feast (setup.py) async-timeout==4.0.2 @@ -73,16 +77,19 @@ certifi==2021.10.8 # minio # msrest # requests + # snowflake-connector-python cffi==1.15.0 # via # azure-datalake-store # cryptography + # snowflake-connector-python cfgv==3.3.1 # via pre-commit charset-normalizer==2.0.10 # via # aiohttp # requests + # snowflake-connector-python click==8.0.3 # via # black @@ -101,6 +108,8 @@ cryptography==3.3.2 # feast (setup.py) # moto # msal + # pyopenssl + # snowflake-connector-python decorator==5.1.1 # via gcsfs deprecated==1.2.13 @@ -229,6 +238,7 @@ idna==3.3 # via # anyio # requests + # snowflake-connector-python # yarl imagesize==1.3.0 # via sphinx @@ -316,6 +326,8 @@ numpy==1.21.5 # pyarrow oauthlib==3.1.1 # via requests-oauthlib +oscrypto==1.2.1 + # via snowflake-connector-python packaging==21.3 # via # deprecation @@ -329,6 +341,7 @@ pandas==1.3.5 # via # feast (setup.py) # pandavro + # snowflake-connector-python pandavro==1.5.2 # via feast (setup.py) pathspec==0.9.0 @@ -373,7 +386,9 @@ py==1.11.0 py-cpuinfo==8.0.0 # via pytest-benchmark pyarrow==6.0.1 - # via feast (setup.py) + # via + # feast (setup.py) + # snowflake-connector-python pyasn1==0.4.8 # via # pyasn1-modules @@ -384,6 +399,8 @@ pycodestyle==2.8.0 # via flake8 pycparser==2.21 # via cffi +pycryptodomex==3.13.0 + # via snowflake-connector-python pydantic==1.9.0 # via # fastapi @@ -396,6 +413,9 @@ pyjwt[crypto]==2.3.0 # via # adal # msal + # snowflake-connector-python +pyopenssl==21.0.0 + # via snowflake-connector-python pyparsing==3.0.7 # via # httplib2 @@ -444,6 +464,7 @@ pytz==2021.3 # google-api-core # moto # pandas + # snowflake-connector-python pyyaml==6.0 # via # feast (setup.py) @@ -471,6 +492,7 @@ requests==2.27.1 # msrest # requests-oauthlib # responses + # snowflake-connector-python # sphinx requests-oauthlib==1.3.0 # via @@ -497,6 +519,7 @@ six==1.16.0 # mock # msrestazure # pandavro + # pyopenssl # python-dateutil # responses # virtualenv @@ -504,6 +527,8 @@ sniffio==1.2.0 # via anyio snowballstemmer==2.2.0 # via sphinx +snowflake-connector-python[pandas]==2.7.3 + # via feast (setup.py) sphinx==4.3.2 # via # feast (setup.py) diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index 851a0b7054..3cdc118144 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -26,6 +26,10 @@ appdirs==1.4.4 # via black asgiref==3.4.1 # via uvicorn +asn1crypto==1.4.0 + # via + # oscrypto + # snowflake-connector-python assertpy==1.1 # via feast (setup.py) async-timeout==4.0.2 @@ -71,16 +75,19 @@ certifi==2021.10.8 # minio # msrest # requests + # snowflake-connector-python 
cffi==1.15.0 # via # azure-datalake-store # cryptography + # snowflake-connector-python cfgv==3.3.1 # via pre-commit charset-normalizer==2.0.10 # via # aiohttp # requests + # snowflake-connector-python click==8.0.3 # via # black @@ -99,6 +106,8 @@ cryptography==3.3.2 # feast (setup.py) # moto # msal + # pyopenssl + # snowflake-connector-python decorator==5.1.1 # via gcsfs deprecated==1.2.13 @@ -227,6 +236,7 @@ idna==3.3 # via # anyio # requests + # snowflake-connector-python # yarl imagesize==1.3.0 # via sphinx @@ -302,6 +312,8 @@ numpy==1.22.1 # pyarrow oauthlib==3.1.1 # via requests-oauthlib +oscrypto==1.2.1 + # via snowflake-connector-python packaging==21.3 # via # deprecation @@ -315,6 +327,7 @@ pandas==1.3.5 # via # feast (setup.py) # pandavro + # snowflake-connector-python pandavro==1.5.2 # via feast (setup.py) pathspec==0.9.0 @@ -359,7 +372,9 @@ py==1.11.0 py-cpuinfo==8.0.0 # via pytest-benchmark pyarrow==6.0.1 - # via feast (setup.py) + # via + # feast (setup.py) + # snowflake-connector-python pyasn1==0.4.8 # via # pyasn1-modules @@ -370,6 +385,8 @@ pycodestyle==2.8.0 # via flake8 pycparser==2.21 # via cffi +pycryptodomex==3.13.0 + # via snowflake-connector-python pydantic==1.9.0 # via # fastapi @@ -382,6 +399,9 @@ pyjwt[crypto]==2.3.0 # via # adal # msal + # snowflake-connector-python +pyopenssl==21.0.0 + # via snowflake-connector-python pyparsing==3.0.7 # via # httplib2 @@ -430,6 +450,7 @@ pytz==2021.3 # google-api-core # moto # pandas + # snowflake-connector-python pyyaml==6.0 # via # feast (setup.py) @@ -457,6 +478,7 @@ requests==2.27.1 # msrest # requests-oauthlib # responses + # snowflake-connector-python # sphinx requests-oauthlib==1.3.0 # via @@ -483,6 +505,7 @@ six==1.16.0 # mock # msrestazure # pandavro + # pyopenssl # python-dateutil # responses # virtualenv @@ -490,6 +513,8 @@ sniffio==1.2.0 # via anyio snowballstemmer==2.2.0 # via sphinx +snowflake-connector-python[pandas]==2.7.3 + # via feast (setup.py) sphinx==4.3.2 # via # feast (setup.py) diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 76ed9f1237..69247a2c7d 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -26,6 +26,10 @@ appdirs==1.4.4 # via black asgiref==3.4.1 # via uvicorn +asn1crypto==1.4.0 + # via + # oscrypto + # snowflake-connector-python assertpy==1.1 # via feast (setup.py) async-timeout==4.0.2 @@ -71,16 +75,19 @@ certifi==2021.10.8 # minio # msrest # requests + # snowflake-connector-python cffi==1.15.0 # via # azure-datalake-store # cryptography + # snowflake-connector-python cfgv==3.3.1 # via pre-commit charset-normalizer==2.0.10 # via # aiohttp # requests + # snowflake-connector-python click==8.0.3 # via # black @@ -99,6 +106,8 @@ cryptography==3.3.2 # feast (setup.py) # moto # msal + # pyopenssl + # snowflake-connector-python decorator==5.1.1 # via gcsfs deprecated==1.2.13 @@ -227,6 +236,7 @@ idna==3.3 # via # anyio # requests + # snowflake-connector-python # yarl imagesize==1.3.0 # via sphinx @@ -300,6 +310,8 @@ numpy==1.22.1 # pyarrow oauthlib==3.1.1 # via requests-oauthlib +oscrypto==1.2.1 + # via snowflake-connector-python packaging==21.3 # via # deprecation @@ -313,6 +325,7 @@ pandas==1.3.5 # via # feast (setup.py) # pandavro + # snowflake-connector-python pandavro==1.5.2 # via feast (setup.py) pathspec==0.9.0 @@ -357,7 +370,9 @@ py==1.11.0 py-cpuinfo==8.0.0 # via pytest-benchmark pyarrow==6.0.1 - # via feast (setup.py) + # via + # feast (setup.py) + # 
snowflake-connector-python pyasn1==0.4.8 # via # pyasn1-modules @@ -368,6 +383,8 @@ pycodestyle==2.8.0 # via flake8 pycparser==2.21 # via cffi +pycryptodomex==3.13.0 + # via snowflake-connector-python pydantic==1.9.0 # via # fastapi @@ -380,6 +397,9 @@ pyjwt[crypto]==2.3.0 # via # adal # msal + # snowflake-connector-python +pyopenssl==21.0.0 + # via snowflake-connector-python pyparsing==3.0.7 # via # httplib2 @@ -428,6 +448,7 @@ pytz==2021.3 # google-api-core # moto # pandas + # snowflake-connector-python pyyaml==6.0 # via # feast (setup.py) @@ -455,6 +476,7 @@ requests==2.27.1 # msrest # requests-oauthlib # responses + # snowflake-connector-python # sphinx requests-oauthlib==1.3.0 # via @@ -481,6 +503,7 @@ six==1.16.0 # mock # msrestazure # pandavro + # pyopenssl # python-dateutil # responses # virtualenv @@ -488,6 +511,8 @@ sniffio==1.2.0 # via anyio snowballstemmer==2.2.0 # via sphinx +snowflake-connector-python[pandas]==2.7.3 + # via feast (setup.py) sphinx==4.3.2 # via # feast (setup.py) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index bae1695bf1..cb5381813b 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -86,6 +86,10 @@ "docker>=5.0.2", ] +SNOWFLAKE_REQUIRED = [ + "snowflake-connector-python[pandas]>=2.7.3", +] + CI_REQUIRED = ( [ "cryptography==3.3.2", @@ -130,6 +134,7 @@ + GCP_REQUIRED + REDIS_REQUIRED + AWS_REQUIRED + + SNOWFLAKE_REQUIRED ) DEV_REQUIRED = ["mypy-protobuf>=3.1.0", "grpcio-testing==1.*"] + CI_REQUIRED @@ -231,6 +236,7 @@ def run(self): "gcp": GCP_REQUIRED, "aws": AWS_REQUIRED, "redis": REDIS_REQUIRED, + "snowflake": SNOWFLAKE_REQUIRED }, include_package_data=True, license="Apache", diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index f0fb0b28fd..a9953d5977 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -29,6 +29,9 @@ from tests.integration.feature_repos.universal.data_sources.redshift import ( RedshiftDataSourceCreator, ) +from tests.integration.feature_repos.universal.data_sources.snowflake import ( + SnowflakeDataSourceCreator, +) from tests.integration.feature_repos.universal.feature_views import ( conv_rate_plus_100_feature_view, create_conv_rate_request_data_source, @@ -83,6 +86,12 @@ offline_store_creator=RedshiftDataSourceCreator, online_store=REDIS_CONFIG, ), + # Snowflake configurations + IntegrationTestRepoConfig( + provider="aws", # no list features, no feature server + offline_store_creator=SnowflakeDataSourceCreator, + online_store=REDIS_CONFIG, + ), ] ) full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME) diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py new file mode 100644 index 0000000000..1ecae0317b --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py @@ -0,0 +1,81 @@ +import os +import uuid +from typing import Dict, List, Optional + +import pandas as pd + +from feast import SnowflakeSource +from feast.data_source import DataSource +from feast.infra.offline_stores.snowflake import SnowflakeOfflineStoreConfig +from feast.infra.offline_stores.snowflake_source import SavedDatasetSnowflakeStorage +from feast.infra.utils.snowflake_utils import get_snowflake_conn, write_pandas +from feast.repo_config import FeastConfigBaseModel +from 
tests.integration.feature_repos.universal.data_source_creator import ( + DataSourceCreator, +) + + +class SnowflakeDataSourceCreator(DataSourceCreator): + + tables: List[str] = [] + + def __init__(self, project_name: str): + super().__init__() + self.project_name = project_name + self.offline_store_config = SnowflakeOfflineStoreConfig( + type="snowflake.offline", + account=os.environ["SNOWFLAKE_CI_DEPLOYMENT"], + user=os.environ["SNOWFLAKE_CI_USER"], + password=os.environ["SNOWFLAKE_CI_PASSWORD"], + role=os.environ["SNOWFLAKE_CI_ROLE"], + warehouse=os.environ["SNOWFLAKE_CI_WAREHOUSE"], + database="FEAST", + ) + + def create_data_source( + self, + df: pd.DataFrame, + destination_name: str, + suffix: Optional[str] = None, + event_timestamp_column="ts", + created_timestamp_column="created_ts", + field_mapping: Dict[str, str] = None, + ) -> DataSource: + + snowflake_conn = get_snowflake_conn(self.offline_store_config) + + destination_name = self.get_prefixed_table_name(destination_name) + + write_pandas(snowflake_conn, df, destination_name, auto_create_table=True) + + self.tables.append(destination_name) + + return SnowflakeSource( + table=destination_name, + event_timestamp_column=event_timestamp_column, + created_timestamp_column=created_timestamp_column, + date_partition_column="", + field_mapping=field_mapping or {"ts_1": "ts"}, + ) + + def create_saved_dataset_destination(self) -> SavedDatasetSnowflakeStorage: + table = self.get_prefixed_table_name( + f"persisted_ds_{str(uuid.uuid4()).replace('-', '_')}" + ) + self.tables.append(table) + + return SavedDatasetSnowflakeStorage(table_ref=table) + + def create_offline_store_config(self) -> FeastConfigBaseModel: + return self.offline_store_config + + def get_prefixed_table_name(self, suffix: str) -> str: + return f"{self.project_name}_{suffix}" + + def teardown(self): + snowflake_conn = get_snowflake_conn(self.offline_store_config) + + with snowflake_conn as conn: + cur = conn.cursor() + for table in self.tables: + cur.execute(f'DROP TABLE IF EXISTS "{table}"') diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 147e20aee1..4a396c7e4d 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -26,6 +26,9 @@ construct_universal_feature_views, table_name_from_data_source, ) +from tests.integration.feature_repos.universal.data_sources.snowflake import ( + SnowflakeDataSourceCreator, +) from tests.integration.feature_repos.universal.entities import ( customer, driver, @@ -469,7 +472,13 @@ def test_historical_features_with_entities_from_query( if not orders_table: raise pytest.skip("Offline source is not sql-based") - entity_df_query = f"SELECT customer_id, driver_id, order_id, origin_id, destination_id, event_timestamp FROM {orders_table}" + if ( + environment.test_repo_config.offline_store_creator.__name__ + == SnowflakeDataSourceCreator.__name__ + ): + entity_df_query = f'''SELECT "customer_id", "driver_id", "order_id", "origin_id", "destination_id", "event_timestamp" FROM "{orders_table}"''' + else: + entity_df_query = f"SELECT customer_id, driver_id, order_id, origin_id, destination_id, event_timestamp FROM {orders_table}" store.apply([driver(), customer(), location(), *feature_views.values()])
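For context on the Snowflake-specific branch above: write_pandas uploads the CI tables with quote_identifiers=True, so Snowflake keeps the table and column names case-sensitive, and an unquoted SELECT would be folded to uppercase and fail to resolve. The sketch below is illustrative only: it reuses the same SnowflakeOfflineStoreConfig fields and SNOWFLAKE_CI_* environment variables as SnowflakeDataSourceCreator, while the table name "my_project_orders" is hypothetical.

import os

from feast.infra.offline_stores.snowflake import SnowflakeOfflineStoreConfig
from feast.infra.utils.snowflake_utils import execute_snowflake_statement, get_snowflake_conn

# Config mirrors the CI data source creator above; the table referenced below is assumed to exist.
config = SnowflakeOfflineStoreConfig(
    type="snowflake.offline",
    account=os.environ["SNOWFLAKE_CI_DEPLOYMENT"],
    user=os.environ["SNOWFLAKE_CI_USER"],
    password=os.environ["SNOWFLAKE_CI_PASSWORD"],
    role=os.environ["SNOWFLAKE_CI_ROLE"],
    warehouse=os.environ["SNOWFLAKE_CI_WAREHOUSE"],
    database="FEAST",
)

conn = get_snowflake_conn(config)

# Quoted identifiers match the case-sensitive names created by write_pandas(..., quote_identifiers=True).
cursor = execute_snowflake_statement(
    conn, 'SELECT "driver_id", "event_timestamp" FROM "my_project_orders" LIMIT 1'
)
print(cursor.fetchall())

# The unquoted form (SELECT driver_id, event_timestamp FROM my_project_orders) would be folded to
# uppercase by Snowflake and fail to resolve, which is why the Snowflake branch of the test quotes
# every identifier in its entity_df_query.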