Fix Feast's Jupyter notebook code (#482)
* fix Feast's Jupyter notebook code

Signed-off-by: Samhita Alla <[email protected]>

* remove cell outputs

Signed-off-by: Samhita Alla <[email protected]>

* working

Signed-off-by: Samhita Alla <[email protected]>
samhita-alla authored Dec 9, 2021
1 parent 6cb5ad6 commit 6874282
Showing 6 changed files with 301 additions and 168 deletions.
@@ -2,44 +2,59 @@
"cells": [
{
"cell_type": "markdown",
"id": "56ad7306",
"id": "8711cbe4",
"metadata": {},
"source": [
"# Demo of running a Flyte + Feast, feature engineering and training pipeline\n",
"In this demo we will learn how to interact with Feast through Flyte. The goal will be to train a simple [Gaussian Naive Bayes model using sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html) on the [Horse-Colic dataset from UCI](https://archive.ics.uci.edu/ml/datasets/Horse+Colic).\n",
"The model aims to classify if the lesion of the horse is surgical or not. This is a modified version of the original dataset.\n",
"# Demo showcasing Flyte & Feast Integration—Feature Engineering and Training Pipeline\n",
"\n",
"In this demo, we will learn how to interact with Feast through Flyte. The goal will be to train a simple [Gaussian Naive Bayes model using sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html) on the [Horse-Colic dataset from UCI](https://archive.ics.uci.edu/ml/datasets/Horse+Colic).\n",
"\n",
"The model aims to classify if the lesion of the horse is surgical or not. It uses a modified version of the original dataset.\n",
"\n",
"**NOTE**\n",
"We will not really dive into the dataset or the model, as the aim of this tutorial is to show how you can use Feast as the feature store and use Flyte to engineer the features that can be identical across your online predictions as well as offline training"
"We will not dive into the dataset or the model as the aim of this tutorial is to show how you can use Feast as a feature store and Flyte to engineer the features that can be identical across your online and offline training."
]
},
{
"cell_type": "markdown",
"id": "07dbb780",
"id": "f79f28b4",
"metadata": {},
"source": [
"## Step 1: Check out the code for the pipeline\n",
"We have used [flytekit](https://docs.flyte.org/projects/flytekit/en/latest/) flyte's python SDK to express the pipeline in pure python. The actual workflow code is auto-documented and rendered using sphinx [here](https://flyte--424.org.readthedocs.build/projects/cookbook/en/424/auto/case_studies/feature_engineering/feast_integration/index.html) *to be merged soon*"
"### Step 1: Code 💻\n",
"\n",
"We have used [Flytekit](https://docs.flyte.org/projects/flytekit/en/latest/)—Flyte's Python SDK to express the pipeline in pure Python. The actual workflow code is auto-documented and rendered using sphinx [here](https://docs.flyte.org/projects/cookbook/en/latest/auto/case_studies/feature_engineering/feast_integration/index.html)."
]
},
{
"cell_type": "markdown",
"id": "6ecc8e06",
"id": "8fcea449",
"metadata": {},
"source": [
"## Step 2: Launch an execution\n",
"We can use the [FlyteConsole](https://github.com/flyteorg/flyteconsole) to launch, monitor and introspect Flyte executions, but in this case we will use [flytekit.remote](https://docs.flyte.org/projects/flytekit/en/latest/design/control_plane.html) to interact with the Flyte backend.\n",
"### Step 2: Launch an execution 🚀\n",
"\n",
"We can use [FlyteConsole](https://github.com/flyteorg/flyteconsole) to launch, monitor, and introspect Flyte executions. However, in our case, we will use [flytekit.remote](https://docs.flyte.org/projects/flytekit/en/latest/design/control_plane.html) to interact with the Flyte backend.\n",
"\n",
"#### Set up Flytekit remote from config\n",
"\n",
"### Setup flytekit remote from config\n",
"To work with flytesandbox, we have created a simple local config that points to FlyteSandbox server and execution environment. We will initialize flytekit remote with this server. We will also pin it to one project and domain.\n",
"To work with Flyte-sandbox, we need to create a simple local config at `~/.flyte/config`\n",
"that points to Flyte-sandbox server and execution environment. We will initialize Flytekit remote with this server.\n",
"\n",
"**Note** this also sets up access to S3 or other equivalent datastores needed by FEAST"
"Example configuration:\n",
"```\n",
"[platform]\n",
"url = localhost:30081\n",
"insecure = True\n",
"```\n",
"\n",
"We will also pin FlyteRemote to one project and domain.\n",
"\n",
"**NOTE** The integration also sets up access to S3 or other equivalent datastores needed by FEAST."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f8dd18c7",
"id": "a2330891",
"metadata": {
"scrolled": true
},
@@ -51,119 +66,111 @@
},
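> The collapsed cell above creates the `FlyteRemote` object. A minimal sketch of such a setup follows, assuming the `FlyteRemote.from_config` constructor available in flytekit releases of that period; the project and domain names are placeholders, not values taken from this commit:
>
> ```python
> # Hypothetical sketch only -- the actual cell is collapsed in this diff.
> from flytekit.remote import FlyteRemote
>
> # from_config reads the local flytekit config (the [platform] section shown above);
> # the project/domain below are placeholders for whatever the workflow was registered under.
> remote = FlyteRemote.from_config(
>     default_project="flytesnacks",
>     default_domain="development",
> )
> ```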
{
"cell_type": "markdown",
"id": "9cd0b83e",
"id": "64d6295a",
"metadata": {},
"source": [
"### Retrieve the latest registered version of the pipeline\n",
"FlyteRemote provides convienient methods to retrieve a version of the pipeline from the remote server.\n",
"#### Retrieve the latest registered version of the pipeline\n",
"\n",
"FlyteRemote provides convienient methods to retrieve version of the pipeline from the remote server.\n",
"\n",
"**Note** It is possible to get a specific version of workflow and trigger a launch for that, but, we will just get the latest"
"**NOTE** It is possible to get a specific version of the workflow and trigger a launch for that, but let's just get the latest."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9ccda3d5",
"id": "d28014f8",
"metadata": {},
"outputs": [],
"source": [
"# from feast_integration.feast_workflow import feast_workflow\n",
"lp = remote.fetch_launch_plan(name=\"feast_integration.feast_workflow.feast_workflow\")\n",
"lp.id.version"
]
},
{
"cell_type": "markdown",
"id": "5e8ccc35",
"id": "c71210a7",
"metadata": {},
"source": [
"### Launch an execution\n",
"`remote.execute` makes it simple to start an execution for the launchplan. We will not provide any inputs and just use the default inputs"
"#### Launch\n",
"\n",
"`remote.execute` simplifies starting an execution for the launch plan. Let's use the default inputs."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a461fe08",
"id": "c13770fc",
"metadata": {},
"outputs": [],
"source": [
"exe = remote.execute(lp, inputs={})\n",
"print(f\"http://localhost:30081/console/projects/{exe.id.project}/domains/{exe.id.domain}/executions/{exe.id.name}\")"
"execution = remote.execute(lp, inputs={})\n",
"print(f\"http://localhost:30081/console/projects/{execution.id.project}/domains/{execution.id.domain}/executions/{execution.id.name}\")"
]
},
{
"cell_type": "markdown",
"id": "adb68140",
"id": "07bd9e37",
"metadata": {},
"source": [
"## Step 3: Now wait for the execution to complete\n",
"It is possible to launch a sync execution and wait for it to complete, but since all the processes are completely detached (you can even close your laptop) and come back to it later, we will show how to sync the execution back."
"### Step 3: Wait for the execution to complete \n",
"\n",
"It is possible to launch a sync execution and wait for it to complete, but since all the processes are completely detached (you can even close your laptop and come back to it later), we will show how to sync the execution back.\n",
"\n",
"**Side Note**\n",
"It is possible to fetch an existing execution or simply retrieve a started execution. Also, if you launch an execution with the same name, Flyte will respect that and not restart a new execution!"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "502711ef",
"id": "a8bd9614",
"metadata": {},
"outputs": [],
"source": [
"from flytekit.models.core.execution import WorkflowExecutionPhase\n",
"exe = remote.sync(exe)\n",
"print(f\"Execution {exe.id.name} is in Phase - {WorkflowExecutionPhase.enum_to_string(exe.closure.phase)}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9fda8da9",
"metadata": {},
"outputs": [],
"source": [
"exe.sync()"
"\n",
"synced_execution = remote.sync(execution)\n",
"print(f\"Execution {synced_execution.id.name} is in Phase - {WorkflowExecutionPhase.enum_to_string(synced_execution.closure.phase)}\")"
]
},
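> The side note above mentions re-attaching to an execution. The markdown cell removed later in this diff carried a snippet for exactly that; it is reproduced here as a sketch, where the execution name is only an example value:
>
> ```python
> # Sketch based on the snippet removed elsewhere in this diff; replace the name
> # with the execution name printed by remote.execute above.
> existing = remote.fetch_workflow_execution(name="f9f180a56e67b4c9781e")
> existing = remote.sync(existing)
> ```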
{
"cell_type": "markdown",
"id": "4c2074cc",
"id": "65e5b181",
"metadata": {},
"source": [
"## Step 4: Lets sync data from this execution\n",
"### Step 4: Retrieve output\n",
"\n",
"**Side Note**\n",
"It is possible to fetch an existing execution or simply retrieve a started execution. Also if you launch an execution with the same name, flyte will respect and not restart a new execution!\n",
"\n",
"To fetch an execution\n",
"```python\n",
"exe = remote.fetch_workflow_execution(name='f9f180a56e67b4c9781e')\n",
"exe = remote.sync(exe)\n",
"```"
"Let's fetch the workflow outputs."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3e778b53",
"id": "ab24b1c0",
"metadata": {},
"outputs": [],
"source": [
"from feast_dataobjects import FeatureStore\n",
"fs = exe.raw_outputs.get('o0', FeatureStore)\n",
"model = exe.outputs['o1']"
"\n",
"# \"raw_outputs\" in FlyteRemote helps associate type to the output, and resolves Literals to Python objects.\n",
"# For example, a data class is returned as a marshmallow schema (serialized) when \"outputs\" is used but is returned as a data class when \"raw_outputs\" is used.\n",
"fs = synced_execution.raw_outputs.get(\"o0\", FeatureStore)\n",
"model = synced_execution.outputs['o1']"
]
},
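> To see the difference the comment above describes, compare the two accessors on the same output. A small sketch; the exact serialized type returned by `outputs` depends on the flytekit version:
>
> ```python
> # "outputs" resolves the same literal to its serialized (schema) form,
> # while "raw_outputs" above returned the FeatureStore dataclass.
> fs_serialized = synced_execution.outputs["o0"]
> print(type(fs), type(fs_serialized))
> ```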
{
"cell_type": "markdown",
"id": "545e00c9",
"id": "fcd55057",
"metadata": {},
"source": [
"#### Lets inspect the feature store configuration"
"Next, we inspect the feature store configuration and model. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a5dff70e",
"id": "d31c96a8",
"metadata": {},
"outputs": [],
"source": [
@@ -172,16 +179,16 @@
},
{
"cell_type": "markdown",
"id": "20db4c76",
"id": "af8277d3",
"metadata": {},
"source": [
"#### Also, the model is now available locally as a JobLibSerialized file and can be downloaded and loaded"
"**NOTE** The output model is available locally as a JobLibSerialized file, which can be downloaded and loaded."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3747b3a3",
"id": "8a841e22",
"metadata": {
"scrolled": true
},
@@ -192,20 +199,25 @@
},
{
"cell_type": "markdown",
"id": "95ae557b",
"id": "7a535b4d",
"metadata": {},
"source": [
"## Step 5: Cool, Let's predict\n",
"So we have the model and a feature store!, how can you run predictions. Flytekit will automatically manage the IO for you and you can simply re-use the prediction function from the workflow."
"### Step 5: Cool, let's predict\n",
"\n",
"We now have the model and feature store with us! So, how can we generate predictions? We can simply re-use the `predict` function from the workflow; Flytekit will automatically manage the IO for us.\n",
"\n",
"**NOTE** We set a couple of environment variables to facilitate the AWS access."
]
},
{
"cell_type": "markdown",
"id": "77c20c5d",
"id": "dff58f63",
"metadata": {},
"source": [
"### Lets load some features from the online feature store\n",
"We are re-using the feature definition from the flyte workflow\n",
"#### Load features from an online feature store\n",
"\n",
"Let's re-use the feature definition from the Flyte workflow.\n",
"\n",
"```python\n",
"inference_point = fs.get_online_features(FEAST_FEATURES, [{\"Hospital Number\": \"533738\"}])\n",
"```"
@@ -214,28 +226,37 @@
{
"cell_type": "code",
"execution_count": null,
"id": "88abda57",
"id": "c7a2c3c4",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from feast_workflow import predict, FEAST_FEATURES\n",
"\n",
"os.environ[\"FLYTE_AWS_ENDPOINT\"] = os.environ[\"FEAST_S3_ENDPOINT_URL\"] = \"http://localhost:30084/\"\n",
"os.environ[\"FLYTE_AWS_ACCESS_KEY_ID\"] = os.environ[\"AWS_ACCESS_KEY_ID\"] = \"minio\"\n",
"os.environ[\"FLYTE_AWS_SECRET_ACCESS_KEY\"] = os.environ[\"AWS_SECRET_ACCESS_KEY\"] = \"miniostorage\"\n",
"\n",
"inference_point = fs.get_online_features(FEAST_FEATURES, [{\"Hospital Number\": \"533738\"}])\n",
"\n",
"inference_point"
]
},
{
"cell_type": "markdown",
"id": "ef5745a6",
"id": "9a49e572",
"metadata": {},
"source": [
"### Now run a prediction\n",
"Notice how we are passing the serialized model and some loaded features"
"#### Generate a prediction\n",
"\n",
"Notice how we are passing the serialized model and some loaded features."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f0ff6c96",
"id": "e44c62e2",
"metadata": {},
"outputs": [],
"source": [
@@ -244,21 +265,25 @@
},
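> The collapsed cell above calls `predict` with the model and the loaded features. A hedged sketch follows; the keyword names are assumptions based on the surrounding prose rather than the commit itself:
>
> ```python
> # Hypothetical sketch -- the cell is collapsed in this diff and the parameter
> # names of predict() are assumed, not taken from the commit.
> prediction = predict(model_ser=model, features=inference_point)
> prediction
> ```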
{
"cell_type": "markdown",
"id": "1452207b",
"id": "b632e957",
"metadata": {},
"source": [
"## Done! \n",
"We can ofcourse observe the intermediates from the workflow, which we saw in the UI, we can also download any intermediate data.\n",
"### Next Steps \n",
"\n",
"We can, of course, observe the intermediates from the workflow in the UI and download the intermediate data.\n",
"\n",
"## Future\n",
"We want to further improve this experience, to allow for the same prediction method to run in your inference server and in a workflow. It is almost there now, but you need to remove the `model de-serialization` as this happens for the current predict method."
"### Future 🔮\n",
"\n",
"We want to improve the integration experience further to allow the `predict` function to run in an inference server and a workflow. We are almost there, but we need to remove `model de-serialization` in the `predict` method."
]
}
],
"metadata": {
"interpreter": {
"hash": "dc3fb656270592a65285897570586647c2377dd9205211f8c7206e5246caf1a6"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"display_name": "Python 3.8.10 64-bit ('feast': pyenv)",
"name": "python3"
},
"language_info": {
@@ -271,7 +296,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.2"
"version": "3.8.10"
}
},
"nbformat": 4,
@@ -20,7 +20,7 @@


class FlyteCustomProvider(LocalProvider):
def __init__(self, config: RepoConfig, repo_path):
def __init__(self, config: RepoConfig):
super().__init__(config)

def materialize_single_feature_view(
@@ -78,13 +78,17 @@ def _localize_feature_view(self, feature_view: FeatureView):

# Copy parquet file to a local file
file_source: FileSource = feature_view.batch_source
random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path)
random_local_path = (
FlyteContext.current_context().file_access.get_random_local_path(
file_source.path
)
)
FlyteContext.current_context().file_access.get_data(
file_source.path,
random_local_path,
is_multipart=True,
)
feature_view.batch_source=FileSource(
feature_view.batch_source = FileSource(
path=random_local_path,
event_timestamp_column=file_source.event_timestamp_column,
)