Fix Feast's Jupyter notebook code #482

Merged · 3 commits · Dec 9, 2021
@@ -2,44 +2,59 @@
"cells": [
{
"cell_type": "markdown",
"id": "56ad7306",
"id": "8711cbe4",
"metadata": {},
"source": [
"# Demo of running a Flyte + Feast, feature engineering and training pipeline\n",
"In this demo we will learn how to interact with Feast through Flyte. The goal will be to train a simple [Gaussian Naive Bayes model using sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html) on the [Horse-Colic dataset from UCI](https://archive.ics.uci.edu/ml/datasets/Horse+Colic).\n",
"The model aims to classify if the lesion of the horse is surgical or not. This is a modified version of the original dataset.\n",
"# Demo showcasing Flyte & Feast Integration—Feature Engineering and Training Pipeline\n",
"\n",
"In this demo, we will learn how to interact with Feast through Flyte. The goal will be to train a simple [Gaussian Naive Bayes model using sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html) on the [Horse-Colic dataset from UCI](https://archive.ics.uci.edu/ml/datasets/Horse+Colic).\n",
"\n",
"The model aims to classify if the lesion of the horse is surgical or not. It uses a modified version of the original dataset.\n",
"\n",
"**NOTE**\n",
"We will not really dive into the dataset or the model, as the aim of this tutorial is to show how you can use Feast as the feature store and use Flyte to engineer the features that can be identical across your online predictions as well as offline training"
"We will not dive into the dataset or the model as the aim of this tutorial is to show how you can use Feast as a feature store and Flyte to engineer the features that can be identical across your online and offline training."
]
},
{
"cell_type": "markdown",
"id": "07dbb780",
"id": "f79f28b4",
"metadata": {},
"source": [
"## Step 1: Check out the code for the pipeline\n",
"We have used [flytekit](https://docs.flyte.org/projects/flytekit/en/latest/) flyte's python SDK to express the pipeline in pure python. The actual workflow code is auto-documented and rendered using sphinx [here](https://flyte--424.org.readthedocs.build/projects/cookbook/en/424/auto/case_studies/feature_engineering/feast_integration/index.html) *to be merged soon*"
"### Step 1: Code 💻\n",
"\n",
"We have used [Flytekit](https://docs.flyte.org/projects/flytekit/en/latest/)—Flyte's Python SDK to express the pipeline in pure Python. The actual workflow code is auto-documented and rendered using sphinx [here](https://docs.flyte.org/projects/cookbook/en/latest/auto/case_studies/feature_engineering/feast_integration/index.html)."
]
},
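For orientation, here is a minimal, hypothetical Flytekit skeleton of such a pipeline. The real tasks and workflow (data retrieval, Feast ingestion, training, prediction) live in the `feast_integration.feast_workflow` module; only the overall shape is sketched here, and the task names and return types are made up for illustration.

```python
from flytekit import task, workflow

# Hypothetical skeleton only: the actual pipeline lives in the
# feast_integration.feast_workflow module and is far more involved.
@task
def fetch_dataset() -> str:
    # e.g. pull the Horse-Colic data and return a path to it
    return "/tmp/horse_colic.parquet"

@task
def train(dataset_path: str) -> str:
    # e.g. engineer features through Feast and fit a GaussianNB model
    return "/tmp/model.joblib"

@workflow
def feast_workflow() -> str:
    return train(dataset_path=fetch_dataset())
```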
{
"cell_type": "markdown",
"id": "6ecc8e06",
"id": "8fcea449",
"metadata": {},
"source": [
"## Step 2: Launch an execution\n",
"We can use the [FlyteConsole](https://github.com/flyteorg/flyteconsole) to launch, monitor and introspect Flyte executions, but in this case we will use [flytekit.remote](https://docs.flyte.org/projects/flytekit/en/latest/design/control_plane.html) to interact with the Flyte backend.\n",
"### Step 2: Launch an execution 🚀\n",
"\n",
"We can use [FlyteConsole](https://github.com/flyteorg/flyteconsole) to launch, monitor, and introspect Flyte executions. However, in our case, we will use [flytekit.remote](https://docs.flyte.org/projects/flytekit/en/latest/design/control_plane.html) to interact with the Flyte backend.\n",
"\n",
"#### Set up Flytekit remote from config\n",
"\n",
"### Setup flytekit remote from config\n",
"To work with flytesandbox, we have created a simple local config that points to FlyteSandbox server and execution environment. We will initialize flytekit remote with this server. We will also pin it to one project and domain.\n",
"To work with Flyte-sandbox, we need to create a simple local config at `~/.flyte/config`\n",
"that points to Flyte-sandbox server and execution environment. We will initialize Flytekit remote with this server.\n",
"\n",
"**Note** this also sets up access to S3 or other equivalent datastores needed by FEAST"
"Example configuration:\n",
"```\n",
"[platform]\n",
"url = localhost:30081\n",
"insecure = True\n",
"```\n",
"\n",
"We will also pin FlyteRemote to one project and domain.\n",
"\n",
"**NOTE** The integration also sets up access to S3 or other equivalent datastores needed by FEAST."
]
},
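For illustration, a minimal sketch of the remote setup described above. The `from_config` constructor matches the flytekit releases contemporary with this notebook (newer releases build the remote from a `Config` object instead), and the `flytesnacks`/`development` project and domain are assumed sandbox defaults.

```python
from flytekit.remote import FlyteRemote

# Assumes ~/.flyte/config points at the sandbox (localhost:30081, insecure = True).
# Pinning a default project and domain lets the later fetch/execute calls omit them.
remote = FlyteRemote.from_config(
    default_project="flytesnacks",  # assumed sandbox defaults
    default_domain="development",
)
```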
{
"cell_type": "code",
"execution_count": null,
"id": "f8dd18c7",
"id": "a2330891",
"metadata": {
"scrolled": true
},
@@ -51,119 +66,111 @@
},
{
"cell_type": "markdown",
"id": "9cd0b83e",
"id": "64d6295a",
"metadata": {},
"source": [
"### Retrieve the latest registered version of the pipeline\n",
"FlyteRemote provides convienient methods to retrieve a version of the pipeline from the remote server.\n",
"#### Retrieve the latest registered version of the pipeline\n",
"\n",
"FlyteRemote provides convienient methods to retrieve version of the pipeline from the remote server.\n",
"\n",
"**Note** It is possible to get a specific version of workflow and trigger a launch for that, but, we will just get the latest"
"**NOTE** It is possible to get a specific version of the workflow and trigger a launch for that, but let's just get the latest."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9ccda3d5",
"id": "d28014f8",
"metadata": {},
"outputs": [],
"source": [
"# from feast_integration.feast_workflow import feast_workflow\n",
"lp = remote.fetch_launch_plan(name=\"feast_integration.feast_workflow.feast_workflow\")\n",
"lp.id.version"
]
},
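As the note above says, a specific, previously registered version can be fetched instead of the latest; a sketch with a placeholder version string:

```python
# Pass an explicit version to pin the launch plan; the value below is a placeholder.
pinned_lp = remote.fetch_launch_plan(
    name="feast_integration.feast_workflow.feast_workflow",
    version="<a-previously-registered-version>",
)
```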
{
"cell_type": "markdown",
"id": "5e8ccc35",
"id": "c71210a7",
"metadata": {},
"source": [
"### Launch an execution\n",
"`remote.execute` makes it simple to start an execution for the launchplan. We will not provide any inputs and just use the default inputs"
"#### Launch\n",
"\n",
"`remote.execute` simplifies starting an execution for the launch plan. Let's use the default inputs."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a461fe08",
"id": "c13770fc",
"metadata": {},
"outputs": [],
"source": [
"exe = remote.execute(lp, inputs={})\n",
"print(f\"http://localhost:30081/console/projects/{exe.id.project}/domains/{exe.id.domain}/executions/{exe.id.name}\")"
"execution = remote.execute(lp, inputs={})\n",
"print(f\"http://localhost:30081/console/projects/{execution.id.project}/domains/{execution.id.domain}/executions/{execution.id.name}\")"
]
},
{
"cell_type": "markdown",
"id": "adb68140",
"id": "07bd9e37",
"metadata": {},
"source": [
"## Step 3: Now wait for the execution to complete\n",
"It is possible to launch a sync execution and wait for it to complete, but since all the processes are completely detached (you can even close your laptop) and come back to it later, we will show how to sync the execution back."
"### Step 3: Wait for the execution to complete \n",
"\n",
"It is possible to launch a sync execution and wait for it to complete, but since all the processes are completely detached (you can even close your laptop and come back to it later), we will show how to sync the execution back.\n",
"\n",
"**Side Note**\n",
"It is possible to fetch an existing execution or simply retrieve a started execution. Also, if you launch an execution with the same name, Flyte will respect that and not restart a new execution!"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "502711ef",
"id": "a8bd9614",
"metadata": {},
"outputs": [],
"source": [
"from flytekit.models.core.execution import WorkflowExecutionPhase\n",
"exe = remote.sync(exe)\n",
"print(f\"Execution {exe.id.name} is in Phase - {WorkflowExecutionPhase.enum_to_string(exe.closure.phase)}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9fda8da9",
"metadata": {},
"outputs": [],
"source": [
"exe.sync()"
"\n",
"synced_execution = remote.sync(execution)\n",
"print(f\"Execution {synced_execution.id.name} is in Phase - {WorkflowExecutionPhase.enum_to_string(synced_execution.closure.phase)}\")"
]
},
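A blocking wait can be emulated by polling `remote.sync` until the phase is terminal; a minimal sketch using the phase constants imported above:

```python
import time

from flytekit.models.core.execution import WorkflowExecutionPhase

TERMINAL_PHASES = {
    WorkflowExecutionPhase.SUCCEEDED,
    WorkflowExecutionPhase.FAILED,
    WorkflowExecutionPhase.ABORTED,
    WorkflowExecutionPhase.TIMED_OUT,
}

# Re-sync periodically until the execution reaches a terminal phase.
synced_execution = remote.sync(execution)
while synced_execution.closure.phase not in TERMINAL_PHASES:
    time.sleep(5)
    synced_execution = remote.sync(synced_execution)

print(WorkflowExecutionPhase.enum_to_string(synced_execution.closure.phase))
```

To re-attach to an execution launched earlier (as the side note above mentions), it can also be fetched by name, e.g. `remote.fetch_workflow_execution(name="f9f180a56e67b4c9781e")`, and then synced the same way.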
{
"cell_type": "markdown",
"id": "4c2074cc",
"id": "65e5b181",
"metadata": {},
"source": [
"## Step 4: Lets sync data from this execution\n",
"### Step 4: Retrieve output\n",
"\n",
"**Side Note**\n",
"It is possible to fetch an existing execution or simply retrieve a started execution. Also if you launch an execution with the same name, flyte will respect and not restart a new execution!\n",
"\n",
"To fetch an execution\n",
"```python\n",
"exe = remote.fetch_workflow_execution(name='f9f180a56e67b4c9781e')\n",
"exe = remote.sync(exe)\n",
"```"
"Let's fetch the workflow outputs."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3e778b53",
"id": "ab24b1c0",
"metadata": {},
"outputs": [],
"source": [
"from feast_dataobjects import FeatureStore\n",
"fs = exe.raw_outputs.get('o0', FeatureStore)\n",
"model = exe.outputs['o1']"
"\n",
"# \"raw_outputs\" in FlyteRemote helps associate type to the output, and resolves Literals to Python objects.\n",
"# For example, a data class is returned as a marshmallow schema (serialized) when \"outputs\" is used but is returned as a data class when \"raw_outputs\" is used.\n",
"fs = synced_execution.raw_outputs.get(\"o0\", FeatureStore)\n",
"model = synced_execution.outputs['o1']"
]
},
{
"cell_type": "markdown",
"id": "545e00c9",
"id": "fcd55057",
"metadata": {},
"source": [
"#### Lets inspect the feature store configuration"
"Next, we inspect the feature store configuration and model. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a5dff70e",
"id": "d31c96a8",
"metadata": {},
"outputs": [],
"source": [
@@ -172,16 +179,16 @@
},
{
"cell_type": "markdown",
"id": "20db4c76",
"id": "af8277d3",
"metadata": {},
"source": [
"#### Also, the model is now available locally as a JobLibSerialized file and can be downloaded and loaded"
"**NOTE** The output model is available locally as a JobLibSerialized file, which can be downloaded and loaded."
]
},
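A sketch of downloading and loading the model; it assumes the JoblibSerialized output behaves like a FlyteFile, i.e. `download()` pulls the blob locally and returns the local path.

```python
import joblib

# Assumption: the model output is a FlyteFile-like object.
local_model_path = model.download()  # downloads the serialized model and returns its local path
clf = joblib.load(local_model_path)
clf
```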
{
"cell_type": "code",
"execution_count": null,
"id": "3747b3a3",
"id": "8a841e22",
"metadata": {
"scrolled": true
},
@@ -192,20 +199,25 @@
},
{
"cell_type": "markdown",
"id": "95ae557b",
"id": "7a535b4d",
"metadata": {},
"source": [
"## Step 5: Cool, Let's predict\n",
"So we have the model and a feature store!, how can you run predictions. Flytekit will automatically manage the IO for you and you can simply re-use the prediction function from the workflow."
"### Step 5: Cool, let's predict\n",
"\n",
"We now have the model and feature store with us! So, how can we generate predictions? We can simply re-use the `predict` function from the workflow; Flytekit will automatically manage the IO for us.\n",
"\n",
"**NOTE** We set a couple of environment variables to facilitate the AWS access."
]
},
{
"cell_type": "markdown",
"id": "77c20c5d",
"id": "dff58f63",
"metadata": {},
"source": [
"### Lets load some features from the online feature store\n",
"We are re-using the feature definition from the flyte workflow\n",
"#### Load features from an online feature store\n",
"\n",
"Let's re-use the feature definition from the Flyte workflow.\n",
"\n",
"```python\n",
"inference_point = fs.get_online_features(FEAST_FEATURES, [{\"Hospital Number\": \"533738\"}])\n",
"```"
@@ -214,28 +226,37 @@
{
"cell_type": "code",
"execution_count": null,
"id": "88abda57",
"id": "c7a2c3c4",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from feast_workflow import predict, FEAST_FEATURES\n",
"\n",
"os.environ[\"FLYTE_AWS_ENDPOINT\"] = os.environ[\"FEAST_S3_ENDPOINT_URL\"] = \"http://localhost:30084/\"\n",
"os.environ[\"FLYTE_AWS_ACCESS_KEY_ID\"] = os.environ[\"AWS_ACCESS_KEY_ID\"] = \"minio\"\n",
"os.environ[\"FLYTE_AWS_SECRET_ACCESS_KEY\"] = os.environ[\"AWS_SECRET_ACCESS_KEY\"] = \"miniostorage\"\n",
"\n",
"inference_point = fs.get_online_features(FEAST_FEATURES, [{\"Hospital Number\": \"533738\"}])\n",
"\n",
"inference_point"
]
},
{
"cell_type": "markdown",
"id": "ef5745a6",
"id": "9a49e572",
"metadata": {},
"source": [
"### Now run a prediction\n",
"Notice how we are passing the serialized model and some loaded features"
"#### Generate a prediction\n",
"\n",
"Notice how we are passing the serialized model and some loaded features."
]
},
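A sketch of the call; the keyword names are assumptions, and the authoritative signature is the `predict` task in `feast_workflow`.

```python
# Hypothetical keyword names; check feast_workflow.predict for the real signature.
prediction = predict(model_ser=model, features=inference_point)
prediction
```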
{
"cell_type": "code",
"execution_count": null,
"id": "f0ff6c96",
"id": "e44c62e2",
"metadata": {},
"outputs": [],
"source": [
@@ -244,21 +265,25 @@
},
{
"cell_type": "markdown",
"id": "1452207b",
"id": "b632e957",
"metadata": {},
"source": [
"## Done! \n",
"We can ofcourse observe the intermediates from the workflow, which we saw in the UI, we can also download any intermediate data.\n",
"### Next Steps \n",
"\n",
"We can, of course, observe the intermediates from the workflow in the UI and download the intermediate data.\n",
"\n",
"## Future\n",
"We want to further improve this experience, to allow for the same prediction method to run in your inference server and in a workflow. It is almost there now, but you need to remove the `model de-serialization` as this happens for the current predict method."
"### Future 🔮\n",
"\n",
"We want to improve the integration experience further to allow the `predict` function to run in an inference server and a workflow. We are almost there, but we need to remove `model de-serialization` in the `predict` method."
]
}
],
"metadata": {
"interpreter": {
"hash": "dc3fb656270592a65285897570586647c2377dd9205211f8c7206e5246caf1a6"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"display_name": "Python 3.8.10 64-bit ('feast': pyenv)",
"name": "python3"
},
"language_info": {
@@ -271,7 +296,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.2"
"version": "3.8.10"
}
},
"nbformat": 4,
@@ -20,7 +20,7 @@


class FlyteCustomProvider(LocalProvider):
def __init__(self, config: RepoConfig, repo_path):
def __init__(self, config: RepoConfig):
super().__init__(config)

def materialize_single_feature_view(
@@ -78,13 +78,17 @@ def _localize_feature_view(self, feature_view: FeatureView):

# Copy parquet file to a local file
file_source: FileSource = feature_view.batch_source
random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path)
random_local_path = (
FlyteContext.current_context().file_access.get_random_local_path(
file_source.path
)
)
FlyteContext.current_context().file_access.get_data(
file_source.path,
random_local_path,
is_multipart=True,
)
feature_view.batch_source=FileSource(
feature_view.batch_source = FileSource(
path=random_local_path,
event_timestamp_column=file_source.event_timestamp_column,
)