diff --git a/jobs/ml-ops/README.md b/jobs/ml-ops/README.md
index 8aca67a..5503dd2 100644
--- a/jobs/ml-ops/README.md
+++ b/jobs/ml-ops/README.md
@@ -37,14 +37,22 @@ Set your Scaleway access key, secret key and project ID in environment variables
 export TF_VAR_access_key=
 export TF_VAR_secret_key=
 export TF_VAR_project_id= # you can create a separate project for this example
+```
+
+By default, both the jobs and the container cron trigger in this example run regularly on a schedule. The default values for these schedules are configured in `jobs/ml-ops/terraform/variables.tf` and can be overridden using Terraform variables, e.g. `export TF_VAR_data_fetch_cron_schedule="0 10 * * *"`.
 
+Then deploy the MLOps infrastructure using the following commands:
+
+```console
 cd terraform
 terraform init
 terraform plan
 terraform apply
 ```
 
-### Step 2. Run the data and training Jobs
+### Step 2. Optional: trigger the jobs manually
+
+The pipeline is fully scheduled: each job runs automatically at its configured cron time. You can skip this step unless you want to trigger the jobs manually for debugging or testing.
 
 To run the jobs for the data and training, we can use the Scaleway CLI:
 
@@ -60,12 +68,17 @@ You can also trigger the jobs from the [Jobs section](https://console.scaleway.c
 
 ### Step 3. Use the inference API
 
+Load the latest version of the model using:
+
 ```
 cd terraform
 export INFERENCE_URL=$(terraform output raw endpoint)
+curl -X POST ${INFERENCE_URL}
+```
 
-curl -X POST ${INFERENCE_URL}/load
+Then post client data to get a prediction:
 
+```
 curl -X POST \
   -H "Content-Type: application/json" \
   -d @../inference/example.json
diff --git a/jobs/ml-ops/data/main.py b/jobs/ml-ops/data/main.py
index fa7b9e2..00fed0c 100644
--- a/jobs/ml-ops/data/main.py
+++ b/jobs/ml-ops/data/main.py
@@ -1,8 +1,9 @@
-import boto3
 import os
 import urllib.request
 import zipfile
 
+import boto3
+
 DATA_DIR = "dataset"
 
 ZIP_URL = "http://archive.ics.uci.edu/static/public/222/bank+marketing.zip"
diff --git a/jobs/ml-ops/inference/data.py b/jobs/ml-ops/inference/data.py
index d570cb8..8853b88 100644
--- a/jobs/ml-ops/inference/data.py
+++ b/jobs/ml-ops/inference/data.py
@@ -1,5 +1,5 @@
-import pandas as pd
 import numpy as np
+import pandas as pd
 from pydantic import BaseModel
 
 
diff --git a/jobs/ml-ops/inference/loader.py b/jobs/ml-ops/inference/loader.py
new file mode 100644
index 0000000..856df48
--- /dev/null
+++ b/jobs/ml-ops/inference/loader.py
@@ -0,0 +1,60 @@
+import os
+import pickle
+
+import boto3
+
+
+class ClassifierLoader:
+    _classifier = None
+    _classifier_version = ""
+
+    @classmethod
+    def load(cls, force=False):
+        if force or cls._classifier is None:
+            access_key = os.environ["ACCESS_KEY"]
+            secret_key = os.environ["SECRET_KEY"]
+            region_name = os.environ["REGION"]
+
+            bucket_name = os.environ["S3_BUCKET_NAME"]
+            s3_url = os.environ["S3_URL"]
+
+            s3 = boto3.client(
+                "s3",
+                region_name=region_name,
+                endpoint_url=s3_url,
+                aws_access_key_id=access_key,
+                aws_secret_access_key=secret_key,
+            )
+
+            # get model file with the latest version
+            bucket_objects = s3.list_objects(Bucket=bucket_name)
+            get_last_modified = lambda object: int(
+                object["LastModified"].strftime("%s")
+            )
+            model_objects = [
+                model_object
+                for model_object in bucket_objects["Contents"]
+                if "classifier" in model_object["Key"]
+            ]
+            latest_model_file = [
+                object["Key"] for object in sorted(model_objects, key=get_last_modified)
+            ][-1]
+
+            s3.download_file(bucket_name, latest_model_file, latest_model_file)
+
+            with open(latest_model_file, "rb") as fh:
+                cls._classifier = pickle.load(fh)
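+                # model files are named "classifier_<YYYYMMDDHHMM>.pkl" by the training job,
+                # so stripping the "classifier_" prefix and ".pkl" suffix leaves the version stamp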
+                cls._classifier_version = latest_model_file[11:-4]
+
+            print(
+                "Successfully loaded model file: {latest_model_file}".format(
+                    latest_model_file=latest_model_file
+                ),
+                flush=True,
+            )
+
+        return cls._classifier
+
+    @classmethod
+    def model_version(cls):
+        return cls._classifier_version
diff --git a/jobs/ml-ops/inference/main.py b/jobs/ml-ops/inference/main.py
index 579d340..32f9fbc 100644
--- a/jobs/ml-ops/inference/main.py
+++ b/jobs/ml-ops/inference/main.py
@@ -1,54 +1,38 @@
+import data
 from fastapi import FastAPI
+from loader import ClassifierLoader
 from sklearn.ensemble import RandomForestClassifier
-from sklearn.metrics import RocCurveDisplay
-import pickle
-import boto3
-import pandas
-import os
-
-import data
 
 classifier = RandomForestClassifier()
 
 app = FastAPI()
 
-MODEL_FILE = "classifier.pkl"
-
-
-class ClassifierLoader:
-    _classifier = None
+
+@app.get("/")
+def hello():
+    """Get Model Version"""
 
-    @classmethod
-    def load(cls, force=False):
-        if force or cls._classifier is None:
-            access_key = os.environ["ACCESS_KEY"]
-            secret_key = os.environ["SECRET_KEY"]
-            region_name = os.environ["REGION"]
+    model_version = ClassifierLoader.model_version()
 
-            bucket_name = os.environ["S3_BUCKET_NAME"]
-            s3_url = os.environ["S3_URL"]
+    if model_version == "":
+        return {
+            "message": "Hello, this is the inference server! No classifier loaded in memory."
+        }
 
-            s3 = boto3.client(
-                "s3",
-                region_name=region_name,
-                endpoint_url=s3_url,
-                aws_access_key_id=access_key,
-                aws_secret_access_key=secret_key,
-            )
+    return {
+        "message": "Hello, this is the inference server! Serving classifier with version {model_version}".format(
+            model_version=model_version
+        )
+    }
 
-            s3.download_file(bucket_name, MODEL_FILE, MODEL_FILE)
-
-            with open(MODEL_FILE, "rb") as fh:
-                cls._classifier = pickle.load(fh)
-
-        return cls._classifier
-
-
-@app.post("/load")
+
+# this endpoint is used by the cron trigger to load the model from S3
+@app.post("/")
 def load():
     """Reloads classifier from model registry bucket"""
     ClassifierLoader.load(force=True)
+    return {"message": "model loaded successfully"}
 
@@ -59,8 +43,13 @@ def classify(profile: data.ClientProfile):
     cleaned_data = data.clean_profile(profile)
     data_point_processed = data.transform_data(cleaned_data)
 
-    # Lazy-loads classifer from S3
+    # Lazy-loads classifier from S3
     classifier = ClassifierLoader.load()
 
     prediction = classifier.predict(data_point_processed)
-    return {"predicted_class": int(prediction)}
+    response = "This client is likely to respond positively to a cold call"
+
+    if int(prediction) == 0:
+        response = "This client is likely to respond negatively to a cold call"
+
+    return {"prediction": response}
diff --git a/jobs/ml-ops/terraform/container.tf b/jobs/ml-ops/terraform/container.tf
index 5329163..6e23151 100644
--- a/jobs/ml-ops/terraform/container.tf
+++ b/jobs/ml-ops/terraform/container.tf
@@ -12,7 +12,7 @@ resource "scaleway_container" "inference" {
   cpu_limit    = 2000
   memory_limit = 2048
   min_scale    = 1
-  max_scale    = 5
+  max_scale    = 1
   environment_variables = {
     "S3_BUCKET_NAME" = scaleway_object_bucket.main.name
     "S3_URL"         = var.s3_url
@@ -24,3 +24,9 @@
   }
   deploy = true
 }
+
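+# cron trigger that periodically POSTs to the inference container so it reloads the latest model from S3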
@@ resource "scaleway_job_definition" "fetch_data" { memory_limit = 1024 image_uri = docker_image.data.name timeout = "10m" - + cron { + schedule = var.data_fetch_cron_schedule + timezone = "Europe/Paris" + } env = { "S3_BUCKET_NAME" : scaleway_object_bucket.main.name, "S3_URL" : var.s3_url, @@ -20,7 +23,10 @@ resource "scaleway_job_definition" "training" { memory_limit = 4096 image_uri = docker_image.training.name timeout = "10m" - + cron { + schedule = var.training_cron_schedule + timezone = "Europe/Paris" + } env = { "S3_BUCKET_NAME" : scaleway_object_bucket.main.name, "S3_URL" : var.s3_url, diff --git a/jobs/ml-ops/terraform/variables.tf b/jobs/ml-ops/terraform/variables.tf index 84355ba..bbe8b76 100644 --- a/jobs/ml-ops/terraform/variables.tf +++ b/jobs/ml-ops/terraform/variables.tf @@ -25,15 +25,17 @@ variable "s3_url" { default = "https://s3.fr-par.scw.cloud" } -variable "data_file" { - type = string - description = "name data file in data store" - default = "bank_telemarketing.csv" +variable "data_fetch_cron_schedule" { + type = string + default = "0 */10 * * *" } -variable "model_object" { - type = string - description = "name of model object stored in model registry" - default = "classifier.pkl" +variable "training_cron_schedule" { + type = string + default = "0 */11 * * *" } +variable "inference_cron_schedule" { + type = string + default = "0 */12 * * *" +} diff --git a/jobs/ml-ops/terraform/versions.tf b/jobs/ml-ops/terraform/versions.tf index b4ddc4b..b186193 100644 --- a/jobs/ml-ops/terraform/versions.tf +++ b/jobs/ml-ops/terraform/versions.tf @@ -2,6 +2,7 @@ terraform { required_providers { scaleway = { source = "scaleway/scaleway" + version = ">= 2.39" } docker = { source = "kreuzwerker/docker" diff --git a/jobs/ml-ops/training/main.py b/jobs/ml-ops/training/main.py index 7b2e87c..e41a0c3 100644 --- a/jobs/ml-ops/training/main.py +++ b/jobs/ml-ops/training/main.py @@ -1,17 +1,19 @@ -import pandas as pd import os import pickle +from datetime import datetime + import boto3 +import pandas as pd import training as ml -from sklearn.metrics import RocCurveDisplay -from sklearn.metrics import ConfusionMatrixDisplay +from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay -DATA_FILE_NAME = "bank-additional-full.csv" +VERSION = datetime.now().strftime("%Y%m%d%H%M") -MODEL_FILE = "classifier.pkl" -PERF_FILE = "performance.pkl" -ROC_AUC_FILE = "roc_auc.png" -CONFUSION_MATRIX_FILE = "confusion_matrix.png" +DATA_FILE_NAME = "bank-additional-full.csv" +MODEL_FILE = "classifier_" + VERSION + ".pkl" +PERF_FILE = "performance_" + VERSION + ".pkl" +ROC_AUC_FILE = "roc_auc_" + VERSION + ".png" +CONFUSION_MATRIX_FILE = "confusion_matrix_" + VERSION + ".png" def main() -> int: diff --git a/jobs/ml-ops/training/training.py b/jobs/ml-ops/training/training.py index 478a47a..4fc7915 100644 --- a/jobs/ml-ops/training/training.py +++ b/jobs/ml-ops/training/training.py @@ -1,10 +1,9 @@ -import pandas as pd import numpy as np +import pandas as pd from imblearn.over_sampling import SMOTE -from sklearn.model_selection import train_test_split -from sklearn.metrics import accuracy_score, precision_score, recall_score, log_loss -from sklearn.model_selection import RandomizedSearchCV from sklearn.ensemble import RandomForestClassifier +from sklearn.metrics import accuracy_score, log_loss, precision_score, recall_score +from sklearn.model_selection import RandomizedSearchCV, train_test_split def transform_data(data: pd.DataFrame) -> pd.DataFrame: