diff --git a/.github/workflows/ghcr_push.yml b/.github/workflows/ghcr_push.yml index dfd733e47..d6c60831d 100644 --- a/.github/workflows/ghcr_push.yml +++ b/.github/workflows/ghcr_push.yml @@ -27,8 +27,6 @@ jobs: path: integrations/kubernetes - name: kfpytorch path: integrations/kubernetes - - name: sqlite_datacleaning - path: case_studies/feature_engineering - name: sagemaker_training path: integrations/aws - name: sagemaker_pytorch @@ -41,6 +39,8 @@ jobs: path: integrations/flytekit_plugins - name: house_price_prediction path: case_studies/ml_training + - name: feast_integration + path: case_studies/feature_engineering steps: - uses: actions/checkout@v2 with: diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Dockerfile b/cookbook/case_studies/feature_engineering/feast_integration/Dockerfile similarity index 83% rename from cookbook/case_studies/feature_engineering/sqlite_datacleaning/Dockerfile rename to cookbook/case_studies/feature_engineering/feast_integration/Dockerfile index 0491d9ddc..f13443d32 100644 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Dockerfile +++ b/cookbook/case_studies/feature_engineering/feast_integration/Dockerfile @@ -26,22 +26,20 @@ RUN python3.8 -m venv ${VENV} RUN ${VENV}/bin/pip install wheel # Install Python dependencies -COPY sqlite_datacleaning/requirements.txt /root +COPY feast_integration/requirements.txt /root RUN ${VENV}/bin/pip install -r /root/requirements.txt # Copy the makefile targets to expose on the container. This makes it easier to register. COPY in_container.mk /root/Makefile -COPY sqlite_datacleaning/sandbox.config /root +COPY feast_integration/sandbox.config /root # Copy the actual code -COPY sqlite_datacleaning/ /root/sqlite_datacleaning/ +COPY feast_integration/ /root/feast_integration/ # Copy over the helper script that the SDK relies on RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/ RUN chmod a+x /usr/local/bin/flytekit_venv -RUN pip install -U https://github.com/flyteorg/flytekit/archive/62391eaff894188bb723f382af3de29a977233ce.zip#egg=flytekit - # This tag is supplied by the build script and will be used to determine the version # when registering tasks, workflows, and launch plans ARG tag diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Makefile b/cookbook/case_studies/feature_engineering/feast_integration/Makefile similarity index 70% rename from cookbook/case_studies/feature_engineering/sqlite_datacleaning/Makefile rename to cookbook/case_studies/feature_engineering/feast_integration/Makefile index 0cb45a5b1..d6731206a 100644 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/Makefile +++ b/cookbook/case_studies/feature_engineering/feast_integration/Makefile @@ -1,3 +1,3 @@ -PREFIX=sqlite_datacleaning +PREFIX=feast_integration include ../../../common/Makefile include ../../../common/leaf.mk diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/README.rst b/cookbook/case_studies/feature_engineering/feast_integration/README.rst similarity index 62% rename from cookbook/case_studies/feature_engineering/sqlite_datacleaning/README.rst rename to cookbook/case_studies/feature_engineering/feast_integration/README.rst index ffe75ef51..b7bea4a1d 100644 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/README.rst +++ b/cookbook/case_studies/feature_engineering/feast_integration/README.rst @@ -1,77 +1,70 @@ -Data Cleaning -------------- -Feature Engineering off-late has become one of the most prominent topics in 
Machine Learning. -It is the process of transforming raw data into features that better represent the underlying problem to the predictive models, resulting in improved model accuracy on unseen data. - -This tutorial will implement data cleaning of SQLite3 data, which does both data imputation and univariate feature selection. These are so-called feature engineering techniques. - -Why SQLite3? -============ -SQLite3 is written such that the task doesn't depend on the user's image. It basically: +Feast Integration +----------------- -- Shifts the burden of writing the Dockerfile from the user using the task in workflows, to the author of the task type -- Allows the author to optimize the image that the task runs -- Works locally and remotely - -.. note:: - - SQLite3 container is special; the definition of the Python classes themselves is bundled in Flytekit, hence we just use the Flytekit image. - -.. tip:: +**Feature Engineering** has of late become one of the most prominent topics in Machine Learning. +It is the process of transforming raw data into features that better represent the underlying problem to the predictive models, resulting in improved model accuracy on unseen data. - SQLite3 is being used to showcase the example of using a ``TaskTemplate``. This is the same for SQLAlchemy. As for Athena, BigQuery, Hive plugins, a container is not required. The queries are registered with FlyteAdmin and sent directly to the respective engines. +**Feast (Feature Store) is an operational data system for managing and serving machine learning features to models in production.** -Where does Flyte fit in? -======================== Flyte provides a way to train models and perform feature engineering as a single pipeline. +However, it provides no way to serve these features once the model matures and is ready to be served in production. -.. admonition:: What's so special about this example? +This is where the integration between Flyte and Feast can help users take their models and features from prototyping all the way to production cost-effectively and efficiently. 🚀 - The pipeline doesn't build a container as such; it re-uses the pre-built task containers to construct the workflow! +In this tutorial, we'll walk through how Feast can be used to store and retrieve features to train and test the model curated using the Flyte pipeline. Dataset ======= -We'll be using the horse colic dataset wherein we'll determine if the lesion of the horse was surgical or not. This is a modified version of the original dataset. +We'll be using the horse colic dataset wherein we'll determine if the lesion of the horse is surgical or not. This is a modified version of the original dataset. The dataset will have the following columns: ..
list-table:: Horse Colic Features - :widths: 25 25 25 + :widths: 25 25 25 25 25 * - surgery - Age - Hospital Number - * - rectal temperature + - rectal temperature - pulse - - respiratory rate - * - temperature of extremities + * - respiratory rate + - temperature of extremities - peripheral pulse - mucous membranes - * - capillary refill time - - pain + - capillary refill time + * - pain - peristalsis - * - abdominal distension + - abdominal distension - nasogastric tube - nasogastric reflux * - nasogastric reflux PH - rectal examination - abdomen - * - packed cell volume + - packed cell volume - total protein - - abdominocentesis appearance - * - abdomcentesis total protein + * - abdominocentesis appearance + - abdomcentesis total protein - outcome - surgical lesion + - timestamp The horse colic dataset will be a compressed zip file consisting of the SQLite DB. -Steps to Build the Pipeline -=========================== -- Define two feature engineering tasks -- "data imputation" and "univariate feature selection" -- Reference the tasks in the actual file -- Define an SQLite3 Task and generate FlyteSchema -- Pass the inputs through an imperative workflow to validate the dataset -- Return the resultant DataFrame +Why SQLite3? +^^^^^^^^^^^^ +SQLite3 is written such that the task doesn't depend on the user's image. It basically: + +- Shifts the burden of writing the Dockerfile from the user using the task in workflows, to the author of the task type +- Allows the author to optimize the image that the task runs +- Works locally and remotely + +.. note:: + + SQLite3 container is special; the definition of the Python classes themselves is bundled in Flytekit, hence we just use the Flytekit image. + +.. tip:: + + SQLite3 is being used to showcase the example of using a ``TaskTemplate``. This is the same for SQLAlchemy. As for Athena, BigQuery, Hive plugins, a container is not required. The queries are registered with FlyteAdmin and sent directly to the respective engines. Takeaways ========= @@ -80,11 +73,11 @@ The example we're trying to demonstrate is a simple feature engineering job that #. Source data is from SQL-like data sources #. Procreated feature transforms #. Ability to create a low-code platform +#. Feast integration +#. Serve features to production using Feast #. TaskTemplate within an imperative workflow .. tip:: If you're a data scientist, you needn't worry about the infrastructure overhead. Flyte provides an easy-to-use interface which looks just like a typical library. -Code Walkthrough -================ diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/__init__.py b/cookbook/case_studies/feature_engineering/feast_integration/__init__.py similarity index 100% rename from cookbook/case_studies/feature_engineering/sqlite_datacleaning/__init__.py rename to cookbook/case_studies/feature_engineering/feast_integration/__init__.py diff --git a/cookbook/case_studies/feature_engineering/feast_integration/feast_wf.py b/cookbook/case_studies/feature_engineering/feast_integration/feast_wf.py new file mode 100644 index 000000000..708e13a05 --- /dev/null +++ b/cookbook/case_studies/feature_engineering/feast_integration/feast_wf.py @@ -0,0 +1,349 @@ +""" +Flyte Pipeline with Feast +------------------------- + +This workflow makes use of the feature engineering tasks defined in the other file. We'll build an end-to-end Flyte pipeline utilizing "Feast". 
+ +Here's the step-by-step process: + +* Fetch the SQLite3 data as a data frame +* Perform mean-median-imputation +* Store the updated features in an offline store +* Retrieve the features from an offline store +* Perform univariate-feature-selection +* Train a Naive Bayes model +* Load features into an online store +* Fetch one feature vector for inference +* Generate prediction + +.. tip:: + + You can simply import the feature engineering tasks, but we use reference tasks because they point to code that has already been registered. + +For Feast to work, make sure the ``feature_store.yaml`` file is present at the location given by the ``feast_repo_path`` workflow input. + +.. code-block:: yaml + :linenos: + + project: feature_engineering + registry: data/registry.db + provider: local + online_store: + path: data/online_store.db +""" +# %% +# Let's import the libraries. +import logging +import random +import typing +from datetime import datetime, timedelta + +import joblib +import pandas as pd +from feast import Entity, Feature, FeatureView, FileSource, ValueType +from feast.feature_store import FeatureStore +from flytekit import Workflow, reference_task, task +from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task +from flytekit.types.file import JoblibSerializedFile +from flytekit.types.schema import FlyteSchema +from sklearn.model_selection import train_test_split +from sklearn.naive_bayes import GaussianNB + +# %% +# We define the necessary data holders. +logger = logging.getLogger(__file__) +FEAST_FEATURES = [ + "horse_colic_stats:rectal temperature", + "horse_colic_stats:total protein", + "horse_colic_stats:peripheral pulse", + "horse_colic_stats:surgical lesion", + "horse_colic_stats:abdominal distension", + "horse_colic_stats:nasogastric tube", + "horse_colic_stats:outcome", + "horse_colic_stats:packed cell volume", + "horse_colic_stats:nasogastric reflux PH", +] +DATABASE_URI = "https://cdn.discordapp.com/attachments/545481172399030272/861575373783040030/horse_colic.db.zip" +DATA_CLASS = "surgical lesion" + +# %% +# Next, we define the reference tasks. A :py:func:`flytekit.reference_task` references the Flyte tasks that have already been defined, serialized, and registered. +# The primary advantage of using a reference task is to reduce redundancy; we needn't define the task(s) again if we have multiple datasets that need to be feature-engineered. +@reference_task( + project="flytesnacks", + domain="development", + name="feast_integration.feature_eng_tasks.mean_median_imputer", + version="v1", +) +def mean_median_imputer( + dataframe: pd.DataFrame, + imputation_method: str, +) -> FlyteSchema: + ... + + +@reference_task( + project="flytesnacks", + domain="development", + name="feast_integration.feature_eng_tasks.univariate_selection", + version="v1", +) +def univariate_selection( + dataframe: pd.DataFrame, num_features: int, data_class: str, feature_view_name: str +) -> pd.DataFrame: + ... + + +# %% +# .. note:: +# +# The ``version`` varies depending on the version assigned during the task registration process. + +# %% +# We define a task to set the ``dtype`` of the ``timestamp`` column to ``datetime``. +@task +def set_dtype(df: FlyteSchema, column_name: str) -> FlyteSchema: + # convert string to datetime in the data frame + df = df.open().all() + df[column_name] = pd.to_datetime(df[column_name]) + return df + + +# %% +# We define two tasks, namely ``store_offline`` and ``retrieve_offline``, to store and retrieve the historical features. +# +# ..
list-table:: Decoding the ``Feast`` Jargon +# :widths: 25 25 +# +# * - ``FeatureStore`` +# - A FeatureStore object is used to define, create, and retrieve features. +# * - ``Entity`` +# - Represents a collection of entities and associated metadata. It's usually the primary key of your data. +# * - ``FeatureView`` +# - A FeatureView defines a logical grouping of serveable features. +# * - ``FileSource`` +# - File data sources allow for the retrieval of historical feature values from files on disk for building training datasets, as well as for materializing features into an online store. +# * - ``fs.apply()`` +# - Register objects to metadata store and update related infrastructure. +# * - ``get_historical_features()`` +# - Enrich an entity dataframe with historical feature values for either training or batch scoring. +@task +def store_offline(dataframe: FlyteSchema, repo_path: str) -> (str, str): + fs = FeatureStore(repo_path=repo_path) + horse_colic_entity = Entity(name="Hospital Number", value_type=ValueType.STRING) + + horse_colic_feature_view = FeatureView( + name="horse_colic_stats", + entities=["Hospital Number"], + features=[ + Feature(name="rectal temperature", dtype=ValueType.FLOAT), + Feature(name="total protein", dtype=ValueType.FLOAT), + Feature(name="peripheral pulse", dtype=ValueType.FLOAT), + Feature(name="surgical lesion", dtype=ValueType.STRING), + Feature(name="abdominal distension", dtype=ValueType.FLOAT), + Feature(name="nasogastric tube", dtype=ValueType.STRING), + Feature(name="outcome", dtype=ValueType.STRING), + Feature(name="packed cell volume", dtype=ValueType.FLOAT), + Feature(name="nasogastric reflux PH", dtype=ValueType.FLOAT), + ], + input=FileSource( + path=str(dataframe.remote_path), + event_timestamp_column="timestamp", + ), + ttl=timedelta(days=1), + ) + + fs.apply([horse_colic_entity, horse_colic_feature_view]) + return repo_path, horse_colic_feature_view.name + + +@task +def retrieve_offline(repo_path: str) -> pd.DataFrame: + fs = FeatureStore(repo_path=repo_path) + entity_df = pd.DataFrame.from_dict( + { + "Hospital Number": [ + "530101", + "5290409", + "5291329", + "530051", + "529518", + "530101", + "529340", + "5290409", + "530034", + ], + "event_timestamp": [ + datetime(2021, 6, 25, 16, 36, 27), + datetime(2021, 6, 25, 16, 36, 27), + datetime(2021, 6, 25, 16, 36, 27), + datetime(2021, 6, 25, 16, 36, 27), + datetime(2021, 6, 25, 16, 36, 27), + datetime(2021, 7, 5, 11, 36, 1), + datetime(2021, 6, 25, 16, 36, 27), + datetime(2021, 7, 5, 11, 50, 40), + datetime(2021, 6, 25, 16, 36, 27), + ], + } + ) + + retrieval_job = fs.get_historical_features( + entity_df=entity_df, + feature_refs=FEAST_FEATURES, + ) + + feature_data = retrieval_job.to_df() + return feature_data + + +# %% +# Next, we train the Naive Bayes model using the data that's been fetched from the feature store. +@task +def train_model( + dataset: pd.DataFrame, data_class: str, feature_view_name: str +) -> JoblibSerializedFile: + X_train, _, y_train, _ = train_test_split( + dataset, + dataset[feature_view_name + "__" + data_class], + test_size=0.33, + random_state=42, + ) + model = GaussianNB() + model.fit(X_train, y_train) + model.feature_names = list(X_train.columns.values) + fname = "model.joblib.dat" + joblib.dump(model, fname) + return fname + + +# %% +# To perform inferencing, we define two tasks: ``store_online`` and ``retrieve_online``. +# +# .. 
list-table:: Decoding the ``Feast`` Jargon +# :widths: 25 25 +# +# * - ``materialize()`` +# - Materialize data from the offline store into the online store. +# * - ``get_online_features()`` +# - Retrieves the latest online feature data. +# +# .. note:: +# One key difference between the online store and the data source is that only the latest feature values are stored per entity key; no historical values are stored. +# Our dataset has two such entries with the same ``Hospital Number`` but different timestamps. Only the data point with the latest timestamp is picked from the online store. +@task +def store_online(repo_path: str) -> str: + store = FeatureStore(repo_path=repo_path) + store.materialize( + start_date=datetime.utcnow() - timedelta(days=50), + end_date=datetime.utcnow() - timedelta(minutes=10), + ) + return repo_path + + +@task +def retrieve_online( + repo_path: str, dataset: pd.DataFrame +) -> typing.Dict[str, typing.List[str]]: + store = FeatureStore(repo_path=repo_path) + feature_refs = FEAST_FEATURES + + inference_data = random.choice(dataset["Hospital Number"]) + logger.info(f"Hospital Number chosen for inference is: {inference_data}") + + entity_rows = [{"Hospital Number": inference_data}] + + online_response = store.get_online_features(feature_refs, entity_rows) + online_response_dict = online_response.to_dict() + return online_response_dict + + +# %% +# We define a task to test the model using the inference point fetched earlier. +@task +def test_model( + model_ser: JoblibSerializedFile, + inference_point: typing.Dict[str, typing.List[str]], +) -> typing.List[str]: + + # Load the model + model = joblib.load(model_ser) + f_names = model.feature_names + + test_list = [] + for each_name in f_names: + test_list.append(inference_point[each_name][0]) + prediction = model.predict([test_list]) + return prediction + + +# %% +# Finally, we define the workflow that streamlines the whole pipeline-building and feature-serving process. +# We're using an ``imperative-style workflow`` to convert the step-by-step text into a Flyte-compatible pipeline.
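+#
+# A note on the wiring below: ``add_workflow_input`` declares the workflow-level inputs, and ``add_entity`` attaches a task as a node whose outputs can be consumed downstream.
+# Outputs of the Python tasks are referenced positionally (``o0``, ``o1``, ...), while the SQLite3 task exposes its output under the name ``results``.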
+wb = Workflow(name="feast_integration.workflow.fe_wf") +wb.add_workflow_input("imputation_method", str) +wb.add_workflow_input("feast_repo_path", str) +wf_in = wb.add_workflow_input("num_features_univariate", int) + +sql_task = SQLite3Task( + name="sqlite3.horse_colic", + query_template="select * from data", + output_schema_type=FlyteSchema, + task_config=SQLite3Config( + uri=DATABASE_URI, + compressed=True, + ), +) + +node_t1 = wb.add_entity(sql_task) + +node_t2 = wb.add_entity( + mean_median_imputer, + dataframe=node_t1.outputs["results"], + imputation_method=wb.inputs["imputation_method"], +) + +node_t3 = wb.add_entity(set_dtype, df=node_t2.outputs["o0"], column_name="timestamp") + +node_t4 = wb.add_entity( + store_offline, + dataframe=node_t3.outputs["o0"], + repo_path=wb.inputs["feast_repo_path"], +) +node_t5 = wb.add_entity(retrieve_offline, repo_path=node_t4.outputs["o0"]) + +node_t6 = wb.add_entity( + univariate_selection, + dataframe=node_t5.outputs["o0"], + data_class=DATA_CLASS, + num_features=wf_in, + feature_view_name=node_t4.outputs["o1"], +) + +node_t7 = wb.add_entity( + train_model, + dataset=node_t6.outputs["o0"], + data_class=DATA_CLASS, + feature_view_name=node_t4.outputs["o1"], +) + +node_t8 = wb.add_entity(store_online, repo_path=node_t4.outputs["o0"]) +node_t9 = wb.add_entity( + retrieve_online, repo_path=node_t8.outputs["o0"], dataset=node_t1.outputs["results"] +) + +node_t10 = wb.add_entity( + test_model, + model_ser=node_t7.outputs["o0"], + inference_point=node_t9.outputs["o0"], +) + +wb.add_workflow_output( + "output_from_t10", node_t10.outputs["o0"], python_type=typing.List[str] +) + +if __name__ == "__main__": + + wb(imputation_method="mean", num_features_univariate=7, feast_repo_path=".") + +# %% +# You should see a prediction against the test input as the workflow output. diff --git a/cookbook/case_studies/feature_engineering/feast_integration/feature_eng_tasks.py b/cookbook/case_studies/feature_engineering/feast_integration/feature_eng_tasks.py new file mode 100644 index 000000000..f880d747c --- /dev/null +++ b/cookbook/case_studies/feature_engineering/feast_integration/feature_eng_tasks.py @@ -0,0 +1,82 @@ +""" +Feature Engineering Tasks +------------------------- + +We'll define the relevant feature engineering tasks to clean up the SQLite3 data. +""" + +# %% +# First, let's import the required libraries. +import numpy as np +import pandas as pd +from flytekit import task +from flytekit.types.schema import FlyteSchema +from numpy.core.fromnumeric import sort +from sklearn.feature_selection import SelectKBest, f_classif +from sklearn.impute import SimpleImputer + +# %% +# There is a specific set of columns for which imputation isn't required. We ignore them. +NO_IMPUTATION_COLS = [ + "Hospital Number", + "surgery", + "Age", + "outcome", + "surgical lesion", + "timestamp", +] + +# %% +# Next, we define a ``mean_median_imputer`` task to fill in the missing values of the dataset, for which we use the `SimpleImputer `__ class from the ``scikit-learn`` library.
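+# The raw data marks missing values with ``"?"``, so they are first normalized to ``np.nan``.
+# The imputer is fit and applied only to the columns not listed in ``NO_IMPUTATION_COLS``; the identifier, label, and timestamp columns pass through untouched.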
+@task +def mean_median_imputer( + dataframe: pd.DataFrame, + imputation_method: str, +) -> FlyteSchema: + + dataframe = dataframe.replace("?", np.nan) + if imputation_method not in ["median", "mean"]: + raise ValueError("imputation_method takes only values 'median' or 'mean'") + + imputer = SimpleImputer(missing_values=np.nan, strategy=imputation_method) + + imputer = imputer.fit( + dataframe[dataframe.columns[~dataframe.columns.isin(NO_IMPUTATION_COLS)]] + ) + dataframe[ + dataframe.columns[~dataframe.columns.isin(NO_IMPUTATION_COLS)] + ] = imputer.transform( + dataframe[dataframe.columns[~dataframe.columns.isin(NO_IMPUTATION_COLS)]] + ) + return dataframe + + +# %% +# Let's define the other task called ``univariate_selection`` that does feature selection. +# The `SelectKBest `__ method removes all but the highest-scoring features (data frame columns). +@task +def univariate_selection( + dataframe: pd.DataFrame, num_features: int, data_class: str, feature_view_name: str +) -> pd.DataFrame: + + # Remove the ``event_timestamp`` and ``Hospital Number`` columns, as they shouldn't be used for feature selection + dataframe = dataframe.drop(["event_timestamp", "Hospital Number"], axis=1) + + if num_features > 9: + raise ValueError( + f"Number of features must be <= 9; you've given {num_features}" + ) + + X = dataframe.iloc[:, dataframe.columns != data_class] + y = dataframe.loc[:, feature_view_name + "__" + data_class] + test = SelectKBest(score_func=f_classif, k=num_features) + fit = test.fit(X, y) + indices = sort((-fit.scores_).argsort()[:num_features]) + column_names = map(dataframe.columns.__getitem__, indices) + features = fit.transform(X) + final_df = pd.DataFrame(features, columns=column_names) + return final_df + + +# %% +# The aforementioned feature engineering tasks are used as ``reference tasks`` while building the Flyte pipeline with Feast.
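+#
+# For the ``@reference_task`` declarations in ``feast_wf.py`` to resolve, the task ``name`` (e.g., ``feast_integration.feature_eng_tasks.mean_median_imputer``), the input/output signature, and the registered ``version`` must all match what was registered from this module.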
diff --git a/cookbook/case_studies/feature_engineering/feast_integration/feature_store.yaml b/cookbook/case_studies/feature_engineering/feast_integration/feature_store.yaml new file mode 100644 index 000000000..86a481275 --- /dev/null +++ b/cookbook/case_studies/feature_engineering/feast_integration/feature_store.yaml @@ -0,0 +1,5 @@ +project: feature_engineering +registry: data/registry.db +provider: local +online_store: + path: data/online_store.db \ No newline at end of file diff --git a/cookbook/case_studies/feature_engineering/feast_integration/requirements.in b/cookbook/case_studies/feature_engineering/feast_integration/requirements.in new file mode 100644 index 000000000..d77fa95fd --- /dev/null +++ b/cookbook/case_studies/feature_engineering/feast_integration/requirements.in @@ -0,0 +1,4 @@ +flytekit>=0.20.0 +scikit-learn +numpy +feast \ No newline at end of file diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.txt b/cookbook/case_studies/feature_engineering/feast_integration/requirements.txt similarity index 55% rename from cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.txt rename to cookbook/case_studies/feature_engineering/feast_integration/requirements.txt index da9b27029..d1346376e 100644 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.txt +++ b/cookbook/case_studies/feature_engineering/feast_integration/requirements.txt @@ -1,17 +1,25 @@ # -# This file is autogenerated by pip-compile +# This file is autogenerated by pip-compile with python 3.8 # To update, run: # # pip-compile requirements.in # attrs==21.2.0 - # via scantree + # via + # jsonschema + # scantree +cachetools==4.2.2 + # via google-auth certifi==2021.5.30 # via requests chardet==4.0.0 # via requests click==7.1.2 - # via flytekit + # via + # feast + # flytekit +colorama==0.4.4 + # via feast croniter==1.0.13 # via flytekit dataclasses-json==0.5.4 @@ -24,26 +32,53 @@ dirhash==0.2.1 # via flytekit docker-image-py==0.1.10 # via flytekit -flyteidl==0.19.2 +fastavro==1.4.2 + # via + # feast + # pandavro +feast==0.11.0 + # via -r requirements.in +flyteidl==0.19.11 # via flytekit -flytekit==0.19.1 +flytekit==0.20.0 # via -r requirements.in +google-api-core==1.30.0 + # via feast +google-auth==1.32.1 + # via google-api-core +googleapis-common-protos==1.52.0 + # via + # feast + # google-api-core grpcio==1.38.0 - # via flytekit + # via + # feast + # flytekit idna==2.10 # via requests importlib-metadata==4.5.0 # via keyring +jinja2==3.0.1 + # via feast joblib==1.0.1 # via scikit-learn +jsonschema==3.2.0 + # via feast keyring==23.0.1 # via flytekit -marshmallow-enum==1.5.1 - # via dataclasses-json +markupsafe==2.0.1 + # via jinja2 marshmallow==3.12.1 # via # dataclasses-json # marshmallow-enum + # marshmallow-jsonschema +marshmallow-enum==1.5.1 + # via dataclasses-json +marshmallow-jsonschema==0.12.0 + # via flytekit +mmh3==3.0.0 + # via feast mypy-extensions==0.4.3 # via typing-inspect natsort==7.1.1 @@ -52,42 +87,75 @@ numpy==1.20.3 # via # -r requirements.in # pandas + # pandavro # pyarrow # scikit-learn # scipy +packaging==21.0 + # via google-api-core pandas==1.2.4 - # via flytekit + # via + # feast + # flytekit + # pandavro +pandavro==1.5.2 + # via feast pathspec==0.8.1 # via scantree protobuf==3.17.3 # via + # feast # flyteidl # flytekit + # google-api-core + # googleapis-common-protos py==1.10.0 # via retry pyarrow==3.0.0 - # via flytekit + # via + # feast + # flytekit +pyasn1==0.4.8 + # via + # pyasn1-modules + # rsa 
+pyasn1-modules==0.2.8 + # via google-auth +pydantic==1.8.2 + # via feast +pyparsing==2.4.7 + # via packaging +pyrsistent==0.18.0 + # via jsonschema python-dateutil==2.8.1 # via # croniter # flytekit # pandas +python-json-logger==2.0.1 + # via flytekit pytimeparse==1.1.8 # via flytekit pytz==2018.4 # via # flytekit + # google-api-core # pandas +pyyaml==5.3.1 + # via feast regex==2021.4.4 # via docker-image-py requests==2.25.1 # via # flytekit + # google-api-core # responses responses==0.13.3 # via flytekit retry==0.9.2 # via flytekit +rsa==4.7.2 + # via google-auth scantree==0.0.1 # via dirhash scikit-learn==0.24.2 @@ -97,21 +165,36 @@ scipy==1.6.3 six==1.16.0 # via # flytekit + # google-api-core + # google-auth # grpcio + # jsonschema + # pandavro # protobuf # python-dateutil # responses # scantree + # tenacity sortedcontainers==2.4.0 # via flytekit statsd==3.3.0 # via flytekit stringcase==1.2.0 # via dataclasses-json +tabulate==0.8.9 + # via feast +tenacity==7.0.0 + # via feast threadpoolctl==2.1.0 # via scikit-learn +toml==0.10.2 + # via feast +tqdm==4.61.2 + # via feast typing-extensions==3.10.0.0 - # via typing-inspect + # via + # pydantic + # typing-inspect typing-inspect==0.7.1 # via dataclasses-json urllib3==1.25.11 @@ -127,3 +210,6 @@ wrapt==1.12.1 # flytekit zipp==3.4.1 # via importlib-metadata + +# The following packages are considered to be unsafe in a requirements file: +# setuptools diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/sandbox.config b/cookbook/case_studies/feature_engineering/feast_integration/sandbox.config similarity index 82% rename from cookbook/case_studies/feature_engineering/sqlite_datacleaning/sandbox.config rename to cookbook/case_studies/feature_engineering/feast_integration/sandbox.config index fc6f3e717..60072fca0 100644 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/sandbox.config +++ b/cookbook/case_studies/feature_engineering/feast_integration/sandbox.config @@ -1,5 +1,5 @@ [sdk] -workflow_packages=sqlite_datacleaning +workflow_packages=feast_integration python_venv=flytekit_venv [auth] diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_tasks.py b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_tasks.py deleted file mode 100644 index 9cc28be69..000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_tasks.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -Data Cleaning Tasks -------------------------- - -We'll define the relevant feature engineering tasks to clean up the SQLite3 data. -""" - -# %% -# Firstly, let's import the required libraries. -import numpy as np -import pandas as pd -from flytekit import task -from numpy.core.fromnumeric import sort -from sklearn.feature_selection import SelectKBest, f_classif -from sklearn.impute import SimpleImputer - -# %% -# Next, we define a ``mean_median_imputer`` task to fill in the missing values of the dataset, for which we use `SimpleImputer `__ class picked from the ``scikit-learn`` library. 
-@task -def mean_median_imputer( - dataframe: pd.DataFrame, - imputation_method: str, -) -> pd.DataFrame: - - dataframe = dataframe.replace("?", np.nan) - if imputation_method not in ["median", "mean"]: - raise ValueError("imputation_method takes only values 'median' or 'mean'") - - imputer = SimpleImputer(missing_values=np.nan, strategy=imputation_method) - - imputer = imputer.fit(dataframe) - dataframe[:] = imputer.transform(dataframe) - - return dataframe - -# %% -# This task returns the filled-in dataframe. - -# %% -# Let's define one other task called ``univariate_selection`` that does feature selection. -# The `SelectKBest `__ method removes all but the highest scoring features. - -@task -def univariate_selection( - dataframe: pd.DataFrame, split_mask: int, num_features: int -) -> pd.DataFrame: - - X = dataframe.iloc[:, 0:split_mask] - y = dataframe.iloc[:, split_mask] - test = SelectKBest(score_func=f_classif, k=num_features) - fit = test.fit(X, y) - indices = sort((-fit.scores_).argsort()[:num_features]) - column_names = map(dataframe.columns.__getitem__, indices) - features = fit.transform(X) - return pd.DataFrame(features, columns=column_names) - -# %% -# This task returns a dataframe with the specified number of columns. diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_workflow.py b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_workflow.py deleted file mode 100644 index c69458692..000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/datacleaning_workflow.py +++ /dev/null @@ -1,113 +0,0 @@ -""" -Data Cleaning Imperative Workflow ---------------------------------------- - -This workflow makes use of the feature engineering tasks defined in the other file. -We'll build an SQLite3 data cleaning pipeline utilizing these tasks. - -.. tip:: - - You can simply import the tasks, but we use references because we are referring to the existing code. -""" - -# %% -# Let's import the libraries. -import pandas as pd -from flytekit import CronSchedule, LaunchPlan, Workflow, kwtypes, reference_task -from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task -from flytekit.types.schema import FlyteSchema - -# %% -# Next, we define the reference tasks. A :py:func:`flytekit.reference_task` references the Flyte tasks that have already been defined, serialized, and registered. -# The primary advantage of using a reference task is to reduce the redundancy; we needn't define the task(s) again if we have multiple datasets that need to be feature-engineered. -@reference_task( - project="flytesnacks", - domain="development", - name="sqlite_datacleaning.tasks.mean_median_imputer", - version="fast4f51f7895819256f2540a08c97a51194", -) -def mean_median_imputer( - dataframe: pd.DataFrame, - imputation_method: str, -) -> pd.DataFrame: - ... - - -@reference_task( - project="flytesnacks", - domain="development", - name="sqlite_datacleaning.tasks.univariate_selection", - version="fast4f51f7895819256f2540a08c97a51194", -) -def univariate_selection( - dataframe: pd.DataFrame, - split_mask: int, - num_features: int, -) -> pd.DataFrame: - ... - - -# %% -# .. note:: -# -# The ``version`` varies depending on the version assigned during the task registration process. - -# %% -# Finally, we define an imperative workflow that accepts the two reference tasks we've prototyped above. The data flow can be interpreted as follows: -# -# #. An SQLite3 task is defined to fetch the data batch -# #. 
The output (FlyteSchema) is passed to the ``mean_median_imputer`` task -# #. The output produced by ``mean_median_imputer`` is given to the ``univariate_selection`` task -# #. The dataframe generated by ``univariate_selection`` is the workflow output -wb = Workflow(name="sqlite_datacleaning.workflow.fe_wf") -wb.add_workflow_input("imputation_method", str) -wb.add_workflow_input("limit", int) -wf_in = wb.add_workflow_input("num_features", int) - -sql_task = SQLite3Task( - name="sqlite3.horse_colic", - query_template="select * from data limit {{ .inputs.limit }}", - inputs=kwtypes(limit=int), - output_schema_type=FlyteSchema, - task_config=SQLite3Config( - uri="https://cdn.discordapp.com/attachments/545481172399030272/852144760273502248/horse_colic.db.zip", - compressed=True, - ), -) - -node_t1 = wb.add_entity( - sql_task, - limit=wb.inputs["limit"], -) -node_t2 = wb.add_entity( - mean_median_imputer, - dataframe=node_t1.outputs["results"], - imputation_method=wb.inputs["imputation_method"], -) -node_t3 = wb.add_entity( - univariate_selection, - dataframe=node_t2.outputs["o0"], - split_mask=23, - num_features=wf_in, -) -wb.add_workflow_output( - "output_from_t3", node_t3.outputs["o0"], python_type=pd.DataFrame -) - -DEFAULT_INPUTS = {"limit": 100, "imputation_method": "mean", "num_features": 15} - -sqlite_dataclean_lp = LaunchPlan.get_or_create( - workflow=wb, - name="sqlite_datacleaning", - default_inputs=DEFAULT_INPUTS, - schedule=CronSchedule("0 10 * * ? *"), -) - -if __name__ == "__main__": - print( - wb( - limit=100, - imputation_method="mean", - num_features=15, - ) - ) diff --git a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.in b/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.in deleted file mode 100644 index 444439d7d..000000000 --- a/cookbook/case_studies/feature_engineering/sqlite_datacleaning/requirements.in +++ /dev/null @@ -1,3 +0,0 @@ -flytekit>=0.19.1 -scikit-learn -numpy diff --git a/cookbook/docs/conf.py b/cookbook/docs/conf.py index 32b24fcf3..8d66b8bfe 100644 --- a/cookbook/docs/conf.py +++ b/cookbook/docs/conf.py @@ -120,8 +120,8 @@ class CustomSorter(FileNameSortKey): "diabetes.py", "house_price_predictor.py", "multiregion_house_price_predictor.py", - "datacleaning_tasks.py", - "datacleaning_workflow.py", + "feature_eng_tasks.py", + "feast_wf.py", ] """ Take a look at the code for the default sorter included in the sphinx_gallery to see how this works. @@ -231,7 +231,7 @@ def __call__(self, filename): "../core/type_system", "../case_studies/ml_training/pima_diabetes", "../case_studies/ml_training/house_price_prediction", - "../case_studies/feature_engineering/sqlite_datacleaning", + "../case_studies/feature_engineering/feast_integration", "../testing", "../core/containerization", "../deployment/workflow", @@ -260,7 +260,7 @@ def __call__(self, filename): "auto/core/type_system", "auto/case_studies/ml_training/pima_diabetes", "auto/case_studies/ml_training/house_price_prediction", - "auto/case_studies/feature_engineering/sqlite_datacleaning", + "auto/case_studies/feature_engineering/feast_integration", "auto/testing", "auto/core/containerization", "auto/deployment/workflow", diff --git a/cookbook/docs/feature_engineering.rst b/cookbook/docs/feature_engineering.rst index f4936cb2a..b5a28de38 100644 --- a/cookbook/docs/feature_engineering.rst +++ b/cookbook/docs/feature_engineering.rst @@ -7,16 +7,16 @@ Feature Engineering .. panels:: :header: text-center - .. 
link-button:: auto/case_studies/feature_engineering/sqlite_datacleaning/index + .. link-button:: auto/case_studies/feature_engineering/feast_integration/index :type: ref - :text: Data Cleaning + :text: Feast Integration :classes: btn-block stretched-link ^^^^^^^^^^^^ - Perform data imputation and univariate feature selection on SQLite3 dataset + Flyte & Feast can help users take their models and features from prototyping all the way to production cost-effectively and efficiently. .. toctree:: :maxdepth: -1 :caption: Contents :hidden: - auto/case_studies/feature_engineering/sqlite_datacleaning/index + auto/case_studies/feature_engineering/feast_integration/index diff --git a/cookbook/flyte_tests_manifest.json b/cookbook/flyte_tests_manifest.json index 8a29be17f..ccf83a8f8 100644 --- a/cookbook/flyte_tests_manifest.json +++ b/cookbook/flyte_tests_manifest.json @@ -86,5 +86,13 @@ "exit_success": true, "exit_message": "" } - }] + },{ + "name": "case-studies-feast", + "priority": "P2", + "path": "case_studies/feature_engineering/feast_integration", + "exitCondition": { + "exit_success": true, + "exit_message": "" + } +}]