Sample XGBoost workflow that uses an open source dataset (#3)
* Sample XGBoost workflow that uses an open source dataset

- This is a sample workflow that implements a simple XGBoost trainer for the Pima Indians Diabetes dataset
- The code is similar to https://machinelearningmastery.com/develop-first-xgboost-model-python-scikit-learn/
- The dataset is available here: https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv
- Information about the dataset is available here: https://github.com/jbrownlee/Datasets/blob/master/pima-indians-diabetes.names

* updated requirements

* Updated and compiling

* Updated description etc

* sandbox config updated

* updated with unit test

* updated

* Unit test working

* update

* updates

* readme

* using direct api of schema to set pandas data frames

* With tests running latest code

* bump flytekit

* Dockerfile updated

* updated memory

* requirements as string

* 200mi for all tasks
Ketan Umare authored Nov 24, 2019
1 parent 82f8a1e commit a6ec170
Showing 13 changed files with 301 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
__pycache__/
.pyc
.idea
2 changes: 1 addition & 1 deletion python/Dockerfile
@@ -30,7 +30,7 @@ COPY . .

# Set this environment variable. It will be used by the flytekit SDK during the registration/compilation steps
ARG IMAGE_TAG
-ENV FLYTE_INTERNAL_IMAGE "docker.io/lyft/flytesnacks:$IMAGE_TAG"
+ENV FLYTE_INTERNAL_IMAGE "docker.io/lyft/$IMAGE_TAG"

# Enable the virtualenv for this image. Note this relies on the VENV variable we've set in this image.
ENTRYPOINT ["/opt/flytekit_venv"]
12 changes: 10 additions & 2 deletions python/README.rst
@@ -1,2 +1,10 @@
-# flytesnacks
-A repository of Flyte example workflows
Typical Python Workflows
========================

This section provides canonical examples of how to author tasks and workflows that are written entirely in Python and need no dependencies beyond flytekit.
The aim is to illustrate the various mechanics available in Flyte and answer questions such as:

- How do I write a task?
- How do I write a workflow?
- How do I accept inputs and produce outputs from a task or a workflow?
- How do I use complex data types like Schemas, Blobs and CSVs?
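
Below is a minimal sketch of a single task and a workflow, assuming the flytekit 0.3.x legacy SDK API used elsewhere in this repository; the task and workflow names are illustrative only.

.. code-block:: python

    from flytekit.sdk.tasks import inputs, outputs, python_task
    from flytekit.sdk.types import Types
    from flytekit.sdk.workflow import workflow_class, Input, Output


    @inputs(a=Types.Integer, b=Types.Integer)
    @outputs(total=Types.Integer)
    @python_task(cache_version='1.0', cache=True)
    def add(ctx, a, b, total):
        # Outputs are produced by calling .set() on the output handles
        total.set(a + b)


    @workflow_class
    class AddWorkflow(object):
        # Workflow inputs, with defaults and help text
        a = Input(Types.Integer, default=1, help="First addend")
        b = Input(Types.Integer, default=2, help="Second addend")

        # Wire the task into the workflow
        add_node = add(a=a, b=b)

        # Expose the task output as a workflow output
        total = Output(add_node.outputs.total, sdk_type=Types.Integer)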

37 changes: 37 additions & 0 deletions python/multi_step_linear/README.rst
@@ -0,0 +1,37 @@
Example of a MultiStep Linear Workflow
======================================

Introduction:
-------------
This workflow is a simple multi-step XGBoost trainer. It is not required to split the training of an XGBoost model into multiple steps, but doing so has pros and cons.

Pros:
- Each task/step is standalone and can be reused in other pipelines
- Each step can be unit tested
- Data splitting, cleaning, etc. can be done using a more scalable system like Spark
- State is always saved between steps, so it is cheap to recover from failures, especially if caching=True
- Visibility into each step is high

Cons:
- Performance for small datasets is a concern, because the intermediate data is durably stored and the state is recorded after every step. Each step is essentially a checkpoint

Steps of the Pipeline
----------------------
- Step 1: Gather the data and split it into training and validation sets
- Step 2: Fit the actual model
- Step 3: Run a set of predictions on the validation set. The function is written generically, so it can also be used to predict on any given set of observations (dataset)
- Step 4: Calculate the accuracy score for the predictions

The workflow is designed for a dataset as defined in
https://github.com/jbrownlee/Datasets/blob/master/pima-indians-diabetes.names

An example dataset is available at
https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv

Important things to note:
-------------------------
- Usage of the Schema type. A Schema allows passing a type-safe row vector from one task to another, and the row vector is loaded directly into a pandas dataframe.
  We could also use an unstructured Schema (by simply omitting the column types), which would allow any data to be accepted by the training algorithm. See the sketch after this list.

- We pass the file as a CSV input. The file is auto-loaded.
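
Below is a sketch of how a typed Schema is declared and consumed inside a task; the ``scale_features`` task and the two columns shown are illustrative only, while the real schema used by this workflow is defined in ``diabetes_xgboost.py``.

.. code-block:: python

    from flytekit.sdk.tasks import inputs, outputs, python_task
    from flytekit.sdk.types import Types

    # A strictly typed schema: column names and types are checked when data is
    # passed between tasks. Omitting the column types would give an unstructured
    # Schema that accepts any dataframe.
    FEATURES_SCHEMA = Types.Schema([
        ('bmi', Types.Float),
        ('age', Types.Integer),
    ])


    @inputs(features=FEATURES_SCHEMA)
    @outputs(scaled=FEATURES_SCHEMA)
    @python_task(cache_version='1.0', cache=True, memory_limit="200Mi")
    def scale_features(ctx, features, scaled):
        # The schema reader yields a pandas dataframe directly
        with features as reader:
            df = reader.read()
        # Illustrative transformation: normalize one column
        df['bmi'] = df['bmi'] / df['bmi'].max()
        scaled.set(df)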

194 changes: 194 additions & 0 deletions python/multi_step_linear/diabetes_xgboost.py
@@ -0,0 +1,194 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import joblib
import pandas as pd
from flytekit.sdk.tasks import python_task, outputs, inputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Output, Input
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

# Since we are working with a specific dataset, we will create a strictly typed schema for the dataset.
# If we wanted a generic data splitter we could use a Generic schema without any column type and name information
# Example file: https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv
# CSV Columns
# 1. Number of times pregnant
# 2. Plasma glucose concentration at 2 hours in an oral glucose tolerance test
# 3. Diastolic blood pressure (mm Hg)
# 4. Triceps skin fold thickness (mm)
# 5. 2-Hour serum insulin (mu U/ml)
# 6. Body mass index (weight in kg/(height in m)^2)
# 7. Diabetes pedigree function
# 8. Age (years)
# 9. Class variable (0 or 1)
# Example Row: 6,148,72,35,0,33.6,0.627,50,1
TYPED_COLUMNS = [
    ('#preg', Types.Integer),
    ('pgc_2h', Types.Integer),
    ('diastolic_bp', Types.Integer),
    ('tricep_skin_fold_mm', Types.Integer),
    ('serum_insulin_2h', Types.Integer),
    ('bmi', Types.Float),
    ('diabetes_pedigree', Types.Float),
    ('age', Types.Integer),
    ('class', Types.Integer),
]
# the input dataset schema
DATASET_SCHEMA = Types.Schema(TYPED_COLUMNS)
# the first 8 columns are features
FEATURES_SCHEMA = Types.Schema(TYPED_COLUMNS[:8])
# the last column is the class
CLASSES_SCHEMA = Types.Schema([TYPED_COLUMNS[-1]])


class XGBoostModelHyperparams(object):
"""
These are the xgboost hyper parameters available in scikit-learn library.
"""

def __init__(self, max_depth=3, learning_rate=0.1, n_estimators=100,
objective="binary:logistic", booster='gbtree',
n_jobs=1, **kwargs):
self.n_jobs = int(n_jobs)
self.booster = booster
self.objective = objective
self.n_estimators = int(n_estimators)
self.learning_rate = learning_rate
self.max_depth = int(max_depth)

def to_dict(self):
return self.__dict__

@classmethod
def from_dict(cls, d):
return cls(**d)


# load data
# Example file: https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv
@inputs(dataset=Types.CSV, seed=Types.Integer, test_split_ratio=Types.Float)
@outputs(x_train=FEATURES_SCHEMA, x_test=FEATURES_SCHEMA, y_train=CLASSES_SCHEMA, y_test=CLASSES_SCHEMA)
@python_task(cache_version='1.0', cache=True, memory_limit="200Mi")
def get_traintest_splitdatabase(ctx, dataset, seed, test_split_ratio, x_train, x_test, y_train, y_test):
"""
Retrieves the training dataset from the given blob location and then splits it using the split ratio and returns the result
This splitter is only for the dataset that has the format as specified in the example csv. The last column is assumed to be
the class and all other columns 0-8 the features.
The data is returned as a schema, which gets converted to a parquet file in the back.
"""
dataset.download()
column_names = [k for k in DATASET_SCHEMA.columns.keys()]
df = pd.read_csv(dataset.local_path, names=column_names)

# Select all features
x = df[column_names[:8]]
# Select only the classes
y = df[[column_names[-1]]]

# split data into train and test sets
_x_train, _x_test, _y_train, _y_test = train_test_split(
x, y, test_size=test_split_ratio, random_state=seed)

# TODO also add support for Spark dataframe, but make the pyspark dependency optional
x_train.set(_x_train)
x_test.set(_x_test)
y_train.set(_y_train)
y_test.set(_y_test)


@inputs(x=FEATURES_SCHEMA, y=CLASSES_SCHEMA, hyperparams=Types.Generic) # TODO support arbitrary jsonifiable classes
@outputs(model=Types.Blob) # TODO: Support for subtype format=".joblib.dat"))
@python_task(cache_version='1.0', cache=True, memory_limit="200Mi")
def fit(ctx, x, y, hyperparams, model):
"""
This function takes the given input features and their corresponding classes to train a XGBClassifier.
NOTE: We have simplified the number of hyper parameters we take for demo purposes
"""
with x as r:
x_df = r.read()
with y as r:
y_df = r.read()

hp = XGBoostModelHyperparams.from_dict(hyperparams)
# fit model no training data
m = XGBClassifier(n_jobs=hp.n_jobs, max_depth=hp.max_depth, n_estimators=hp.n_estimators, booster=hp.booster,
objective=hp.objective, learning_rate=hp.learning_rate)
m.fit(x_df, y_df)

# TODO model Blob should be a file like object
fname = "model.joblib.dat"
joblib.dump(m, fname)
model.set(fname)


@inputs(x=FEATURES_SCHEMA, model_ser=Types.Blob) # TODO: format=".joblib.dat"))
@outputs(predictions=CLASSES_SCHEMA)
@python_task(cache_version='1.0', cache=True, memory_limit="200Mi")
def predict(ctx, x, model_ser, predictions):
"""
Given a any trained model, serialized using joblib (this method can be shared!) and features, this method returns
predictions.
"""
model_ser.download()
model = joblib.load(model_ser.local_path)
# make predictions for test data
with x as r:
x_df = r.read()
y_pred = model.predict(x_df)

col = [k for k in CLASSES_SCHEMA.columns.keys()]
y_pred_df = pd.DataFrame(y_pred, columns=col, dtype="int64")
y_pred_df.round(0)
predictions.set(y_pred_df)


@inputs(predictions=CLASSES_SCHEMA, y=CLASSES_SCHEMA)
@outputs(accuracy=Types.Float)
@python_task(cache_version='1.0', cache=True, memory_limit="200Mi")
def metrics(ctx, predictions, y, accuracy):
"""
Compares the predictions with the actuals and returns the accuracy score.
"""
with predictions as r:
pred_df = r.read()

with y as r:
y_df = r.read()

# evaluate predictions
acc = accuracy_score(y_df, pred_df)

print("Accuracy: %.2f%%" % (acc * 100.0))
accuracy.set(float(acc))


@workflow_class
class DiabetesXGBoostModelTrainer(object):
"""
This pipeline trains an XGBoost mode for any given dataset that matches the schema as specified in
https://github.com/jbrownlee/Datasets/blob/master/pima-indians-diabetes.names.
"""

# Inputs dataset, fraction of the dataset to be split out for validations and seed to use to perform the split
dataset = Input(Types.CSV, default=Types.CSV.create_at_known_location(
"https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"),
help="A CSV File that matches the format https://github.com/jbrownlee/Datasets/blob/master/pima-indians-diabetes.names")

test_split_ratio = Input(Types.Float, default=0.33, help="Ratio of how much should be test to Train")
seed = Input(Types.Integer, default=7, help="Seed to use for splitting.")

# the actual algorithm
split = get_traintest_splitdatabase(dataset=dataset, seed=seed, test_split_ratio=test_split_ratio)
fit_task = fit(x=split.outputs.x_train, y=split.outputs.y_train, hyperparams=XGBoostModelHyperparams(
max_depth=4,
).to_dict())
predicted = predict(model_ser=fit_task.outputs.model, x=split.outputs.x_test)
score_task = metrics(predictions=predicted.outputs.predictions, y=split.outputs.y_test)

# Outputs: joblib seralized model and accuracy of the model
model = Output(fit_task.outputs.model, sdk_type=Types.Blob)
accuracy = Output(score_task.outputs.accuracy, sdk_type=Types.Float)
5 changes: 4 additions & 1 deletion python/requirements.txt
@@ -1,2 +1,5 @@
-flytekit[schema]==0.1.8
+flytekit[schema]==0.3.0
 opencv-python==3.4.4.19
+xgboost==0.90
+scikit-learn==0.21.3
+joblib==0.14.0
2 changes: 1 addition & 1 deletion python/sandbox.config
@@ -1,5 +1,5 @@
[sdk]
-workflow_packages=workflows
+workflow_packages=single_step,multi_step_linear
python_venv=flytekit_venv

[auth]
12 changes: 12 additions & 0 deletions python/single_step/README.rst
@@ -0,0 +1,12 @@
Simplest Example of a Flyte Workflow
======================================

Introduction:
-------------
In this example, we take one image as input and run the Canny edge detection algorithm on it. The output is an edge-highlighted image.

Note:
-----
We could have used a Types.Blob to represent the input, but we also wanted to show that this complete type safety is optional:
users can choose to use a plain string to represent remote objects (see the sketch below).
For examples that use more complex types and show how they interplay with other Python libraries, look at the other examples, such as the **DiabetesXGBoostModelTrainer**.
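
Below is a sketch of what such a task can look like when the remote image is passed as a plain string. The task and file names are illustrative, and the sketch assumes Python 3's urllib plus the opencv-python dependency already listed in requirements.txt.

.. code-block:: python

    import urllib.request

    import cv2
    from flytekit.sdk.tasks import inputs, outputs, python_task
    from flytekit.sdk.types import Types


    # The input is just a string holding a remote location, so the task fetches it itself.
    # With Types.Blob the SDK would download the object for us and expose .local_path.
    @inputs(image_location=Types.String)
    @outputs(edges=Types.Blob)
    @python_task(cache_version='1.0')
    def edge_detection(ctx, image_location, edges):
        local_file, _ = urllib.request.urlretrieve(image_location, "input_image.jpg")
        img = cv2.imread(local_file, cv2.IMREAD_GRAYSCALE)
        out = cv2.Canny(img, 50, 200)  # Canny thresholds are illustrative
        cv2.imwrite("edges.png", out)
        edges.set("edges.png")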
File renamed without changes.
1 change: 1 addition & 0 deletions python/test.py
@@ -0,0 +1 @@
from multi_step_linear.diabetes_xgboost import DiabetesXGBoostModelTrainer
Empty file added python/tests/__init__.py
Empty file.
Empty file.
38 changes: 38 additions & 0 deletions python/tests/multi_step_linear/test_diabetes_xgboost.py
@@ -0,0 +1,38 @@
from flytekit.sdk.test_utils import flyte_test
from flytekit.sdk.types import Types

from multi_step_linear import diabetes_xgboost as dxgb


@flyte_test
def test_DiabetesXGBoostModelTrainer():
"""
This shows how each task can be unit tested and yet changed into the workflow.
TODO: Have one test to run the entire workflow end to end.
"""

dataset = Types.CSV.create_at_known_location(
"https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv")

# Test get dataset
result = dxgb.get_traintest_splitdatabase.unit_test(dataset=dataset, seed=7, test_split_ratio=0.33)
assert "x_train" in result
assert "y_train" in result
assert "x_test" in result
assert "y_test" in result

# Test fit
m = dxgb.fit.unit_test(x=result["x_train"], y=result["y_train"], hyperparams=dxgb.XGBoostModelHyperparams(max_depth=4).to_dict())

assert "model" in m

p = dxgb.predict.unit_test(x=result["x_test"], model_ser=m["model"])

assert "predictions" in p

metric = dxgb.metrics.unit_test(predictions=p["predictions"], y=result["y_test"])

assert "accuracy" in metric

print(metric["accuracy"])
assert metric["accuracy"] * 100.0 > 75.0
