Feast integration (#410)
* Initial version

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add venv to dockerfile

Signed-off-by: Eduardo Apolinario <[email protected]>

* Rename feast integration dir

Signed-off-by: Eduardo Apolinario <[email protected]>

* Configure minio in the image

Signed-off-by: Eduardo Apolinario <[email protected]>

* Refactoring + retrieve offline features

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove all_together

Signed-off-by: Eduardo Apolinario <[email protected]>

* Attempt to add s3 credentials to image

Signed-off-by: Eduardo Apolinario <[email protected]>

* Fix s3 endpoint

Signed-off-by: Eduardo Apolinario <[email protected]>

* custom provider

Signed-off-by: Eduardo Apolinario <[email protected]>

* Transform FeatureView prior to executing queries

Signed-off-by: Eduardo Apolinario <[email protected]>

* Set PYTHONPATH

Signed-off-by: Eduardo Apolinario <[email protected]>

* Set PYTHONPATH to multiple values

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove "custom_provider" from path

Signed-off-by: Eduardo Apolinario <[email protected]>

* Replace minio endpoint

Signed-off-by: Eduardo Apolinario <[email protected]>

* Print env vars

Signed-off-by: Eduardo Apolinario <[email protected]>

* Set FEAST_S3_ENDPOINT_URL while building feature store

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove minio credentials from image

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add aws env vars

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove mention to local provider

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove piping of registry object

Signed-off-by: Eduardo Apolinario <[email protected]>

* Create random path via FlyteContext

Signed-off-by: Eduardo Apolinario <[email protected]>

* Revert "Remove piping of registry object"

This reverts commit ccdf326.

Signed-off-by: Eduardo Apolinario <[email protected]>

* Clean up feature description and remove debugging statements

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add tasks up to `train_model`

Signed-off-by: Eduardo Apolinario <[email protected]>

* Rename workflow

Signed-off-by: Eduardo Apolinario <[email protected]>

* Comment use of custom provider

Signed-off-by: Eduardo Apolinario <[email protected]>

* Rename workflow

Signed-off-by: Eduardo Apolinario <[email protected]>

* fix error in training

Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>

* Add TODO

Signed-off-by: Eduardo Apolinario <[email protected]>

* Import feature_eng tasks directly

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add store_online task

Signed-off-by: Eduardo Apolinario <[email protected]>

* Copy remote file to a local file and replace batch_source in materialize

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add some debugging statements and fix local execution parameter

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add remaining steps to workflow

Signed-off-by: Eduardo Apolinario <[email protected]>

* Regenerate requirements files

Signed-off-by: Eduardo Apolinario <[email protected]>

* Regenerate requirements and put replacement of remote files back in custom provider

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add more logging

Signed-off-by: Eduardo Apolinario <[email protected]>

* Regenerate requirements again

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add workflow return type

Signed-off-by: Eduardo Apolinario <[email protected]>

* Include a directory prefix in the model filename

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove unused overrides in custom provider and comment use of localize_feature_view

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add type transformer

Signed-off-by: Eduardo Apolinario <[email protected]>

* Pipe _Feature_Store to all interactions with feast

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove unnecessary override in custom provider

Signed-off-by: Eduardo Apolinario <[email protected]>

* Rearrange initialization of FeatureStore for better legibility

Signed-off-by: Eduardo Apolinario <[email protected]>

* Revert "Remove unnecessary override in custom provider"

This reverts commit 2808ba0.

Signed-off-by: Eduardo Apolinario <[email protected]>

* Use create_node to enforce order

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove unused function

Signed-off-by: Eduardo Apolinario <[email protected]>

* Guard env vars behind a check

Signed-off-by: Eduardo Apolinario <[email protected]>

* Expose inputs to workflow

Signed-off-by: Eduardo Apolinario <[email protected]>

* Task to build FeatureStore

Signed-off-by: Eduardo Apolinario <[email protected]>

* Do not guard env vars behind a check

Signed-off-by: Eduardo Apolinario <[email protected]>

* Experiment with converted_df

Signed-off-by: Eduardo Apolinario <[email protected]>

* Comments

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove commented code from type transformer

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove unused portion of sandbox.config

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove TODO

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove registry parameter from local execution

Signed-off-by: Eduardo Apolinario <[email protected]>

* No need for type transformers

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove mentions to type transformers

Signed-off-by: Eduardo Apolinario <[email protected]>

* Copy README.rst from #322

Signed-off-by: Eduardo Apolinario <[email protected]>

* Step 3 of guide on adding a new integration

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove extraneous print statement and turn comments into docstrings in custom provider

Signed-off-by: Eduardo Apolinario <[email protected]>

* Comments on README.rst

Signed-off-by: Eduardo Apolinario <[email protected]>

* Fix link to feast

Signed-off-by: Eduardo Apolinario <[email protected]>

* Fix serialization of feast_integration dir

Signed-off-by: Eduardo Apolinario <[email protected]>

Co-authored-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Samhita Alla <[email protected]>
3 people authored Sep 20, 2021
1 parent 451e463 commit 40dc71f
Showing 12 changed files with 875 additions and 1 deletion.
4 changes: 3 additions & 1 deletion .github/workflows/ghcr_push.yml
@@ -47,6 +47,8 @@ jobs:
path: case_studies/ml_training
- name: eda
path: case_studies/feature_engineering
- name: feast_integration
path: case_studies/feature_engineering
steps:
- uses: actions/checkout@v2
with:
@@ -64,4 +66,4 @@
registry: ghcr.io
build_extra_args: "--compress=true --build-arg=tag=ghcr.io/${{ github.repository_owner }}/flytecookbook:${{ matrix.directory.name }}-${{ github.sha }}"
context: ./cookbook/${{ matrix.directory.path }}
dockerfile: ${{ matrix.directory.name }}/Dockerfile
@@ -0,0 +1,37 @@
FROM python:3.8-buster

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH "$PYTHONPATH:/root/feast_integration"

# Install the AWS cli separately to prevent issues with boto being written over
RUN pip3 install awscli

# Virtual environment
RUN python3 -m venv ${VENV}
RUN ${VENV}/bin/pip install wheel
ENV PATH="${VENV}/bin:$PATH"

# Install Python dependencies
COPY feast_integration/requirements.txt /root/.
RUN ${VENV}/bin/pip install -r /root/requirements.txt

COPY feast_integration/sandbox.config /root/
COPY in_container.mk /root/Makefile

# Copy the actual code
COPY feast_integration/ /root/feast_integration/

# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag

# Copy over the helper script that the SDK relies on
RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/
RUN chmod a+x /usr/local/bin/flytekit_venv

# Enable the virtualenv for this image. Note this relies on the VENV variable we've set in this image.
ENTRYPOINT ["/usr/local/bin/flytekit_venv"]
@@ -0,0 +1,3 @@
PREFIX=feast
include ../../../common/common.mk
include ../../../common/leaf.mk
@@ -0,0 +1,66 @@
Feast Integration
-----------------

**Feature Engineering** has lately become one of the most prominent topics in machine learning.
It is the process of transforming raw data into features that better represent the underlying problem to the predictive models, resulting in improved model accuracy on unseen data.

`Feast <https://feast.dev/>`_ is an operational data system for managing and serving machine learning features to models in production.

Flyte lets you train models and engineer features in a single pipeline, but it provides no way to serve those features once the model matures and is ready for production.

That is where Feast comes in: Flyte handles engineering the features, while Feast provides the feature registry and online serving system. One thing Flyte makes possible is incremental development of features, turning on the sync to the online store only once you are confident in the features.

In this tutorial, we'll walk through how Feast can be used to store and retrieve features for training and testing a model built with a Flyte pipeline.

Dataset
=======
We'll use the horse colic dataset, where the goal is to determine whether a horse's lesion is surgical. This is a modified version of the original dataset.

The dataset has the following columns:

.. list-table:: Horse Colic Features
   :widths: 25 25 25 25 25

   * - surgery
     - Age
     - Hospital Number
     - rectal temperature
     - pulse
   * - respiratory rate
     - temperature of extremities
     - peripheral pulse
     - mucous membranes
     - capillary refill time
   * - pain
     - peristalsis
     - abdominal distension
     - nasogastric tube
     - nasogastric reflux
   * - nasogastric reflux PH
     - rectal examination
     - abdomen
     - packed cell volume
     - total protein
   * - abdominocentesis appearance
     - abdominocentesis total protein
     - outcome
     - surgical lesion
     - timestamp

The horse colic dataset is a compressed zip file containing a SQLite database. For this example we just wanted a dataset that is available online, but it could easily be swapped for another dataset or data management system such as Snowflake, Athena, Hive, BigQuery, or Spark, all of which are supported by Flyte.
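
As a sketch of what consuming such a dataset looks like (the table and column names below are hypothetical stand-ins, not the actual schema shipped with the example), the SQLite database can be queried with Python's standard library:

```python
import sqlite3

# Build a tiny stand-in for the horse colic SQLite database
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE horse_colic (surgery INTEGER, pulse REAL, surgical_lesion INTEGER)"
)
conn.executemany(
    "INSERT INTO horse_colic VALUES (?, ?, ?)",
    [(1, 66.0, 1), (2, 88.0, 0)],
)

# A feature-engineering task would read rows like this
rows = conn.execute(
    "SELECT surgery, pulse, surgical_lesion FROM horse_colic"
).fetchall()
print(rows)  # [(1, 66.0, 1), (2, 88.0, 0)]
```

In the real example the same queries run against the database unpacked from the downloaded zip file rather than an in-memory stand-in.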

Takeaways
=========
The example demonstrates a simple feature engineering job that you can seamlessly construct with Flyte. Here are the key takeaways:

#. Source data is from SQL-like data sources
#. Pre-created feature transforms
#. Ability to create a low-code platform
#. Feast integration
#. Serve features to production using Feast
#. TaskTemplate within an imperative workflow

.. tip::

If you're a data scientist, you needn't worry about the infrastructure overhead. Flyte provides an easy-to-use interface which looks just like a typical library.
Empty file.
@@ -0,0 +1,86 @@
from datetime import datetime
from typing import Callable, List, Union

import pandas
from feast.feature_view import FeatureView
from feast.infra.local import LocalProvider
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.registry import Registry
from feast.repo_config import RepoConfig
from flytekit.core.context_manager import FlyteContext
from tqdm import tqdm



class FlyteCustomProvider(LocalProvider):
    def __init__(self, config: RepoConfig, repo_path):
        super().__init__(config)

    def materialize_single_feature_view(
        self,
        config: RepoConfig,
        feature_view: FeatureView,
        start_date: datetime,
        end_date: datetime,
        registry: Registry,
        project: str,
        tqdm_builder: Callable[[int], tqdm],
    ) -> None:
        """
        Loads the latest feature values for a specific feature view from the offline store into the online store.
        """
        self._localize_feature_view(feature_view)

        super().materialize_single_feature_view(
            config, feature_view, start_date, end_date, registry, project, tqdm_builder
        )

    def get_historical_features(
        self,
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pandas.DataFrame, str],
        registry: Registry,
        project: str,
        full_feature_names: bool,
    ) -> RetrievalJob:
        """
        Returns a training dataframe from the offline store.
        """
        # Substitute the remote s3 file with a reference to a local file in each feature view being requested
        for fv in feature_views:
            self._localize_feature_view(fv)

        return super().get_historical_features(
            config,
            feature_views,
            feature_refs,
            entity_df,
            registry,
            project,
            full_feature_names,
        )

    def _localize_feature_view(self, feature_view: FeatureView):
        """
        Ensures that the `FeatureView` object points to files on local disk.
        """
        if not isinstance(feature_view.batch_source, FileSource):
            return
        # Copy the remote parquet file to a local file
        file_source: FileSource = feature_view.batch_source
        random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path)
        FlyteContext.current_context().file_access.get_data(
            file_source.path,
            random_local_path,
            is_multipart=True,
        )
        feature_view.batch_source = FileSource(
            path=random_local_path,
            event_timestamp_column=file_source.event_timestamp_column,
        )
@@ -0,0 +1,107 @@
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import pandas as pd
from dataclasses_json import dataclass_json
from feast import FeatureStore as FeastFeatureStore
from feast.entity import Entity
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast.repo_config import RepoConfig
from flytekit import FlyteContext
from flytekit.configuration import aws


@dataclass_json
@dataclass
class FeatureStoreConfig:
    registry_path: str
    project: str
    s3_bucket: str
    online_store_path: str = "online.db"


@dataclass_json
@dataclass
class FeatureStore:
    config: FeatureStoreConfig

    def _build_feast_feature_store(self):
        os.environ["FEAST_S3_ENDPOINT_URL"] = aws.S3_ENDPOINT.get()
        os.environ["AWS_ACCESS_KEY_ID"] = aws.S3_ACCESS_KEY_ID.get()
        os.environ["AWS_SECRET_ACCESS_KEY"] = aws.S3_SECRET_ACCESS_KEY.get()

        config = RepoConfig(
            registry=f"s3://{self.config.s3_bucket}/{self.config.registry_path}",
            project=self.config.project,
            # Notice the use of a custom provider.
            provider="custom_provider.provider.FlyteCustomProvider",
            offline_store=FileOfflineStoreConfig(),
            online_store=SqliteOnlineStoreConfig(path=self.config.online_store_path),
        )
        return FeastFeatureStore(config=config)

    def apply(
        self,
        objects: Union[
            Entity,
            FeatureView,
            FeatureService,
            List[Union[FeatureView, Entity, FeatureService]],
        ],
    ) -> None:
        fs = self._build_feast_feature_store()
        fs.apply(objects)

        # Applying also initializes the sqlite tables in the online store
        FlyteContext.current_context().file_access.upload(
            self.config.online_store_path,
            f"s3://{self.config.s3_bucket}/{self.config.online_store_path}",
        )

    def get_historical_features(
        self,
        entity_df: Union[pd.DataFrame, str],
        features: Optional[Union[List[str], FeatureService]] = None,
    ) -> pd.DataFrame:
        fs = self._build_feast_feature_store()
        retrieval_job = fs.get_historical_features(
            entity_df=entity_df,
            features=features,
        )
        return retrieval_job.to_df()

    def materialize(
        self,
        start_date: datetime,
        end_date: datetime,
        feature_views: Optional[List[str]] = None,
    ) -> None:
        FlyteContext.current_context().file_access.download(
            f"s3://{self.config.s3_bucket}/{self.config.online_store_path}",
            self.config.online_store_path,
        )
        fs = self._build_feast_feature_store()
        fs.materialize(
            start_date=start_date,
            end_date=end_date,
        )
        FlyteContext.current_context().file_access.upload(
            self.config.online_store_path,
            f"s3://{self.config.s3_bucket}/{self.config.online_store_path}",
        )

    def get_online_features(
        self,
        features: Union[List[str], FeatureService],
        entity_rows: List[Dict[str, Any]],
        feature_refs: Optional[List[str]] = None,
        full_feature_names: bool = False,
    ) -> Dict[str, Any]:
        FlyteContext.current_context().file_access.download(
            f"s3://{self.config.s3_bucket}/{self.config.online_store_path}",
            self.config.online_store_path,
        )
        fs = self._build_feast_feature_store()

        online_response = fs.get_online_features(features, entity_rows, feature_refs, full_feature_names)
        return online_response.to_dict()
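
The reason `FeatureStoreConfig` and `FeatureStore` are declared as `@dataclass_json` dataclasses is so their state can be serialized between Flyte tasks. The pattern can be sketched with only the standard library, using `dataclasses.asdict` plus `json` in place of `dataclasses_json` (an assumption of this sketch; field values are illustrative):

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class FeatureStoreConfig:
    # Mirrors the fields of the config above
    registry_path: str
    project: str
    s3_bucket: str
    online_store_path: str = "online.db"


# Serialize on the producing task...
config = FeatureStoreConfig(
    registry_path="registry.db",
    project="horsecolic",
    s3_bucket="feast-demo",
)
payload = json.dumps(asdict(config))

# ...and rebuild an equivalent object on the consuming task
restored = FeatureStoreConfig(**json.loads(payload))
print(restored == config)  # dataclasses compare by field values
```

Since the wrapper rebuilds the `FeastFeatureStore` lazily in `_build_feast_feature_store`, only this small config needs to cross task boundaries, not the live Feast client.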
