From 6874282f2bff151b15733b0f9599420e18008223 Mon Sep 17 00:00:00 2001
From: Samhita Alla
Date: Thu, 9 Dec 2021 22:21:58 +0530
Subject: [PATCH] Fix Feast's Jupyter notebook code (#482)

* fix Feast's Jupyter notebook code

Signed-off-by: Samhita Alla

* remove cell outputs

Signed-off-by: Samhita Alla

* working

Signed-off-by: Samhita Alla
---
 .../feast_integration/Feast_Flyte_Demo.ipynb | 187 ++++++++++--------
 .../custom_provider/provider.py              |  10 +-
 .../feast_integration/feast_dataobjects.py   |  26 ++-
 .../feast_integration/feast_workflow.py      |  69 ++++---
 .../feast_integration/requirements.in        |   3 +-
 .../feast_integration/requirements.txt       | 174 +++++++++++-----
 6 files changed, 301 insertions(+), 168 deletions(-)

diff --git a/cookbook/case_studies/feature_engineering/feast_integration/Feast_Flyte_Demo.ipynb b/cookbook/case_studies/feature_engineering/feast_integration/Feast_Flyte_Demo.ipynb
index 8517553f0..f4724a9fc 100644
--- a/cookbook/case_studies/feature_engineering/feast_integration/Feast_Flyte_Demo.ipynb
+++ b/cookbook/case_studies/feature_engineering/feast_integration/Feast_Flyte_Demo.ipynb
@@ -2,44 +2,59 @@
  "cells": [
   {
    "cell_type": "markdown",
-   "id": "56ad7306",
+   "id": "8711cbe4",
    "metadata": {},
    "source": [
-    "# Demo of running a Flyte + Feast, feature engineering and training pipeline\n",
-    "In this demo we will learn how to interact with Feast through Flyte. The goal will be to train a simple [Gaussian Naive Bayes model using sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html) on the [Horse-Colic dataset from UCI](https://archive.ics.uci.edu/ml/datasets/Horse+Colic).\n",
-    "The model aims to classify if the lesion of the horse is surgical or not. This is a modified version of the original dataset.\n",
+    "# Demo showcasing Flyte & Feast Integration—Feature Engineering and Training Pipeline\n",
+    "\n",
+    "In this demo, we will learn how to interact with Feast through Flyte. The goal will be to train a simple [Gaussian Naive Bayes model using sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html) on the [Horse-Colic dataset from UCI](https://archive.ics.uci.edu/ml/datasets/Horse+Colic).\n",
+    "\n",
+    "The model aims to classify if the lesion of the horse is surgical or not. It uses a modified version of the original dataset.\n",
     "\n",
     "**NOTE**\n",
-    "We will not really dive into the dataset or the model, as the aim of this tutorial is to show how you can use Feast as the feature store and use Flyte to engineer the features that can be identical across your online predictions as well as offline training"
+    "We will not dive into the dataset or the model, as the aim of this tutorial is to show how you can use Feast as a feature store and Flyte to engineer features that can be identical across your online predictions and offline training."
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "07dbb780",
+   "id": "f79f28b4",
    "metadata": {},
    "source": [
-    "## Step 1: Check out the code for the pipeline\n",
-    "We have used [flytekit](https://docs.flyte.org/projects/flytekit/en/latest/) flyte's python SDK to express the pipeline in pure python. The actual workflow code is auto-documented and rendered using sphinx [here](https://flyte--424.org.readthedocs.build/projects/cookbook/en/424/auto/case_studies/feature_engineering/feast_integration/index.html) *to be merged soon*"
+    "### Step 1: Code 💻\n",
+    "\n",
+    "We have used [Flytekit](https://docs.flyte.org/projects/flytekit/en/latest/)—Flyte's Python SDK to express the pipeline in pure Python. The actual workflow code is auto-documented and rendered using Sphinx [here](https://docs.flyte.org/projects/cookbook/en/latest/auto/case_studies/feature_engineering/feast_integration/index.html)."
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "6ecc8e06",
+   "id": "8fcea449",
    "metadata": {},
    "source": [
-    "## Step 2: Launch an execution\n",
-    "We can use the [FlyteConsole](https://github.com/flyteorg/flyteconsole) to launch, monitor and introspect Flyte executions, but in this case we will use [flytekit.remote](https://docs.flyte.org/projects/flytekit/en/latest/design/control_plane.html) to interact with the Flyte backend.\n",
+    "### Step 2: Launch an execution 🚀\n",
+    "\n",
+    "We can use [FlyteConsole](https://github.com/flyteorg/flyteconsole) to launch, monitor, and introspect Flyte executions. However, in our case, we will use [flytekit.remote](https://docs.flyte.org/projects/flytekit/en/latest/design/control_plane.html) to interact with the Flyte backend.\n",
+    "\n",
+    "#### Set up Flytekit remote from config\n",
     "\n",
-    "### Setup flytekit remote from config\n",
-    "To work with flytesandbox, we have created a simple local config that points to FlyteSandbox server and execution environment. We will initialize flytekit remote with this server. We will also pin it to one project and domain.\n",
+    "To work with Flyte-sandbox, we need to create a simple local config at `~/.flyte/config`\n",
+    "that points to the Flyte-sandbox server and execution environment. We will initialize Flytekit remote with this server.\n",
     "\n",
-    "**Note** this also sets up access to S3 or other equivalent datastores needed by FEAST"
+    "Example configuration:\n",
+    "```\n",
+    "[platform]\n",
+    "url = localhost:30081\n",
+    "insecure = True\n",
+    "```\n",
+    "\n",
+    "We will also pin FlyteRemote to one project and domain.\n",
+    "\n",
+    "**NOTE** The integration also sets up access to S3 or other equivalent datastores needed by Feast."
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "f8dd18c7",
+   "id": "a2330891",
    "metadata": {
     "scrolled": true
    },
@@ -51,119 +66,111 @@
   },
   {
    "cell_type": "markdown",
-   "id": "9cd0b83e",
+   "id": "64d6295a",
    "metadata": {},
    "source": [
-    "### Retrieve the latest registered version of the pipeline\n",
-    "FlyteRemote provides convienient methods to retrieve a version of the pipeline from the remote server.\n",
+    "#### Retrieve the latest registered version of the pipeline\n",
+    "\n",
+    "FlyteRemote provides convenient methods to retrieve a version of the pipeline from the remote server.\n",
     "\n",
-    "**Note** It is possible to get a specific version of workflow and trigger a launch for that, but, we will just get the latest"
+    "**NOTE** It is possible to get a specific version of the workflow and trigger a launch for that, but let's just get the latest.\n",
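+    "\n",
+    "For illustration, a minimal sketch of the remote setup described above, plus fetching a pinned version instead of the latest. The `from_config` call, project, domain, and version string are assumptions (not part of this patch):\n",
+    "```python\n",
+    "from flytekit.remote import FlyteRemote\n",
+    "\n",
+    "# Assumed sandbox defaults; adjust the project/domain to your deployment.\n",
+    "remote = FlyteRemote.from_config(\n",
+    "    default_project=\"flytesnacks\", default_domain=\"development\"\n",
+    ")\n",
+    "\n",
+    "# \"v1\" is a hypothetical version; omit it to fetch the latest.\n",
+    "lp_v1 = remote.fetch_launch_plan(\n",
+    "    name=\"feast_integration.feast_workflow.feast_workflow\", version=\"v1\"\n",
+    ")\n",
+    "```"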
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "9ccda3d5",
+   "id": "d28014f8",
    "metadata": {},
    "outputs": [],
    "source": [
-    "# from feast_integration.feast_workflow import feast_workflow\n",
     "lp = remote.fetch_launch_plan(name=\"feast_integration.feast_workflow.feast_workflow\")\n",
     "lp.id.version"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "5e8ccc35",
+   "id": "c71210a7",
    "metadata": {},
    "source": [
-    "### Launch an execution\n",
-    "`remote.execute` makes it simple to start an execution for the launchplan. We will not provide any inputs and just use the default inputs"
+    "#### Launch\n",
+    "\n",
+    "`remote.execute` simplifies starting an execution for the launch plan. Let's use the default inputs."
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "a461fe08",
+   "id": "c13770fc",
    "metadata": {},
    "outputs": [],
    "source": [
-    "exe = remote.execute(lp, inputs={})\n",
-    "print(f\"http://localhost:30081/console/projects/{exe.id.project}/domains/{exe.id.domain}/executions/{exe.id.name}\")"
+    "execution = remote.execute(lp, inputs={})\n",
+    "print(f\"http://localhost:30081/console/projects/{execution.id.project}/domains/{execution.id.domain}/executions/{execution.id.name}\")"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "adb68140",
+   "id": "07bd9e37",
    "metadata": {},
    "source": [
-    "## Step 3: Now wait for the execution to complete\n",
-    "It is possible to launch a sync execution and wait for it to complete, but since all the processes are completely detached (you can even close your laptop) and come back to it later, we will show how to sync the execution back."
+    "### Step 3: Wait for the execution to complete\n",
+    "\n",
+    "It is possible to launch a sync execution and wait for it to complete, but since all the processes are completely detached (you can even close your laptop and come back to it later), we will show how to sync the execution back.\n",
+    "\n",
+    "**Side Note**\n",
+    "It is possible to fetch an existing execution or simply retrieve a started execution. Also, if you launch an execution with the same name, Flyte will respect that and not restart a new execution!"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "502711ef",
+   "id": "a8bd9614",
    "metadata": {},
    "outputs": [],
    "source": [
     "from flytekit.models.core.execution import WorkflowExecutionPhase\n",
-    "exe = remote.sync(exe)\n",
-    "print(f\"Execution {exe.id.name} is in Phase - {WorkflowExecutionPhase.enum_to_string(exe.closure.phase)}\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "9fda8da9",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "exe.sync()"
+    "\n",
+    "synced_execution = remote.sync(execution)\n",
+    "print(f\"Execution {synced_execution.id.name} is in Phase - {WorkflowExecutionPhase.enum_to_string(synced_execution.closure.phase)}\")"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "4c2074cc",
+   "id": "65e5b181",
    "metadata": {},
    "source": [
-    "## Step 4: Lets sync data from this execution\n",
+    "### Step 4: Retrieve output\n",
     "\n",
-    "**Side Note**\n",
-    "It is possible to fetch an existing execution or simply retrieve a started execution. Also if you launch an execution with the same name, flyte will respect and not restart a new execution!\n",
-    "\n",
-    "To fetch an execution\n",
-    "```python\n",
-    "exe = remote.fetch_workflow_execution(name='f9f180a56e67b4c9781e')\n",
-    "exe = remote.sync(exe)\n",
-    "```"
+    "Let's fetch the workflow outputs.\n",
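+    "\n",
+    "If the notebook was restarted in the meantime, the execution can be re-fetched by name first. A minimal sketch, mirroring the example this patch removes (the execution name is a placeholder):\n",
+    "```python\n",
+    "existing = remote.fetch_workflow_execution(name=\"f9f180a56e67b4c9781e\")\n",
+    "existing = remote.sync(existing)\n",
+    "print(WorkflowExecutionPhase.enum_to_string(existing.closure.phase))\n",
+    "```"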
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "3e778b53",
+   "id": "ab24b1c0",
    "metadata": {},
    "outputs": [],
    "source": [
     "from feast_dataobjects import FeatureStore\n",
-    "fs = exe.raw_outputs.get('o0', FeatureStore)\n",
-    "model = exe.outputs['o1']"
+    "\n",
+    "# \"raw_outputs\" in FlyteRemote helps associate a type with the output and resolves Literals to Python objects.\n",
+    "# For example, a data class is returned as a marshmallow schema (serialized) when \"outputs\" is used but is returned as a data class when \"raw_outputs\" is used.\n",
+    "fs = synced_execution.raw_outputs.get(\"o0\", FeatureStore)\n",
+    "model = synced_execution.outputs['o1']"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "545e00c9",
+   "id": "fcd55057",
    "metadata": {},
    "source": [
-    "#### Lets inspect the feature store configuration"
+    "Next, we inspect the feature store configuration and model."
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "a5dff70e",
+   "id": "d31c96a8",
    "metadata": {},
    "outputs": [],
    "source": [
@@ -172,16 +179,16 @@
   },
   {
    "cell_type": "markdown",
-   "id": "20db4c76",
+   "id": "af8277d3",
    "metadata": {},
    "source": [
-    "#### Also, the model is now available locally as a JobLibSerialized file and can be downloaded and loaded"
+    "**NOTE** The output model is available locally as a JobLibSerialized file, which can be downloaded and loaded."
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "3747b3a3",
+   "id": "8a841e22",
    "metadata": {
     "scrolled": true
    },
@@ -192,20 +199,25 @@
   },
   {
    "cell_type": "markdown",
-   "id": "95ae557b",
+   "id": "7a535b4d",
    "metadata": {},
    "source": [
-    "## Step 5: Cool, Let's predict\n",
-    "So we have the model and a feature store!, how can you run predictions. Flytekit will automatically manage the IO for you and you can simply re-use the prediction function from the workflow."
+    "### Step 5: Cool, let's predict\n",
+    "\n",
+    "We now have the model and feature store with us! So, how can we generate predictions? We can simply re-use the `predict` function from the workflow; Flytekit will automatically manage the IO for us.\n",
+    "\n",
+    "**NOTE** We set a couple of environment variables to facilitate AWS access.\n",
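+    "\n",
+    "As an aside, a quick sketch of loading the fetched model locally. `JoblibSerializedFile` behaves like a `FlyteFile`, so it can be pulled to a local path for `joblib` to read (the `download()` accessor is an assumption about the FlyteFile API):\n",
+    "```python\n",
+    "import joblib\n",
+    "\n",
+    "local_model_path = model.download()\n",
+    "gnb = joblib.load(local_model_path)\n",
+    "```"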
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "77c20c5d",
+   "id": "dff58f63",
    "metadata": {},
    "source": [
-    "### Lets load some features from the online feature store\n",
-    "We are re-using the feature definition from the flyte workflow\n",
+    "#### Load features from an online feature store\n",
+    "\n",
+    "Let's re-use the feature definition from the Flyte workflow.\n",
+    "\n",
     "```python\n",
     "inference_point = fs.get_online_features(FEAST_FEATURES, [{\"Hospital Number\": \"533738\"}])\n",
     "```"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "88abda57",
+   "id": "c7a2c3c4",
    "metadata": {},
    "outputs": [],
    "source": [
+    "import os\n",
+    "\n",
     "from feast_workflow import predict, FEAST_FEATURES\n",
+    "\n",
+    "os.environ[\"FLYTE_AWS_ENDPOINT\"] = os.environ[\"FEAST_S3_ENDPOINT_URL\"] = \"http://localhost:30084/\"\n",
+    "os.environ[\"FLYTE_AWS_ACCESS_KEY_ID\"] = os.environ[\"AWS_ACCESS_KEY_ID\"] = \"minio\"\n",
+    "os.environ[\"FLYTE_AWS_SECRET_ACCESS_KEY\"] = os.environ[\"AWS_SECRET_ACCESS_KEY\"] = \"miniostorage\"\n",
+    "\n",
     "inference_point = fs.get_online_features(FEAST_FEATURES, [{\"Hospital Number\": \"533738\"}])\n",
+    "\n",
     "inference_point"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "ef5745a6",
+   "id": "9a49e572",
    "metadata": {},
    "source": [
-    "### Now run a prediction\n",
-    "Notice how we are passing the serialized model and some loaded features"
+    "#### Generate a prediction\n",
+    "\n",
+    "Notice how we are passing the serialized model and some loaded features."
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "f0ff6c96",
+   "id": "e44c62e2",
    "metadata": {},
    "outputs": [],
    "source": [
@@ -244,21 +265,25 @@
   },
   {
    "cell_type": "markdown",
-   "id": "1452207b",
+   "id": "b632e957",
    "metadata": {},
    "source": [
-    "## Done! \n",
-    "We can ofcourse observe the intermediates from the workflow, which we saw in the UI, we can also download any intermediate data.\n",
+    "### Next Steps\n",
+    "\n",
+    "We can, of course, observe the intermediates from the workflow in the UI and download the intermediate data.\n",
     "\n",
-    "## Future\n",
-    "We want to further improve this experience, to allow for the same prediction method to run in your inference server and in a workflow. It is almost there now, but you need to remove the `model de-serialization` as this happens for the current predict method."
+    "### Future 🔮\n",
+    "\n",
+    "We want to improve the integration experience further to allow the `predict` function to run in an inference server and a workflow. We are almost there, but we need to remove `model de-serialization` in the `predict` method.\n",
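+    "\n",
+    "A hypothetical refactor for the note above: split de-serialization out of `predict` so an inference server can load the model once and reuse it across requests. The names below are illustrative, not part of this patch:\n",
+    "```python\n",
+    "import joblib\n",
+    "import pandas as pd\n",
+    "\n",
+    "def load_model(model_file: str):\n",
+    "    # Done once, at server start-up.\n",
+    "    return joblib.load(model_file)\n",
+    "\n",
+    "def predict_with_model(model, features: dict):\n",
+    "    # Reused per request; mirrors the workflow's predict() minus the load.\n",
+    "    return model.predict(pd.DataFrame(features))\n",
+    "```"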
    ]
   }
  ],
  "metadata": {
+  "interpreter": {
+   "hash": "dc3fb656270592a65285897570586647c2377dd9205211f8c7206e5246caf1a6"
+  },
   "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
-   "language": "python",
+   "display_name": "Python 3.8.10 64-bit ('feast': pyenv)",
    "name": "python3"
   },
   "language_info": {
@@ -271,7 +296,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.2"
+   "version": "3.8.10"
   }
  },
  "nbformat": 4,
diff --git a/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py b/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py
index 9ab6252ef..0f8d7e3db 100644
--- a/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py
+++ b/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py
@@ -20,7 +20,7 @@
 
 
 class FlyteCustomProvider(LocalProvider):
-    def __init__(self, config: RepoConfig, repo_path):
+    def __init__(self, config: RepoConfig):
         super().__init__(config)
 
     def materialize_single_feature_view(
@@ -78,13 +78,17 @@ def _localize_feature_view(self, feature_view: FeatureView):
 
         # Copy parquet file to a local file
         file_source: FileSource = feature_view.batch_source
-        random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path)
+        random_local_path = (
+            FlyteContext.current_context().file_access.get_random_local_path(
+                file_source.path
+            )
+        )
         FlyteContext.current_context().file_access.get_data(
             file_source.path,
             random_local_path,
             is_multipart=True,
         )
-        feature_view.batch_source=FileSource(
+        feature_view.batch_source = FileSource(
             path=random_local_path,
             event_timestamp_column=file_source.event_timestamp_column,
         )
diff --git a/cookbook/case_studies/feature_engineering/feast_integration/feast_dataobjects.py b/cookbook/case_studies/feature_engineering/feast_integration/feast_dataobjects.py
index f43628fb6..5c54ad4b6 100644
--- a/cookbook/case_studies/feature_engineering/feast_integration/feast_dataobjects.py
+++ b/cookbook/case_studies/feature_engineering/feast_integration/feast_dataobjects.py
@@ -29,7 +29,7 @@ class FeatureStoreConfig:
     registry_path: str
     project: str
     s3_bucket: str
-    online_store_path: str = 'online.db'
+    online_store_path: str = "online.db"
 
 
 @dataclass_json
@@ -65,7 +65,10 @@ def apply(
         fs.apply(objects)
 
         # Applying also initializes the sqlite tables in the online store
-        FlyteContext.current_context().file_access.upload(self.config.online_store_path, f"s3://{self.config.s3_bucket}/{self.config.online_store_path}")
+        FlyteContext.current_context().file_access.upload(
+            self.config.online_store_path,
+            f"s3://{self.config.s3_bucket}/{self.config.online_store_path}",
+        )
 
     def get_historical_features(
         self,
@@ -85,13 +88,19 @@ def materialize(
         end_date: datetime,
         feature_views: Optional[List[str]] = None,
     ) -> None:
-        FlyteContext.current_context().file_access.download(f"s3://{self.config.s3_bucket}/{self.config.online_store_path}", self.config.online_store_path)
+        FlyteContext.current_context().file_access.download(
+            f"s3://{self.config.s3_bucket}/{self.config.online_store_path}",
+            self.config.online_store_path,
+        )
         fs = self._build_feast_feature_store()
         fs.materialize(
             start_date=start_date,
             end_date=end_date,
         )
-        FlyteContext.current_context().file_access.upload(self.config.online_store_path, f"s3://{self.config.s3_bucket}/{self.config.online_store_path}")
+        FlyteContext.current_context().file_access.upload(
+            self.config.online_store_path,
+            f"s3://{self.config.s3_bucket}/{self.config.online_store_path}",
+        )
 
     def get_online_features(
         self,
@@ -100,8 +109,13 @@ def get_online_features(
         feature_refs: Optional[List[str]] = None,
         full_feature_names: bool = False,
     ) -> Dict[str, Any]:
-        FlyteContext.current_context().file_access.download(f"s3://{self.config.s3_bucket}/{self.config.online_store_path}", self.config.online_store_path)
+        FlyteContext.current_context().file_access.download(
+            f"s3://{self.config.s3_bucket}/{self.config.online_store_path}",
+            self.config.online_store_path,
+        )
         fs = self._build_feast_feature_store()
-        online_response = fs.get_online_features(features, entity_rows, feature_refs, full_feature_names)
+        online_response = fs.get_online_features(
+            features, entity_rows, feature_refs, full_feature_names
+        )
         return online_response.to_dict()
diff --git a/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py b/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py
index a6fce78a4..94c32e5c1 100644
--- a/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py
+++ b/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py
@@ -19,6 +19,7 @@
 import logging
 import typing
 
+
 # %%
 # Let's import the libraries.
 from datetime import datetime, timedelta
@@ -28,7 +29,7 @@
 import joblib
 import pandas as pd
 from feast import Entity, Feature, FeatureStore, FeatureView, FileSource, ValueType
-from flytekit import task, workflow, TaskMetadata
+from flytekit import task, workflow, TaskMetadata, Resources
 from flytekit.configuration import aws
 from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
 from flytekit.types.file import JoblibSerializedFile
@@ -66,7 +67,7 @@
 @task(cache=True, cache_version="1.0")
 def create_bucket(bucket_name: str) -> str:
     client = boto3.client(
-        's3',
+        "s3",
         aws_access_key_id=aws.S3_ACCESS_KEY_ID.get(),
         aws_secret_access_key=aws.S3_SECRET_ACCESS_KEY.get(),
         use_ssl=False,
@@ -127,7 +128,7 @@ def create_bucket(bucket_name: str) -> str:
 # hence serialization of the feature store is required. this is because FEAST registries are single files and
 # Flyte workflows can be highly concurrent
 #
-@task(cache=True, cache_version="1.0")
+@task(cache=True, cache_version="1.0", limits=Resources(mem="400Mi"))
 def store_offline(feature_store: FeatureStore, dataframe: FlyteSchema) -> FeatureStore:
     horse_colic_entity = Entity(name="Hospital Number", value_type=ValueType.STRING)
 
@@ -158,7 +159,7 @@ def store_offline(feature_store: FeatureStore, dataframe: FlyteSchema) -> Featur
     return feature_store
 
 
-@task(cache=True, cache_version="1.0")
+@task(cache=True, cache_version="1.0", limits=Resources(mem="400Mi"))
 def load_historical_features(feature_store: FeatureStore) -> FlyteSchema:
     entity_df = pd.DataFrame.from_dict(
         {
@@ -187,7 +188,9 @@ def load_historical_features(feature_store: FeatureStore) -> FlyteSchema:
         }
     )
 
-    return feature_store.get_historical_features(entity_df=entity_df, features=FEAST_FEATURES)  # noqa
+    return feature_store.get_historical_features(
+        entity_df=entity_df, features=FEAST_FEATURES
+    )  # noqa
 
 
 # %%
@@ -223,10 +226,10 @@ def train_model(dataset: pd.DataFrame, data_class: str) -> JoblibSerializedFile:
 #
 # One key difference between an online and offline store is that only the latest feature values are stored per entity
 # key in an online store, unlike an offline store where all feature values are stored.
-# Our dataset has two such entries with the same ``Hospital Number`` but different time stamps. 
+# Our dataset has two such entries with the same ``Hospital Number`` but different time stamps.
 # Only data point with the latest timestamp will be stored in the online store.
 #
-@task(cache=True, cache_version="1.0")
+@task(cache=True, cache_version="1.0", limits=Resources(mem="400Mi"))
 def store_online(feature_store: FeatureStore) -> FeatureStore:
     feature_store.materialize(
         start_date=datetime.utcnow() - timedelta(days=250),
@@ -254,7 +257,7 @@ def predict(model_ser: JoblibSerializedFile, features: dict) -> typing.List[str]
 # Next, we need to convert timestamp column in the underlying dataframe, otherwise its type is written as string.
 @task(cache=True, cache_version="1.0")
 def convert_timestamp_column(
-        dataframe: FlyteSchema, timestamp_column: str
+    dataframe: FlyteSchema, timestamp_column: str
 ) -> FlyteSchema:
     df = dataframe.open().all()
     df[timestamp_column] = pd.to_datetime(df[timestamp_column])
@@ -264,9 +267,15 @@
 # %%
 # The ``build_feature_store`` task is a medium to access Feast methods by building a feature store.
 @task(cache=True, cache_version="1.0")
-def build_feature_store(s3_bucket: str, registry_path: str, online_store_path: str) -> FeatureStore:
-    feature_store_config = FeatureStoreConfig(project="horsecolic", s3_bucket=s3_bucket, registry_path=registry_path,
-                                              online_store_path=online_store_path)
+def build_feature_store(
+    s3_bucket: str, registry_path: str, online_store_path: str
+) -> FeatureStore:
+    feature_store_config = FeatureStoreConfig(
+        project="horsecolic",
+        s3_bucket=s3_bucket,
+        registry_path=registry_path,
+        online_store_path=online_store_path,
+    )
     return FeatureStore(config=feature_store_config)
 
 
@@ -291,7 +300,9 @@ def retrieve_online(feature_store: FeatureStore, dataset: pd.DataFrame) -> dict:
 # the following workflow is a separate workflow that can be run independently to create features and store them offline
 # This can be run periodically or triggered independently
 @workflow
-def featurize(feature_store: FeatureStore, imputation_method: str = "mean") -> (FlyteSchema, FeatureStore):
+def featurize(
+    feature_store: FeatureStore, imputation_method: str = "mean"
+) -> (FlyteSchema, FeatureStore):
 
     # Load parquet file from sqlite task
     df = load_horse_colic_sql()
 
@@ -299,9 +310,7 @@
     df = mean_median_imputer(dataframe=df, imputation_method=imputation_method)
 
     # Convert timestamp column from string to datetime.
-    converted_df = convert_timestamp_column(
-        dataframe=df, timestamp_column="timestamp"
-    )
+    converted_df = convert_timestamp_column(dataframe=df, timestamp_column="timestamp")
 
     return df, store_offline(feature_store=feature_store, dataframe=converted_df)
 
 
@@ -333,28 +342,38 @@ def trainer(df: FlyteSchema, num_features_univariate: int = 7) -> JoblibSerializ
 # we construct the following workflow, composing other workflows
 @workflow
 def feast_workflow(
-        imputation_method: str = "mean",
-        num_features_univariate: int = 7,
-        s3_bucket: str = "feast-integration",
-        registry_path: str = "registry.db",
-        online_store_path: str = "online.db",
+    imputation_method: str = "mean",
+    num_features_univariate: int = 7,
+    s3_bucket: str = "feast-integration",
+    registry_path: str = "registry.db",
+    online_store_path: str = "online.db",
 ) -> (FeatureStore, JoblibSerializedFile, typing.List[str]):
     # Create bucket if it does not already exist
     # & Build feature store
-    feature_store = build_feature_store(s3_bucket=create_bucket(bucket_name=s3_bucket), registry_path=registry_path,
-                                        online_store_path=online_store_path)
+    feature_store = build_feature_store(
+        s3_bucket=create_bucket(bucket_name=s3_bucket),
+        registry_path=registry_path,
+        online_store_path=online_store_path,
+    )
 
     # Feature engineering
-    df, loaded_feature_store = featurize(feature_store=feature_store, imputation_method=imputation_method)
+    df, loaded_feature_store = featurize(
+        feature_store=feature_store, imputation_method=imputation_method
+    )
 
     # Demonstrate how to load features from offline store
     historical_features = load_historical_features(feature_store=loaded_feature_store)
 
-    model = trainer(df=historical_features, num_features_univariate=num_features_univariate)
+    model = trainer(
+        df=historical_features, num_features_univariate=num_features_univariate
+    )
 
     online_feature_store = store_online(feature_store=loaded_feature_store)
 
     # Use a feature retrieved from the online store for inference
-    predictions = predict(model_ser=model, features=retrieve_online(feature_store=online_feature_store, dataset=df))  # noqa
+    predictions = predict(
+        model_ser=model,
+        features=retrieve_online(feature_store=online_feature_store, dataset=df),
+    )  # noqa
 
     return online_feature_store, model, predictions
diff --git a/cookbook/case_studies/feature_engineering/feast_integration/requirements.in b/cookbook/case_studies/feature_engineering/feast_integration/requirements.in
index a166ebe4f..ec0ddafc4 100644
--- a/cookbook/case_studies/feature_engineering/feast_integration/requirements.in
+++ b/cookbook/case_studies/feature_engineering/feast_integration/requirements.in
@@ -1,4 +1,5 @@
-flytekit>=0.23.0b1
+-r ../../../common/requirements-common.in
 scikit-learn
 numpy
 feast[aws]
+proto-plus
diff --git a/cookbook/case_studies/feature_engineering/feast_integration/requirements.txt b/cookbook/case_studies/feature_engineering/feast_integration/requirements.txt
index 432ff7697..d97ea731f 100644
--- a/cookbook/case_studies/feature_engineering/feast_integration/requirements.txt
+++ b/cookbook/case_studies/feature_engineering/feast_integration/requirements.txt
@@ -4,14 +4,18 @@
 #
 #    /Library/Developer/CommandLineTools/usr/bin/make requirements.txt
 #
-anyio==3.3.2
+absl-py==0.12.0
+    # via tensorflow-metadata
+anyio==3.4.0
     # via starlette
+arrow==1.2.1
+    # via jinja2-time
 asgiref==3.4.1
     # via uvicorn
 attrs==21.2.0
-    # via
-    #   jsonschema
-    #   scantree
+    # via jsonschema
+binaryornot==0.4.4
+    # via cookiecutter
 boto3==1.17.112
     # via feast
 botocore==1.20.112
@@ -20,19 +24,30 @@ botocore==1.20.112
     #   s3transfer
 cachetools==4.2.4
     # via google-auth
-certifi==2021.5.30
+certifi==2021.10.8
     # via requests
-charset-normalizer==2.0.6
+chardet==4.0.0
+    # via binaryornot
+charset-normalizer==2.0.9
     # via requests
+checksumdir==1.2.0
+    # via flytekit
 click==7.1.2
     # via
+    #   cookiecutter
     #   feast
     #   flytekit
     #   uvicorn
+cloudpickle==2.0.0
+    # via flytekit
 colorama==0.4.4
     # via feast
-croniter==1.0.15
+cookiecutter==1.7.3
+    # via flytekit
+croniter==1.1.0
     # via flytekit
+cycler==0.11.0
+    # via matplotlib
 dataclasses-json==0.5.6
     # via flytekit
 decorator==5.1.0
@@ -41,104 +56,135 @@ deprecated==1.2.13
     # via flytekit
 dill==0.3.4
     # via feast
-dirhash==0.2.1
-    # via flytekit
-diskcache==5.2.1
+diskcache==5.3.0
     # via flytekit
+docker==5.0.3
+    # via feast
 docker-image-py==0.1.12
     # via flytekit
-docstring-parser==0.11
+docstring-parser==0.13
     # via flytekit
 fastapi==0.70.0
     # via feast
-fastavro==1.4.5
+fastavro==1.4.7
     # via
     #   feast
     #   pandavro
-feast[aws]==0.13.0
+feast[aws]==0.16.0
     # via -r requirements.in
-flyteidl==0.21.4
+flyteidl==0.21.12
     # via flytekit
-flytekit==0.23.0b6
-    # via -r requirements.in
-google-api-core==2.1.0
+flytekit==0.25.0b3
+    # via -r ../../../common/requirements-common.in
+fonttools==4.28.3
+    # via matplotlib
+google-api-core==2.3.0
     # via feast
-google-auth==2.3.0
+google-auth==2.3.3
     # via google-api-core
 googleapis-common-protos==1.52.0
     # via
     #   feast
     #   google-api-core
-grpcio==1.41.0
+    #   tensorflow-metadata
+grpcio==1.42.0
     # via
     #   feast
     #   flytekit
+    #   grpcio-reflection
+grpcio-reflection==1.42.0
+    # via feast
 h11==0.12.0
     # via uvicorn
-httptools==0.2.0
+httptools==0.3.0
     # via uvicorn
-idna==3.2
+idna==3.3
     # via
     #   anyio
     #   requests
-importlib-metadata==4.8.1
+importlib-metadata==4.8.2
     # via keyring
-jinja2==3.0.2
-    # via feast
+importlib-resources==5.4.0
+    # via jsonschema
+jinja2==3.0.3
+    # via
+    #   cookiecutter
+    #   feast
+    #   jinja2-time
+jinja2-time==0.2.0
+    # via cookiecutter
 jmespath==0.10.0
     # via
     #   boto3
     #   botocore
 joblib==1.1.0
     # via scikit-learn
-jsonschema==4.0.1
+jsonschema==4.2.1
     # via feast
-keyring==23.2.1
+keyring==23.4.0
     # via flytekit
+kiwisolver==1.3.2
+    # via matplotlib
 markupsafe==2.0.1
     # via jinja2
-marshmallow==3.13.0
+marshmallow==3.14.1
     # via
     #   dataclasses-json
     #   marshmallow-enum
     #   marshmallow-jsonschema
 marshmallow-enum==1.5.1
     # via dataclasses-json
-marshmallow-jsonschema==0.12.0
+marshmallow-jsonschema==0.13.0
     # via flytekit
+matplotlib==3.5.0
+    # via -r ../../../common/requirements-common.in
 mmh3==3.0.0
     # via feast
 mypy-extensions==0.4.3
     # via typing-inspect
-natsort==7.1.1
+natsort==8.0.0
     # via flytekit
-numpy==1.21.2
+numpy==1.21.4
     # via
     #   -r requirements.in
+    #   matplotlib
     #   pandas
     #   pandavro
     #   pyarrow
     #   scikit-learn
     #   scipy
-pandas==1.3.3
+packaging==21.3
+    # via
+    #   matplotlib
+    #   setuptools-scm
+pandas==1.3.4
     # via
     #   feast
     #   flytekit
     #   pandavro
 pandavro==1.5.2
     # via feast
-pathspec==0.9.0
-    # via scantree
-protobuf==3.18.1
+pillow==8.4.0
+    # via matplotlib
+poyo==0.5.0
+    # via cookiecutter
+proto-plus==1.19.6
+    # via
+    #   -r requirements.in
+    #   feast
+protobuf==3.19.1
     # via
     #   feast
     #   flyteidl
     #   flytekit
     #   google-api-core
     #   googleapis-common-protos
-py==1.10.0
+    #   grpcio-reflection
+    #   proto-plus
+    #   tensorflow-metadata
+py==1.11.0
     # via retry
-pyarrow==3.0.0
+pyarrow==6.0.1
     # via
     #   feast
     #   flytekit
@@ -152,57 +198,69 @@ pydantic==1.8.2
     # via
     #   fastapi
     #   feast
+pyparsing==3.0.6
+    # via
+    #   matplotlib
+    #   packaging
 pyrsistent==0.18.0
     # via jsonschema
 python-dateutil==2.8.1
     # via
+    #   arrow
     #   botocore
     #   croniter
     #   flytekit
+    #   matplotlib
     #   pandas
-python-dotenv==0.19.0
+python-dotenv==0.19.2
     # via uvicorn
 python-json-logger==2.0.2
     # via flytekit
+python-slugify==5.0.2
+    # via cookiecutter
 pytimeparse==1.1.8
     # via flytekit
 pytz==2018.4
     # via
     #   flytekit
     #   pandas
-pyyaml==5.4.1
+pyyaml==6.0
     # via
     #   feast
     #   uvicorn
-regex==2021.9.30
+regex==2021.11.10
     # via docker-image-py
 requests==2.26.0
     # via
+    #   cookiecutter
+    #   docker
     #   flytekit
     #   google-api-core
     #   responses
-responses==0.14.0
+responses==0.16.0
     # via flytekit
 retry==0.9.2
     # via flytekit
-rsa==4.7.2
+rsa==4.8
     # via google-auth
 s3transfer==0.4.2
     # via boto3
-scantree==0.0.1
-    # via dirhash
-scikit-learn==1.0
+scikit-learn==1.0.1
     # via -r requirements.in
-scipy==1.7.1
+scipy==1.7.3
     # via scikit-learn
+setuptools-scm==6.3.2
+    # via matplotlib
 six==1.16.0
     # via
+    #   absl-py
+    #   cookiecutter
     #   flytekit
+    #   google-auth
     #   grpcio
     #   pandavro
     #   python-dateutil
     #   responses
-    #   scantree
 sniffio==1.2.0
     # via anyio
 sortedcontainers==2.4.0
@@ -215,13 +273,19 @@ tabulate==0.8.9
     # via feast
 tenacity==8.0.1
     # via feast
+tensorflow-metadata==1.5.0
+    # via feast
+text-unidecode==1.3
+    # via python-slugify
 threadpoolctl==3.0.0
     # via scikit-learn
 toml==0.10.2
     # via feast
+tomli==1.2.2
+    # via setuptools-scm
 tqdm==4.62.3
     # via feast
-typing-extensions==3.10.0.2
+typing-extensions==4.0.1
     # via
     #   pydantic
     #   typing-inspect
@@ -233,22 +297,28 @@ urllib3==1.26.7
     #   flytekit
     #   requests
     #   responses
-uvicorn[standard]==0.15.0
+uvicorn[standard]==0.16.0
     # via feast
 uvloop==0.16.0
     # via uvicorn
 watchgod==0.7
     # via uvicorn
-websockets==10.0
+websocket-client==1.2.3
+    # via docker
+websockets==10.1
     # via uvicorn
 wheel==0.37.0
-    # via flytekit
-wrapt==1.13.1
+    # via
+    #   -r ../../../common/requirements-common.in
+    #   flytekit
+wrapt==1.13.3
     # via
     #   deprecated
     #   flytekit
 zipp==3.6.0
-    # via importlib-metadata
+    # via
+    #   importlib-metadata
+    #   importlib-resources
 
 # The following packages are considered to be unsafe in a requirements file:
 # setuptools
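
A hedged usage sketch for reviewers, tying together the pieces this patch touches: the `FeatureStore` wrapper from `feast_dataobjects.py` materializes only the latest feature values per entity key into the online store (as the comments in `feast_workflow.py` explain) and can then serve them for inference. Configuration values mirror the workflow defaults; the feature reference is illustrative:

```python
from datetime import datetime, timedelta

from feast_dataobjects import FeatureStore, FeatureStoreConfig

# Mirrors the defaults in feast_workflow.feast_workflow.
config = FeatureStoreConfig(
    registry_path="registry.db",
    project="horsecolic",
    s3_bucket="feast-integration",
    online_store_path="online.db",
)
fs = FeatureStore(config=config)

# Only the latest value per "Hospital Number" survives materialization.
fs.materialize(
    start_date=datetime.utcnow() - timedelta(days=250),
    end_date=datetime.utcnow(),
)
inference_point = fs.get_online_features(
    ["horse_colic_stats:rectal temperature"],  # illustrative feature ref
    [{"Hospital Number": "533738"}],
)
```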