diff --git a/assets/deploy-model-dev.yml b/assets/deploy-model-dev.yml index 3b86b03..c4e8d5a 100644 --- a/assets/deploy-model-dev.yml +++ b/assets/deploy-model-dev.yml @@ -23,7 +23,7 @@ Resources: Model: Type: "AWS::SageMaker::Model" Properties: - ModelName: !Sub ${ModelName}-dev-${TrainJobId} + ModelName: !Sub ${ModelName}-dev PrimaryContainer: Image: !Ref ImageRepoUri ModelDataUrl: !Sub s3://sagemaker-${AWS::Region}-${AWS::AccountId}/${ModelName}/${ModelName}-${TrainJobId}/output/model.tar.gz @@ -38,11 +38,11 @@ Resources: InstanceType: ml.t2.medium ModelName: !GetAtt Model.ModelName VariantName: !Sub ${ModelVariant}-${ModelName} - EndpointConfigName: !Sub ${ModelName}-dec-${TrainJobId} + EndpointConfigName: !Sub ${ModelName}-dec KmsKeyId: !Ref KmsKeyId Endpoint: Type: "AWS::SageMaker::Endpoint" Properties: - EndpointName: !Sub ${ModelName}-dev-${TrainJobId} - EndpointConfigName: !GetAtt EndpointConfig.EndpointConfigName + EndpointName: !Sub ${ModelName}-dev + EndpointConfigName: !GetAtt EndpointConfig.EndpointConfigName \ No newline at end of file diff --git a/assets/deploy-model-prd.yml b/assets/deploy-model-prd.yml index 42aee98..52e38a1 100644 --- a/assets/deploy-model-prd.yml +++ b/assets/deploy-model-prd.yml @@ -1,5 +1,6 @@ Transform: AWS::Serverless-2016-10-31 -Description: Deploy the production Amazon SageMaker Endpoint with Autoscaling, Model Monitoring Schedule and API Gateway Lambda. +Description: Deploy the production Amazon SageMaker Endpoint with Autoscaling, Model + Monitoring Schedule and API Gateway Lambda. Parameters: ImageRepoUri: Type: String @@ -23,7 +24,8 @@ Parameters: Type: Number Description: The metric alarm threshold KmsKeyId: - Description: AWS KMS key ID used to encrypt data at rest on the ML storage volume attached to endpoint config and S3 data capture. + Description: AWS KMS key ID used to encrypt data at rest on the ML storage volume + attached to endpoint config and S3 data capture. 
Type: String NotificationArn: Description: The arn for notification topic @@ -81,7 +83,7 @@ Resources: Model: Type: "AWS::SageMaker::Model" Properties: - ModelName: !Sub ${ModelName}-prd-${TrainJobId} + ModelName: !Sub ${ModelName}-prd PrimaryContainer: Image: !Ref ImageRepoUri ModelDataUrl: !Sub s3://sagemaker-${AWS::Region}-${AWS::AccountId}/${ModelName}/${ModelName}-${TrainJobId}/output/model.tar.gz @@ -109,13 +111,13 @@ Resources: EnableCapture: True InitialSamplingPercentage: 100 KmsKeyId: !Ref KmsKeyId - EndpointConfigName: !Sub ${ModelName}-pec-${TrainJobId} + EndpointConfigName: !Sub ${ModelName}-pec KmsKeyId: !Ref KmsKeyId Endpoint: Type: "AWS::SageMaker::Endpoint" Properties: - EndpointName: !Sub ${ModelName}-prd-${TrainJobId} + EndpointName: !Sub ${ModelName}-prd EndpointConfigName: !GetAtt EndpointConfig.EndpointConfigName ApiFunction: @@ -189,7 +191,7 @@ Resources: PreTrafficLambdaFunction: Type: AWS::Serverless::Function Properties: - FunctionName: !Sub "CodeDeployHook_mlops-${ModelName}-PreTrafficLambdaFunction" + FunctionName: !Sub "${ModelName}-PreTrafficLambdaFunction" CodeUri: ../api Handler: pre_traffic_hook.lambda_handler Runtime: python3.7 @@ -221,7 +223,7 @@ Resources: PostTrafficLambdaFunction: Type: AWS::Serverless::Function Properties: - FunctionName: !Sub "CodeDeployHook_mlops-${ModelName}-PostTrafficLambdaFunction" + FunctionName: !Sub "${ModelName}-PostTrafficLambdaFunction" CodeUri: ../api Handler: post_traffic_hook.lambda_handler Runtime: python3.7 @@ -351,7 +353,7 @@ Resources: Properties: MaxCapacity: 10 MinCapacity: 2 - ResourceId: !Sub endpoint/${ModelName}-prd-${TrainJobId}/variant/${ModelVariant}-${ModelName} + ResourceId: !Sub endpoint/${ModelName}-prd/variant/${ModelVariant}-${ModelName} RoleARN: !Sub arn:aws:iam::${AWS::AccountId}:role/MLOps ScalableDimension: sagemaker:variant:DesiredInstanceCount ServiceNamespace: sagemaker @@ -362,7 +364,7 @@ Resources: Properties: PolicyName: SageMakerVariantInvocationsPerInstance PolicyType: TargetTrackingScaling - ResourceId: !Sub endpoint/${ModelName}-prd-${TrainJobId}/variant/${ModelVariant}-${ModelName} + ResourceId: !Sub endpoint/${ModelName}-prd/variant/${ModelVariant}-${ModelName} ScalableDimension: sagemaker:variant:DesiredInstanceCount ServiceNamespace: sagemaker TargetTrackingScalingPolicyConfiguration: diff --git a/notebook/mlops.ipynb b/notebook/mlops.ipynb index 11ef268..36184e0 100644 --- a/notebook/mlops.ipynb +++ b/notebook/mlops.ipynb @@ -1 +1 @@ -{"cells": [{"cell_type": "markdown", "metadata": {}, "source": ["# Safe MLOps Deployment Pipeline\n", "\n", "\n", "## Overview\n", "\n", "In this notebook you will step through an MLOps pipeline to build, train, deploy and monitor an XGBoost regression model for predicting the expected taxi fare using the New York City Taxi [dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/)\u21d7. This safe pipeline features a [canary deployment](https://docs.aws.amazon.com/wellarchitected/latest/machine-learning-lens/canary-deployment.html) strategy with rollback on error. You will learn how to trigger and monitor the pipeline, inspect the training workflow, use model monitor to set up alerts, and create a canary deployment.\n", "\n", "
\n", " Note: This notebook assumes prior familiarity with the basics of training ML models on Amazon SageMaker. Data preparation and visualization, although present, will be kept to a minimum. If you are not familiar with the basic concepts and features of SageMaker, we recommend reading the SageMaker documentation\u21d7 and completing the workshops and samples in AWS SageMaker Examples GitHub\u21d7 and AWS Samples GitHub\u21d7. \n", "
\n", "\n", "### Contents\n", "\n", "This notebook has the following key sections:\n", "\n", "1. [Data Prep](#Data-Prep)\n", "2. [Build](#Build)\n", "3. [Train Model](#Train-Model)\n", "4. [Deploy Dev](#Deploy-Dev)\n", "5. [Deploy Prod](#Deploy-Prod)\n", "6. [Monitor](#Monitor)\n", "6. [Cleanup](#Cleanup)\n", "\n", "### Architecture\n", "\n", "The architecture diagram below shows the entire MLOps pipeline at a high level.\n", "\n", "Use the CloudFormation template provided in this repository (`pipeline.yml`) to build the demo in your own AWS account. If you are currently viewing this notebook from SageMaker in your AWS account, then you have already completed this step. CloudFormation deploys several resources:\n", " \n", "1. A customer-managed encryption key in in Amazon KMS for encrypting data and artifacts.\n", "1. A secret in Amazon Secrets Manager to securely store your GitHub Access Token.\n", "1. Several AWS IAM roles so CloudFormation, SageMaker, and other AWS services can perform actions in your AWS account, following the principle of [least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege)\u21d7.\n", "1. A messaging service in Amazon SNS to notify you when CodeDeploy has successfully deployed the API, and to receive alerts for retraining and drift detection (signing up for these notifications is optional).\n", "1. Two Amazon CloudWatch event rules: one which schedules the pipeline to run every month, and one which triggers the pipeline to run when SageMaker Model Monitor detects certain metrics.\n", "1. An Amazon SageMaker Jupyter notebook with this workshop content pre-loaded.\n", "1. An Amazon S3 bucket for storing model artifacts.\n", "1. An AWS CodePipeline instance with several pre-defined stages. \n", "\n", "Take a moment to look at all of these resources now deployed in your account. \n", "\n", "![MLOps pipeline architecture](../docs/mlops-architecture.png)\n", "\n", "In this notebook, you will work through the CodePipeline instance created by the CloudFormation template. It has several stages:\n", "\n", "1. **Source** - The pipeline is already configured with two sources. If you upload a new dataset to a specific location in the S3 data bucket, this will trigger the pipeline to run. The Git source can be GitHub, or CodeCommit if you don\u2019t supply your access token. If you commit new code to your repository, this will trigger the pipeline to run. \n", "1. **Build** - In this stage, CodeBuild configured by the build specification `model/buildspec.yml` will execute `model/run_pipeline.py` to generate AWS CloudFormation templates for creating the AWS Step Function (including AWS Lambda custom resources), and deployment templates used in the following stages based on the data sets and hyperparameters specified for this pipeline run. You will take a closer look at these files later in this notebook. \n", "1. **Train** The Step Functions workflow created in the Build stage is run in this stage. The workflow creates a baseline for the model monitor using a SageMaker processing job, and trains an XGBoost model on the taxi ride dataset using a SageMaker training job.\n", "1. **Deploy Dev** In this stage, a CloudFormation template created in the build stage (from `assets/deploy-model-dev.yml`) deploys a dev endpoint. This will allow you to run tests on the model and decide if the model is of sufficient quality to deploy into production.\n", "1. 
**Deploy Production** The final stage of the pipeline is the only stage which does not run automatically as soon as the previous stage is complete. It waits for a user to manually approve the model which was previously deployed to dev. As soon as the model is approved, a CloudFormation template (packaged from `assets/deploy-model-prod.yml` to include the Lambda functions saved and uploaded as ZIP files in S3) deploys the production endpoint. It configures autoscaling and enables data capture. It creates a model monitoring schedule and sets CloudWatch alarms for certain metrics. It also sets up an AWS CodeDeploy instance which deploys a set of AWS Lambda functions and an Amazon API Gateway to sit in front of the SageMaker endpoint. This stage can make use of canary deployment to safely switch from an old model to a new model."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# Import the latest sagemaker and boto3 SDKs.\n", "import sys\n", "\n", "!{sys.executable} -m pip install --upgrade pip\n", "!{sys.executable} -m pip install -qU awscli boto3 \"sagemaker>=2.1.0<3\" tqdm\n", "!{sys.executable} -m pip install -qU \"stepfunctions==2.0.0\"\n", "!{sys.executable} -m pip show sagemaker stepfunctions"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Restart your SageMaker kernel then continue with this notebook."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# Replace `None` with the project name when creating SageMaker Project\n", "# You can find it from the left panel in Studio\n", "\n", "PROJECT_NAME = None\n", "\n", "assert PROJECT_NAME is not None and isinstance(\n", " PROJECT_NAME, str\n", "), \"Please specify the project name as string\""]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["import boto3\n", "from IPython.core.display import HTML, display\n", "\n", "\n", "def get_provisioned_product_name(project_name):\n", " region = boto3.Session().region_name\n", " sc = boto3.client(\"servicecatalog\")\n", " products = sc.search_provisioned_products(\n", " Filters={\n", " \"SearchQuery\": [\n", " project_name,\n", " ]\n", " }\n", " )\n", " pp = products[\"ProvisionedProducts\"]\n", " if len(pp) != 1:\n", " print(\"Invalid provisioned product name. Open the link below and search manually\")\n", " display(\n", " HTML(\n", " f'Service Catalog'\n", " )\n", " )\n", " raise ValueError(\"Invalid provisioned product\")\n", "\n", " return pp[0][\"Name\"]\n", "\n", "\n", "PROVISIONED_PRODUCT_NAME = get_provisioned_product_name(PROJECT_NAME)\n", "print(\n", " f\"The associated Service Catalog Provisioned Product Name to this SagaMaker project: {PROVISIONED_PRODUCT_NAME}\"\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["In case of any errors, you can examine the Service Catalog console from the above link and find the associated provisioned product name which is something like `example-p-1v7hbpwe594n` and assigns it to `PROVISIONED_PRODUCT_NAME` manually."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Data Prep\n", " \n", "In this section of the notebook, you will download the publicly available New York Taxi dataset in preparation for uploading it to S3.\n", "\n", "### Download Dataset\n", "\n", "First, download a sample of the New York City Taxi [dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/)\u21d7 to this notebook instance. 
This dataset contains information on trips taken by taxis and for-hire vehicles in New York City, including pick-up and drop-off times and locations, fares, distance traveled, and more. "]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["!aws s3 cp 's3://nyc-tlc/trip data/green_tripdata_2018-02.csv' 'nyc-tlc.csv'"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Now load the dataset into a pandas data frame, taking care to parse the dates correctly."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["import pandas as pd\n", "\n", "parse_dates = [\"lpep_dropoff_datetime\", \"lpep_pickup_datetime\"]\n", "trip_df = pd.read_csv(\"nyc-tlc.csv\", parse_dates=parse_dates)\n", "\n", "trip_df.head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Data manipulation\n", "\n", "Instead of the raw date and time features for pick-up and drop-off, let's use these features to calculate the total time of the trip in minutes, which will be easier to work with for our model."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["trip_df[\"duration_minutes\"] = (\n", " trip_df[\"lpep_dropoff_datetime\"] - trip_df[\"lpep_pickup_datetime\"]\n", ").dt.seconds / 60"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The dataset contains a lot of columns we don't need, so let's select a sample of columns for our machine learning model. Keep only `total_amount` (fare), `duration_minutes`, `passenger_count`, and `trip_distance`."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["cols = [\"total_amount\", \"duration_minutes\", \"passenger_count\", \"trip_distance\"]\n", "data_df = trip_df[cols]\n", "print(data_df.shape)\n", "data_df.head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Generate some quick statistics for the dataset to understand the quality."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["data_df.describe()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The table above shows some clear outliers, e.g. -400 or 2626 as fare, or 0 passengers. There are many intelligent methods for identifying and removing outliers, but data cleaning is not the focus of this notebook, so just remove the outliers by setting some min and max values which seem more reasonable. Removing the outliers results in a final dataset of 754,671 rows."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["data_df = data_df[\n", " (data_df.total_amount > 0)\n", " & (data_df.total_amount < 200)\n", " & (data_df.duration_minutes > 0)\n", " & (data_df.duration_minutes < 120)\n", " & (data_df.trip_distance > 0)\n", " & (data_df.trip_distance < 121)\n", " & (data_df.passenger_count > 0)\n", "].dropna()\n", "print(data_df.shape)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Data visualization\n", "\n", "Since this notebook will build a regression model for the taxi data, it's a good idea to check if there is any correlation between the variables in our data. 
Use scatter plots on a sample of the data to compare trip distance with duration in minutes, and total amount (fare) with duration in minutes."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["import seaborn as sns\n", "\n", "sample_df = data_df.sample(1000)\n", "sns.scatterplot(data=sample_df, x=\"duration_minutes\", y=\"trip_distance\")"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["sns.scatterplot(data=sample_df, x=\"duration_minutes\", y=\"total_amount\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["These scatter plots look fine and show at least some correlation between our variables. \n", "\n", "### Data splitting and saving\n", "\n", "We are now ready to split the dataset into train, validation, and test sets. "]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from sklearn.model_selection import train_test_split\n", "\n", "train_df, val_df = train_test_split(data_df, test_size=0.20, random_state=42)\n", "val_df, test_df = train_test_split(val_df, test_size=0.05, random_state=42)\n", "\n", "# Reset the index for our test dataframe\n", "test_df.reset_index(inplace=True, drop=True)\n", "\n", "print(\n", " \"Size of\\n train: {},\\n val: {},\\n test: {} \".format(\n", " train_df.shape[0], val_df.shape[0], test_df.shape[0]\n", " )\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Save the train, validation, and test files as CSV locally on this notebook instance. Notice that you save the train file twice - once as the training data file and once as the baseline data file. The baseline data file will be used by [SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html)\u21d7 to detect data drift. Data drift occurs when the statistical nature of the data that your model receives while in production drifts away from the nature of the baseline data it was trained on, which means the model begins to lose accuracy in its predictions."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["train_cols = [\"total_amount\", \"duration_minutes\", \"passenger_count\", \"trip_distance\"]\n", "train_df.to_csv(\"train.csv\", index=False, header=False)\n", "val_df.to_csv(\"validation.csv\", index=False, header=False)\n", "test_df.to_csv(\"test.csv\", index=False, header=False)\n", "\n", "# Save test and baseline with headers\n", "train_df.to_csv(\"baseline.csv\", index=False, header=True)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Now upload these CSV files to your default SageMaker S3 bucket. 
"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["import sagemaker\n", "\n", "# Get the session and default bucket\n", "session = sagemaker.session.Session()\n", "bucket = session.default_bucket()\n", "\n", "# Specify data prefix and version\n", "prefix = \"nyc-tlc/v1\"\n", "\n", "s3_train_uri = session.upload_data(\"train.csv\", bucket, prefix + \"/data/training\")\n", "s3_val_uri = session.upload_data(\"validation.csv\", bucket, prefix + \"/data/validation\")\n", "s3_test_uri = session.upload_data(\"test.csv\", bucket, prefix + \"/data/test\")\n", "s3_baseline_uri = session.upload_data(\"baseline.csv\", bucket, prefix + \"/data/baseline\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["You will use the datasets which you have prepared and saved in this section to trigger the pipeline to train and deploy a model in the next section."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Build\n", "\n", "If you navigate to the CodePipeline instance created for this workshop, you will notice that the Source stage is initially in a `Failed` state. This happens because the dataset, which is one of the sources that can trigger the pipeline, has not yet been uploaded to the S3 location expected by the pipeline.\n", "\n", "![Failed code pipeline](../docs/pipeline_failed.png)\n", "\n", "### Trigger Build\n", "\n", "In this section, you will start a model build and deployment pipeline by packaging up the datasets you prepared in the previous section and uploading these to the S3 source location which triggers the CodePipeline instance created for this workshop. \n", "\n", "\n", "First, import some libraries and load some environment variables which you will need. These environment variables have been set through a [lifecycle configuration](https://docs.aws.amazon.com/sagemaker/latest/dg/notebook-lifecycle-config.html)\u21d7 script attached to this notebook."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["import boto3\n", "from botocore.exceptions import ClientError\n", "import os\n", "import time\n", "\n", "\n", "def get_config(provisioned_product_name):\n", " sc = boto3.client(\"servicecatalog\")\n", " outputs = sc.get_provisioned_product_outputs(ProvisionedProductName=provisioned_product_name)[\n", " \"Outputs\"\n", " ]\n", " config = {}\n", " for out in outputs:\n", " config[out[\"OutputKey\"]] = out[\"OutputValue\"]\n", " return config\n", "\n", "\n", "config = get_config(PROVISIONED_PRODUCT_NAME)\n", "region = config[\"Region\"]\n", "artifact_bucket = config[\"ArtifactBucket\"]\n", "pipeline_name = config[\"PipelineName\"]\n", "model_name = config[\"ModelName\"]\n", "workflow_pipeline_arn = config[\"WorkflowPipelineARN\"]\n", "\n", "print(\"region: {}\".format(region))\n", "print(\"artifact bucket: {}\".format(artifact_bucket))\n", "print(\"pipeline: {}\".format(pipeline_name))\n", "print(\"model name: {}\".format(model_name))\n", "print(\"workflow: {}\".format(workflow_pipeline_arn))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["From the AWS CodePipeline [documentation](https://docs.aws.amazon.com/codepipeline/latest/userguide/tutorials-simple-s3.html)\u21d7:\n", "\n", "> When Amazon S3 is the source provider for your pipeline, you may zip your source file or files into a single .zip and upload the .zip to your source bucket. 
You may also upload a single unzipped file; however, downstream actions that expect a .zip file will fail.\n", "\n", "To train a model, you need multiple datasets (train, validation, and test) along with a file specifying the hyperparameters. In this example, you will create one JSON file which contains the S3 dataset locations and one JSON file which contains the hyperparameter values. Then you compress both files into a zip package to be used as input for the pipeline run. "]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from io import BytesIO\n", "import zipfile\n", "import json\n", "\n", "input_data = {\n", " \"TrainingUri\": s3_train_uri,\n", " \"ValidationUri\": s3_val_uri,\n", " \"TestUri\": s3_test_uri,\n", " \"BaselineUri\": s3_baseline_uri,\n", "}\n", "\n", "hyperparameters = {\"num_round\": 50}\n", "\n", "zip_buffer = BytesIO()\n", "with zipfile.ZipFile(zip_buffer, \"a\") as zf:\n", " zf.writestr(\"inputData.json\", json.dumps(input_data))\n", " zf.writestr(\"hyperparameters.json\", json.dumps(hyperparameters))\n", "zip_buffer.seek(0)\n", "\n", "data_source_key = \"{}/data-source.zip\".format(pipeline_name)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Now upload the zip package to your artifact S3 bucket - this action will trigger the pipeline to train and deploy a model."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["s3 = boto3.client(\"s3\")\n", "s3.put_object(Bucket=artifact_bucket, Key=data_source_key, Body=bytearray(zip_buffer.read()))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Click the link below to open the AWS console at the Code Pipeline if you don't have it open in another tab.\n", "\n", "
\n", " Tip: You may need to wait a minute to see the DataSource stage turn green. The page will refresh automatically.\n", "
\n", "\n", "![Source Green](../docs/datasource-after.png)"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from IPython.core.display import HTML\n", "\n", "HTML(\n", " 'Code Pipeline'.format(\n", " region, pipeline_name\n", " )\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Inspect Build Logs\n", "\n", "Once the build stage is running, you will see the AWS CodeBuild job turn blue with a status of **In progress**.\n", "\n", "![Failed code pipeline](../docs/codebuild-inprogress.png)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["You can click on the **Details** link displayed in the CodePipeline UI or click the link below to jump directly to the CodeBuild logs.\n", "\n", "
\n", " Tip: You may need to wait a few seconds for the pipeline to transition into the active (blue) state and for the build to start.\n", "
"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["codepipeline = boto3.client(\"codepipeline\")\n", "\n", "\n", "def get_pipeline_stage(pipeline_name, stage_name):\n", " response = codepipeline.get_pipeline_state(name=pipeline_name)\n", " for stage in response[\"stageStates\"]:\n", " if stage[\"stageName\"] == stage_name:\n", " return stage\n", "\n", "\n", "# Get last execution id\n", "build_stage = get_pipeline_stage(pipeline_name, \"Build\")\n", "if not \"latestExecution\" in build_stage:\n", " raise (Exception(\"Please wait. Build not started\"))\n", "\n", "build_url = build_stage[\"actionStates\"][0][\"latestExecution\"][\"externalExecutionUrl\"]\n", "\n", "# Out a link to the code build logs\n", "HTML('Code Build Logs'.format(build_url))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The AWS CodeBuild process is responsible for creating a number of AWS CloudFormation templates which we will explore in more detail in the next section. Two of these templates are used to set up the **Train** step by creating the AWS Step Functions worklow and the custom AWS Lambda functions used within this workflow."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Train Model\n", "\n", "### Inspect Training Job\n", "\n", "Wait until the pipeline has started running the Train step (see screenshot) before continuing with the next cells in this notebook. \n", "\n", "![Training in progress](../docs/train-in-progress.png)\n", "\n", "When the pipeline has started running the train step, you can click on the **Details** link displayed in the CodePipeline UI (see screenshot above) to view the Step Functions workflow which is running the training job. \n", "\n", "Alternatively, you can click on the Workflow link from the cell output below once it's available."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from stepfunctions.workflow import Workflow\n", "\n", "while True:\n", " try:\n", " workflow = Workflow.attach(workflow_pipeline_arn)\n", " break\n", " except ClientError as e:\n", " print(e.response[\"Error\"][\"Message\"])\n", " time.sleep(10)\n", "\n", "workflow"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Review Build Script\n", "\n", "While you wait for the training job to complete, let's take a look at the `run.py` code which was used by the AWS CodeBuild process.\n", "\n", "This script takes all of the input parameters, including the dataset locations and hyperparameters which you saved to JSON files earlier in this notebook, and uses them to generate the templates which the pipeline needs to run the training job. It *does not* create the actual Step Functions instance - it only generates the templates which define the Step Functions workflow, as well as the CloudFormation input templates which CodePipeline uses to instantiate the Step Functions instance.\n", "\n", "Step-by-step, the script does the following:\n", "\n", "1. It collects all the input parameters it needs to generate the templates. This includes information about the environment container needed to run the training job, the input and output data locations, IAM roles needed by various components, encryption keys, and more. It then sets up some basic parameters like the AWS region and the function names.\n", "1. If the input parameters specify an environment container stored in ECR, it fetches that container. 
Otherwise, it fetches the URI of the AWS managed environment container needed for the training job.\n", "1. It reads the input data JSON file which you generated earlier in this notebook (and which was included in the zip source for the pipeline), thereby fetching the locations of the train, validation, and baseline data files. Then it formats more parameters which will be needed later in the script, including version IDs and output data locations.\n", "1. It reads the hyperparameter JSON file which you generated earlier in this notebook.\n", "1. It defines the Step Functions workflow, starting with the input schema, followed by each step of the workflow (i.e. Create Experiment, Baseline Job, Training Job), and finally combines those steps into a workflow graph. \n", "1. The workflow graph is saved to file, along with a file containing all of the input parameters saved according to the schema defined in the workflow.\n", "1. It saves parameters to file which will be used by CloudFormation to instantiate the Step Functions workflow."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["!pygmentize ../model/run_pipeline.py"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Customize Workflow (Optional)\n", "\n", "If you are interested in customising the workflow used in the Build Script, store the `input_data` to be used within the local [workflow.ipynb](workflow.ipynb) notebook. The workflow notebook can be used to experiment with the Step Functions workflow and training job definitions for your model."]}, {"cell_type": "code", "execution_count": null, "metadata": {"scrolled": true}, "outputs": [], "source": ["%store input_data PROVISIONED_PRODUCT_NAME"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Training Analytics\n", "\n", "Once the training and baseline jobs are complete (meaning they are displayed in a green color in the Step Functions workflow, this takes around 5 minutes), you can inspect the experiment metrics. The code below will display all experiments in a table. Note that the baseline processing job won't have RMSE metrics - it calculates metrics based on the training data, but does not train a machine learning model. \n", "\n", "You will [explore the baseline](#Explore-Baseline) results later in this notebook. "]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from sagemaker import analytics\n", "\n", "experiment_name = \"mlops-{}\".format(model_name)\n", "model_analytics = analytics.ExperimentAnalytics(experiment_name=experiment_name)\n", "analytics_df = model_analytics.dataframe()\n", "\n", "if analytics_df.shape[0] == 0:\n", " raise (Exception(\"Please wait. No training or baseline jobs\"))\n", "\n", "pd.set_option(\"display.max_colwidth\", 100) # Increase column width to show full copmontent name\n", "cols = [\n", " \"TrialComponentName\",\n", " \"DisplayName\",\n", " \"SageMaker.InstanceType\",\n", " \"train:rmse - Last\",\n", " \"validation:rmse - Last\",\n", "] # return the last rmse for training and validation\n", "analytics_df[analytics_df.columns & cols].head(2)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Deploy Dev\n", "\n", "### Test Dev Deployment\n", "\n", "When the pipeline has finished training a model, it automatically moves to the next step, where the model is deployed as a SageMaker Endpoint. 
This endpoint is part of your dev deployment, therefore, in this section, you will run some tests on the endpoint to decide if you want to deploy this model into production.\n", "\n", "First, run the cell below to fetch the name of the SageMaker Endpoint."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["codepipeline = boto3.client(\"codepipeline\")\n", "\n", "deploy_dev = get_pipeline_stage(pipeline_name, \"DeployDev\")\n", "if not \"latestExecution\" in deploy_dev:\n", " raise (Exception(\"Please wait. Deploy dev not started\"))\n", "\n", "execution_id = deploy_dev[\"latestExecution\"][\"pipelineExecutionId\"]\n", "dev_endpoint_name = \"mlops-{}-dev-{}\".format(model_name, execution_id)\n", "\n", "print(\"endpoint name: {}\".format(dev_endpoint_name))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["If you moved through the previous section very quickly, you will need to wait until the dev endpoint has been successfully deployed and the pipeline is waiting for approval to deploy to production (see screenshot). It can take up to 10 minutes for SageMaker to create an endpoint.\n", "\n", "![Deploying dev endpoint in code pipeline](../docs/dev-deploy-ready.png)\n", "\n", "Alternatively, run the code below to check the status of your endpoint. Wait until the status of the endpoint is 'InService'."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["sm = boto3.client(\"sagemaker\")\n", "\n", "while True:\n", " try:\n", " response = sm.describe_endpoint(EndpointName=dev_endpoint_name)\n", " print(\"Endpoint status: {}\".format(response[\"EndpointStatus\"]))\n", " if response[\"EndpointStatus\"] == \"InService\":\n", " break\n", " except ClientError as e:\n", " print(e.response[\"Error\"][\"Message\"])\n", " time.sleep(10)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Now that your endpoint is ready, let's write some code to run the test data (which you split off from the dataset and saved to file at the start of this notebook) through the endpoint for inference. The code below supports both v1 and v2 of the SageMaker SDK, but we recommend using v2 of the SDK in all of your future projects."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["import numpy as np\n", "from tqdm import tqdm\n", "\n", "from sagemaker.predictor import Predictor\n", "from sagemaker.serializers import CSVSerializer\n", "\n", "\n", "def get_predictor(endpoint_name):\n", " xgb_predictor = Predictor(endpoint_name)\n", " xgb_predictor.serializer = CSVSerializer()\n", " return xgb_predictor\n", "\n", "\n", "def predict(predictor, data, rows=500):\n", " split_array = np.array_split(data, round(data.shape[0] / float(rows)))\n", " predictions = \"\"\n", " for array in tqdm(split_array):\n", " predictions = \",\".join([predictions, predictor.predict(array).decode(\"utf-8\")])\n", " return np.fromstring(predictions[1:], sep=\",\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Now use the `predict` function, which was defined in the code above, to run the test data through the endpoint and generate the predictions."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["dev_predictor = get_predictor(dev_endpoint_name)\n", "predictions = predict(dev_predictor, test_df[test_df.columns[1:]].values)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Next, load the predictions into a data frame, and join it with your test data. 
Then, calculate absolute error as the difference between the actual taxi fare and the predicted taxi fare. Display the results in a table, sorted by the highest absolute error values."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["pred_df = pd.DataFrame({\"total_amount_predictions\": predictions})\n", "pred_df = test_df.join(pred_df) # Join on all\n", "pred_df[\"error\"] = abs(pred_df[\"total_amount\"] - pred_df[\"total_amount_predictions\"])\n", "\n", "pred_df.sort_values(\"error\", ascending=False).head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["From this table, we note that some short trip distances have large errors because the low predicted fare does not match the high actual fare. This could be the result of a generous tip which we haven't included in this dataset.\n", "\n", "You can also analyze the results by plotting the absolute error to visualize outliers. In this graph, we see that most of the outliers are cases where the model predicted a much lower fare than the actual fare. There are only a few outliers where the model predicted a higher fare than the actual fare."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["sns.scatterplot(data=pred_df, x=\"total_amount_predictions\", y=\"total_amount\", hue=\"error\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["If you want one overall measure of quality for the model, you can calculate the root mean square error (RMSE) for the predicted fares compared to the actual fares. Compare this to the [results calculated on the validation set](#validation-results) at the end of the 'Inspect Training Job' section."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from math import sqrt\n", "from sklearn.metrics import mean_squared_error\n", "\n", "\n", "def rmse(pred_df):\n", " return sqrt(mean_squared_error(pred_df[\"total_amount\"], pred_df[\"total_amount_predictions\"]))\n", "\n", "\n", "print(\"RMSE: {}\".format(rmse(pred_df)))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Deploy Prod\n", "\n", "### Approve Deployment to Production\n", "\n", "If you are happy with the results of the model, you can go ahead and approve the model to be deployed into production. You can do so by clicking the **Review** button in the CodePipeline UI, leaving a comment to explain why you approve this model, and clicking on **Approve**. \n", "\n", "Alternatively, you can create a Jupyter widget which (when enabled) allows you to comment and approve the model directly from this notebook. Run the cell below to see this in action."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["import ipywidgets as widgets\n", "\n", "\n", "def on_click(obj):\n", " result = {\"summary\": approval_text.value, \"status\": obj.description}\n", " response = codepipeline.put_approval_result(\n", " pipelineName=pipeline_name,\n", " stageName=\"DeployDev\",\n", " actionName=\"ApproveDeploy\",\n", " result=result,\n", " token=approval_action[\"token\"],\n", " )\n", " button_box.close()\n", " print(result)\n", "\n", "\n", "# Create the widget if we are ready for approval\n", "deploy_dev = get_pipeline_stage(pipeline_name, \"DeployDev\")\n", "if not \"latestExecution\" in deploy_dev[\"actionStates\"][-1]:\n", " raise (Exception(\"Please wait. 
Deploy dev not complete\"))\n", "\n", "approval_action = deploy_dev[\"actionStates\"][-1][\"latestExecution\"]\n", "if approval_action[\"status\"] == \"Succeeded\":\n", " print(\"Dev approved: {}\".format(approval_action[\"summary\"]))\n", "elif \"token\" in approval_action:\n", " approval_text = widgets.Text(placeholder=\"Optional approval message\")\n", " approve_btn = widgets.Button(description=\"Approved\", button_style=\"success\", icon=\"check\")\n", " reject_btn = widgets.Button(description=\"Rejected\", button_style=\"danger\", icon=\"close\")\n", " approve_btn.on_click(on_click)\n", " reject_btn.on_click(on_click)\n", " button_box = widgets.HBox([approval_text, approve_btn, reject_btn])\n", " display(button_box)\n", "else:\n", " raise (Exception(\"Please wait. No dev approval\"))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Test Production Deployment\n", "\n", "Within about a minute after approving the model deployment, you should see the pipeline start on the final step: deploying your model into production. In this section, you will check the deployment status and test the production endpoint after it has been deployed.\n", "\n", "![Deploy production endpoint in code pipeline](../docs/deploy-production.png)\n", "\n", "This step of the pipeline uses CloudFormation to deploy a number of resources on your behalf. In particular, it creates:\n", "\n", "1. A production-ready SageMaker Endpoint for your model, with [data capture](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-data-capture.html)\u21d7 (used by SageMaker Model Monitor) and [autoscaling](https://docs.aws.amazon.com/sagemaker/latest/dg/endpoint-auto-scaling.html)\u21d7 enabled.\n", "1. A [model monitoring schedule](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-scheduling.html)\u21d7 which outputs the results to CloudWatch metrics, along with a [CloudWatch Alarm](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)\u21d7 which will notify you when a violation occurs. \n", "1. A CodeDeploy instance which creates a simple app by deploying API Gateway, three Lambda functions, and an alarm to notify of the success or failure of this deployment. The code for the Lambda functions can be found in `api/app.py`, `api/pre_traffic_hook.py`, and `api/post_traffic_hook.py`. These functions update the endpoint to enable data capture, format and submit incoming traffic to the SageMaker endpoint, and capture the data logs.\n", "\n", "![Components of production deployment](../docs/cloud-formation.png)\n", "\n", "Let's check how the deployment is progressing. Use the code below to fetch the execution ID of the deployment step. Then generate a table which lists the resources created by the CloudFormation stack and their creation status. You can re-run the cell after a few minutes to see how the steps are progressing."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["deploy_prd = get_pipeline_stage(pipeline_name, \"DeployPrd\")\n", "if not \"latestExecution\" in deploy_prd or not \"latestExecution\" in deploy_prd[\"actionStates\"][0]:\n", " raise (Exception(\"Please wait. 
Deploy prd not started\"))\n", "\n", "execution_id = deploy_prd[\"latestExecution\"][\"pipelineExecutionId\"]"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from datetime import datetime, timedelta\n", "from dateutil.tz import tzlocal\n", "\n", "\n", "def get_event_dataframe(events):\n", " stack_cols = [\n", " \"LogicalResourceId\",\n", " \"ResourceStatus\",\n", " \"ResourceStatusReason\",\n", " \"Timestamp\",\n", " ]\n", " stack_event_df = pd.DataFrame(events)[stack_cols].fillna(\"\")\n", " stack_event_df[\"TimeAgo\"] = datetime.now(tzlocal()) - stack_event_df[\"Timestamp\"]\n", " return stack_event_df.drop(\"Timestamp\", axis=1)\n", "\n", "\n", "cfn = boto3.client(\"cloudformation\")\n", "\n", "stack_name = stack_name = \"{}-deploy-prd\".format(pipeline_name)\n", "print(\"stack name: {}\".format(stack_name))\n", "\n", "# Get latest stack events\n", "while True:\n", " try:\n", " response = cfn.describe_stack_events(StackName=stack_name)\n", " break\n", " except ClientError as e:\n", " print(e.response[\"Error\"][\"Message\"])\n", " time.sleep(10)\n", "\n", "get_event_dataframe(response[\"StackEvents\"]).head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The resource of most interest to us is the endpoint. This takes on average 10 minutes to deploy. In the meantime, you can take a look at the Python code used for the application. \n", "\n", "The `app.py` is the main entry point invoking the Amazon SageMaker endpoint. It returns results along with a custom header for the endpoint we invoked."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["!pygmentize ../api/app.py"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The `pre_traffic_hook.py` lambda is invoked prior to deployment and confirms the endpoint has data capture enabled."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["!pygmentize ../api/pre_traffic_hook.py"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The `post_traffic_hook.py` lambda is invoked to perform any final checks, in this case to verify that we have received log data from data capature."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["!pygmentize ../api/post_traffic_hook.py"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Use the code below to fetch the name of the endpoint, then run a loop to wait for the endpoint to be fully deployed. 
You need the status to be 'InService'."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["prd_endpoint_name = \"mlops-{}-prd-{}\".format(model_name, execution_id)\n", "print(\"prod endpoint: {}\".format(prd_endpoint_name))"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["sm = boto3.client(\"sagemaker\")\n", "\n", "while True:\n", " try:\n", " response = sm.describe_endpoint(EndpointName=prd_endpoint_name)\n", " print(\"Endpoint status: {}\".format(response[\"EndpointStatus\"]))\n", " # Wait until the endpoint is in service with data capture enabled\n", " if (\n", " response[\"EndpointStatus\"] == \"InService\"\n", " and \"DataCaptureConfig\" in response\n", " and response[\"DataCaptureConfig\"][\"EnableCapture\"]\n", " ):\n", " break\n", " except ClientError as e:\n", " print(e.response[\"Error\"][\"Message\"])\n", " time.sleep(10)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["When the endpoint status is 'InService', you can continue. Earlier in this notebook, you created some code to send data to the dev endpoint. Reuse this code now to send a sample of the test data to the production endpoint. Since data capture is enabled on this endpoint, you want to send single records at a time, so the model monitor can map these records to the baseline. \n", "\n", "You will [inspect the model monitor](#Inspect-Model-Monitor) later in this notebook. For now, just check if you can send data to the endpoint and receive predictions in return."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["prd_predictor = get_predictor(prd_endpoint_name)\n", "sample_values = test_df[test_df.columns[1:]].sample(100).values\n", "predictions = predict(prd_predictor, sample_values, rows=1)\n", "predictions"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Test REST API\n", "\n", "Although you already tested the SageMaker endpoint in the previous section, it is also a good idea to test the application created with API Gateway. \n", "\n", "![Traffic shift between endpoints](../docs/lambda-deploy-create.png)\n", "\n", "Follow the link below to open the Lambda Deployment where you can see the in-progress and completed deployments. You can also click to expand the **SAM template** to see the packaged CloudFormation template used in the deployment."]}, {"cell_type": "code", "execution_count": null, "metadata": {"scrolled": true}, "outputs": [], "source": ["HTML(\n", " 'Lambda Deployment'.format(\n", " region, model_name\n", " )\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Run the code below to confirm that the endpoint is in service. 
It will complete once the REST API is available."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["def get_stack_status(stack_name):\n", " response = cfn.describe_stacks(StackName=stack_name)\n", " if response[\"Stacks\"]:\n", " stack = response[\"Stacks\"][0]\n", " outputs = None\n", " if \"Outputs\" in stack:\n", " outputs = dict([(o[\"OutputKey\"], o[\"OutputValue\"]) for o in stack[\"Outputs\"]])\n", " return stack[\"StackStatus\"], outputs\n", "\n", "\n", "outputs = None\n", "while True:\n", " try:\n", " status, outputs = get_stack_status(stack_name)\n", " response = sm.describe_endpoint(EndpointName=prd_endpoint_name)\n", " print(\"Endpoint status: {}\".format(response[\"EndpointStatus\"]))\n", " if outputs:\n", " break\n", " elif status.endswith(\"FAILED\"):\n", " raise (Exception(\"Stack status: {}\".format(status)))\n", " except ClientError as e:\n", " print(e.response[\"Error\"][\"Message\"])\n", " time.sleep(10)\n", "\n", "if outputs:\n", " print(\"deployment application: {}\".format(outputs[\"DeploymentApplication\"]))\n", " print(\"rest api: {}\".format(outputs[\"RestApi\"]))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["If you are performing an update on your production deployment as a result of running [Trigger Retraining](#Trigger-Retraining) you will then be able to expand the Lambda Deployment tab to reveal the resources. Click on the **ApiFunctionAliaslive** link to see the Lambda Deployment in progress. \n", "\n", "![Traffic shift between endpoints](../docs/lambda-deploy-update.png)\n", "\n", "This page will be updated to list the deployment events. It also has a link to the Deployment Application which you can access in the output of the next cell."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["HTML(\n", " 'CodeDeploy application'.format(\n", " region, outputs[\"DeploymentApplication\"]\n", " )\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["CodeDeploy will perform a canary deployment and send 10% of the traffic to the new endpoint over a 5-minute period.\n", "\n", "![Traffic shift between endpoints](../docs/code-deploy.gif)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["We can invoke the REST API and inspect the headers being returned to see which endpoint we are hitting. You will occasionally see the cell below show a different endpoint that settles to the new version once the stack is complete. 
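As an aside, if you would rather watch the CodeDeploy canary from code than from the console, the sketch below polls the deployment status with boto3. It is not part of the original notebook; it only assumes the `outputs["DeploymentApplication"]` value printed above and uses standard CodeDeploy API calls (`list_deployment_groups`, `list_deployments`, `get_deployment`).

```python
import boto3

codedeploy = boto3.client("codedeploy")
app_name = outputs["DeploymentApplication"]  # from the stack outputs printed above

# Each deployment group under the application holds the Lambda canary deployments.
for group in codedeploy.list_deployment_groups(applicationName=app_name)["deploymentGroups"]:
    deployment_ids = codedeploy.list_deployments(
        applicationName=app_name, deploymentGroupName=group
    )["deployments"]
    for deployment_id in deployment_ids[:1]:
        info = codedeploy.get_deployment(deploymentId=deployment_id)["deploymentInfo"]
        print("group: {}, deployment: {}, status: {}".format(group, deployment_id, info["status"]))
```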
"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["%%time\n", "\n", "from urllib import request\n", "\n", "headers = {\"Content-type\": \"text/csv\"}\n", "payload = test_df[test_df.columns[1:]].head(1).to_csv(header=False, index=False).encode(\"utf-8\")\n", "rest_api = outputs[\"RestApi\"]\n", "\n", "while True:\n", " try:\n", " resp = request.urlopen(request.Request(rest_api, data=payload, headers=headers))\n", " print(\n", " \"Response code: %d: endpoint: %s\"\n", " % (resp.getcode(), resp.getheader(\"x-sagemaker-endpoint\"))\n", " )\n", " status, outputs = get_stack_status(stack_name)\n", " if status.endswith(\"COMPLETE\"):\n", " print(\"Deployment complete\\n\")\n", " break\n", " elif status.endswith(\"FAILED\"):\n", " raise (Exception(\"Stack status: {}\".format(status)))\n", " except ClientError as e:\n", " print(e.response[\"Error\"][\"Message\"])\n", " time.sleep(10)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Monitor\n", "\n", "### Inspect Model Monitor\n", "\n", "When you prepared the datasets for model training at the start of this notebook, you saved a baseline dataset (a copy of the train dataset). Then, when you approved the model for deployment into production, the pipeline set up an SageMaker Endpoint with data capture enabled and a model monitoring schedule. In this section, you will take a closer look at the model monitor results.\n", "\n", "To start off, fetch the latest production deployment execution ID."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["deploy_prd = get_pipeline_stage(pipeline_name, \"DeployPrd\")\n", "if not \"latestExecution\" in deploy_prd:\n", " raise (Exception(\"Please wait. Deploy prod not complete\"))\n", "\n", "execution_id = deploy_prd[\"latestExecution\"][\"pipelineExecutionId\"]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Under the hood, SageMaker model monitor runs in SageMaker processing jobs. Use the execution ID to fetch the names of the processing job and the schedule."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["processing_job_name = \"mlops-{}-pbl-{}\".format(model_name, execution_id)\n", "schedule_name = \"mlops-{}-pms\".format(model_name)\n", "\n", "print(\"processing job name: {}\".format(processing_job_name))\n", "print(\"schedule name: {}\".format(schedule_name))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Explore Baseline\n", "\n", "Now fetch the baseline results from the processing job. This cell will throw an exception if the processing job is not complete - if that happens, just wait several minutes and try again. "]}, {"cell_type": "code", "execution_count": null, "metadata": {"scrolled": true}, "outputs": [], "source": ["import sagemaker\n", "from sagemaker.model_monitor import BaseliningJob, MonitoringExecution\n", "from sagemaker.s3 import S3Downloader\n", "\n", "sagemaker_session = sagemaker.Session()\n", "baseline_job = BaseliningJob.from_processing_name(sagemaker_session, processing_job_name)\n", "status = baseline_job.describe()[\"ProcessingJobStatus\"]\n", "if status != \"Completed\":\n", " raise (Exception(\"Please wait. Processing job not complete, status: {}\".format(status)))\n", "\n", "baseline_results_uri = baseline_job.outputs[0].destination"]}, {"cell_type": "markdown", "metadata": {}, "source": ["SageMaker model monitor generates two types of files. Take a look at the statistics file first. 
It calculates various statistics for each feature of the dataset, including the mean, standard deviation, minimum value, maximum value, and more. "]}, {"cell_type": "code", "execution_count": null, "metadata": {"scrolled": true}, "outputs": [], "source": ["import pandas as pd\n", "import json\n", "\n", "baseline_statistics = baseline_job.baseline_statistics().body_dict\n", "schema_df = pd.json_normalize(baseline_statistics[\"features\"])\n", "schema_df[\n", " [\n", " \"name\",\n", " \"numerical_statistics.mean\",\n", " \"numerical_statistics.std_dev\",\n", " \"numerical_statistics.min\",\n", " \"numerical_statistics.max\",\n", " ]\n", "].head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Now look at the suggested [constraints files](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html)\u21d7. As the name implies, these are constraints which SageMaker model monitor recommends. If the live data which is sent to your production SageMaker Endpoint violates these constraints, this indicates data drift, and model monitor can raise an alert to trigger retraining. Of course, you can set different constraints based on the statistics which you viewed previously."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["baseline_constraints = baseline_job.suggested_constraints().body_dict\n", "constraints_df = pd.json_normalize(baseline_constraints[\"features\"])\n", "constraints_df.head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### View data capture\n", "\n", "When the \"Deploy Production\" stage of the MLOps pipeline deploys a SageMaker endpoint, it also enables data capture. This means the incoming requests to the endpoint, as well as the results from the ML model, are stored in an S3 location. Model monitor can analyze this data and compare it to the baseline to ensure that no constraints are violated. \n", "\n", "Use the code below to check how many files have been created by the data capture, and view the latest file in detail. Note, data capture relies on data being sent to the production endpoint. If you don't see any files yet, wait several minutes and try again."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["bucket = sagemaker_session.default_bucket()\n", "data_capture_logs_uri = \"s3://{}/mlops-{}/datacapture/{}\".format(\n", " bucket, model_name, prd_endpoint_name\n", ")\n", "\n", "capture_files = S3Downloader.list(data_capture_logs_uri)\n", "print(\"Found {} files\".format(len(capture_files)))\n", "\n", "if capture_files:\n", " # Get the first line of the most recent file\n", " event = json.loads(S3Downloader.read_file(capture_files[-1]).split(\"\\n\")[0])\n", " print(\"\\nLast file:\\n{}\".format(json.dumps(event, indent=2)))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### View monitoring schedule\n", "\n", "There are some useful functions for plotting and rendering distribution statistics or constraint violations provided in a `utils` file in the [SageMaker Examples GitHub](https://github.com/aws/amazon-sagemaker-examples/tree/master/sagemaker_model_monitor/visualization)\u21d7. Grab a copy of this code to use in this notebook. 
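Returning to the suggested constraints for a moment: if you do decide to adjust them before relying on the monitoring schedule, a minimal sketch is shown below. It is not part of the original pipeline; the `completeness` field follows the constraints structure viewed above, while the feature name, threshold, and upload destination are hypothetical.

```python
import json
from sagemaker.s3 import S3Uploader

# Start from the constraints suggested by the baselining job (loaded earlier).
constraints_body = baseline_job.suggested_constraints().body_dict

# Hypothetical adjustment: tolerate up to 5% missing values for one feature.
for feature in constraints_body["features"]:
    if feature["name"] == "passenger_count":
        feature["completeness"] = 0.95

# Save the edited constraints and upload them to a separate prefix; the monitoring
# schedule would need to reference this file instead of the suggested one (assumption).
with open("constraints.json", "w") as f:
    json.dump(constraints_body, f, indent=2)

S3Uploader.upload("constraints.json", baseline_results_uri + "/custom")
```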
"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["!wget -O utils.py --quiet https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/master/sagemaker_model_monitor/visualization/utils.py\n", "import utils as mu"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The [minimum scheduled run time](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-scheduling.html)\u21d7 for model monitor is one hour, which means you will need to wait at least an hour to see any results. Use the code below to check the schedule status and list the next run. If you are completing this notebook as part of a workshop, your host will have activities which you can complete while you wait. "]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["sm = boto3.client(\"sagemaker\")\n", "\n", "response = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)\n", "print(\"Schedule Status: {}\".format(response[\"MonitoringScheduleStatus\"]))\n", "\n", "now = datetime.now(tzlocal())\n", "next_hour = (now + timedelta(hours=1)).replace(minute=0)\n", "scheduled_diff = (next_hour - now).seconds // 60\n", "print(\"Next schedule in {} minutes\".format(scheduled_diff))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["While you wait, you can take a look at the CloudFormation template which is used as a base for the CloudFormation template built by CodeDeploy to deploy the production application. \n", "\n", "Alterntively, you can jump ahead to [Trigger Retraining](#Trigger-Retraining) which will kick off another run of the code pipeline whilst you wait."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["!cat ../assets/deploy-model-prd.yml"]}, {"cell_type": "markdown", "metadata": {}, "source": ["A couple of minutes after the model monitoring schedule has run, you can use the code below to fetch the latest schedule status. A completed schedule run may have found violations. "]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["processing_job_arn = None\n", "\n", "while processing_job_arn is None:\n", " try:\n", " response = sm.list_monitoring_executions(MonitoringScheduleName=schedule_name)\n", " except ClientError as e:\n", " print(e.response[\"Error\"][\"Message\"])\n", " for mon in response[\"MonitoringExecutionSummaries\"]:\n", " status = mon[\"MonitoringExecutionStatus\"]\n", " now = datetime.now(tzlocal())\n", " created_diff = (now - mon[\"CreationTime\"]).seconds // 60\n", " print(\"Schedule status: {}, Created: {} minutes ago\".format(status, created_diff))\n", " if status in [\"Completed\", \"CompletedWithViolations\"]:\n", " processing_job_arn = mon[\"ProcessingJobArn\"]\n", " break\n", " if status == \"InProgress\":\n", " break\n", " else:\n", " raise (Exception(\"Please wait. No Schedules executing\"))\n", " time.sleep(10)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### View monitoring results\n", "\n", "Once the model monitoring schedule has had a chance to run at least once, you can take a look at the results. 
First, load the monitoring execution results from the latest scheduled run."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["if processing_job_arn:\n", " execution = MonitoringExecution.from_processing_arn(\n", " sagemaker_session=sagemaker.Session(), processing_job_arn=processing_job_arn\n", " )\n", " exec_inputs = {inp[\"InputName\"]: inp for inp in execution.describe()[\"ProcessingInputs\"]}\n", " exec_results_uri = execution.output.destination\n", "\n", " print(\"Monitoring Execution results: {}\".format(exec_results_uri))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Take a look at the files which have been saved in the S3 output location. If violations were found, you should see a constraint violations file in addition to the statistics and constraints file which you viewed before."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["!aws s3 ls $exec_results_uri/"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Now, fetch the monitoring statistics and violations. Then use the utils code to visualize the results in a table. It will highlight any baseline drift found by the model monitor. Drift can happen for categorical features (for inferred string styles) or for numerical features (e.g. total fare amount)."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# Get the baseline and monitoring statistics & violations\n", "baseline_statistics = baseline_job.baseline_statistics().body_dict\n", "execution_statistics = execution.statistics().body_dict\n", "violations = execution.constraint_violations().body_dict[\"violations\"]"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["mu.show_violation_df(\n", " baseline_statistics=baseline_statistics,\n", " latest_statistics=execution_statistics,\n", " violations=violations,\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Trigger Retraining\n", "\n", "The CodePipeline instance is configured with [CloudWatch Events](https://docs.aws.amazon.com/codepipeline/latest/userguide/create-cloudtrail-S3-source.html)\u21d7 to start the pipeline for retraining when the drift detection triggers specific metric alarms.\n", "\n", "You can simulate drift by putting a metric value above the threshold of `0.2` directly into CloudWatch. This will trigger the alarm, and start the code pipeline.\n", "\n", "
\n", " Tip: This alarm is configured only for the latest production endpoint, so re-training will only occur if you are putting metrics against the latest endpoint.\n", "
\n", "\n", "![Metric graph in CloudWatch](../docs/cloudwatch-alarm.png)\n", "\n", "Run the code below to trigger the metric alarm. The cell output will be a link to CloudWatch, where you can see the alarm (similar to the screenshot above), and a link to CodePipeline which you will see run again. Note that it can take a couple of minutes for everything to trigger."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from datetime import datetime\n", "import random\n", "\n", "cloudwatch = boto3.client(\"cloudwatch\")\n", "\n", "# Define the metric name and threshold\n", "metric_name = \"feature_baseline_drift_total_amount\"\n", "metric_threshold = 0.2\n", "\n", "# Put a new metric to trigger an alaram\n", "def put_drift_metric(value):\n", " print(\"Putting metric: {}\".format(value))\n", " response = cloudwatch.put_metric_data(\n", " Namespace=\"aws/sagemaker/Endpoints/data-metrics\",\n", " MetricData=[\n", " {\n", " \"MetricName\": metric_name,\n", " \"Dimensions\": [\n", " {\"Name\": \"MonitoringSchedule\", \"Value\": schedule_name},\n", " {\"Name\": \"Endpoint\", \"Value\": prd_endpoint_name},\n", " ],\n", " \"Timestamp\": datetime.now(),\n", " \"Value\": value,\n", " \"Unit\": \"None\",\n", " },\n", " ],\n", " )\n", "\n", "\n", "def get_drift_stats():\n", " response = cloudwatch.get_metric_statistics(\n", " Namespace=\"aws/sagemaker/Endpoints/data-metrics\",\n", " MetricName=metric_name,\n", " Dimensions=[\n", " {\"Name\": \"MonitoringSchedule\", \"Value\": schedule_name},\n", " {\"Name\": \"Endpoint\", \"Value\": prd_endpoint_name},\n", " ],\n", " StartTime=datetime.now() - timedelta(minutes=2),\n", " EndTime=datetime.now(),\n", " Period=1,\n", " Statistics=[\"Average\"],\n", " Unit=\"None\",\n", " )\n", " if \"Datapoints\" in response and len(response[\"Datapoints\"]) > 0:\n", " return response[\"Datapoints\"][0][\"Average\"]\n", " return 0\n", "\n", "\n", "print(\"Simluate drift on endpoint: {}\".format(prd_endpoint_name))\n", "\n", "while True:\n", " put_drift_metric(round(random.uniform(metric_threshold, 1.0), 4))\n", " drift_stats = get_drift_stats()\n", " print(\"Average drift amount: {}\".format(get_drift_stats()))\n", " if drift_stats > metric_threshold:\n", " break\n", " time.sleep(1)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Click through to the Alarm and CodePipeline Execution history with the links below."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# Output a html link to the cloudwatch dashboard\n", "metric_alarm_name = \"mlops-{}-metric-gt-threshold\".format(model_name)\n", "HTML(\n", " \"\"\"CloudWatch Alarm triggers\n", " Code Pipeline Execution\"\"\".format(\n", " region, metric_alarm_name, pipeline_name\n", " )\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Once the pipeline is running again you can jump back up to [Inspect Training Job](#Inspect-Training-Job)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Create a CloudWatch dashboard\n", "\n", "Finally, use the code below to create a CloudWatch dashboard to visualize the key performance metrics and alarms which you have created during this demo. The cell will output a link to the dashboard. 
This dashboard shows 9 charts in three rows, where the first row displays Lambda metrics, the second row displays SageMaker metrics, and the third row (shown in the screenshot below) displays the alarms set up for the pipeline.\n", "\n", "![Graphs in CloudWatch dashboard](../docs/cloudwatch-dashboard.png)"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["from string import Template\n", "\n", "sts = boto3.client(\"sts\")\n", "account_id = sts.get_caller_identity().get(\"Account\")\n", "dashboard_name = \"mlops-{0}-{1}\".format(model_name, config[\"SageMakerProjectId\"])\n", "\n", "with open(\"dashboard.json\") as f:\n", " dashboard_body = Template(f.read()).substitute(\n", " region=region, account_id=account_id, model_name=model_name\n", " )\n", " response = cloudwatch.put_dashboard(DashboardName=dashboard_name, DashboardBody=dashboard_body)\n", "\n", "# Output a html link to the cloudwatch dashboard\n", "HTML(\n", " 'CloudWatch Dashboard'.format(\n", " region, dashboard_name\n", " )\n", ")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Congratulations! You have made it to the end of this notebook, and have automated a safe MLOps pipeline using a wide range of AWS services. \n", "\n", "You can use the other notebook in this repository [workflow.ipynb](workflow.ipynb) to implement your own ML model and deploy it as part of this pipeline. Or, if you are finished with the content, follow the instructions in the next section to clean up the resources you have deployed."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Cleanup\n", "\n", "Execute the following cell to delete the stacks created in the pipeline. For a model name of **nyctaxi** these would be:\n", "\n", "1. *nyctaxi*-deploy-prd\n", "2. *nyctaxi*-deploy-dev\n", "3. *nyctaxi*-workflow\n", "4. 
sagemaker-custom-resource"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["cfn = boto3.client(\"cloudformation\")\n", "\n", "# Delete the prod and then dev stack\n", "for stack_name in [\n", " f\"{pipeline_name}-deploy-prd\",\n", " f\"{pipeline_name}-deploy-dev\",\n", " f\"{pipeline_name}-workflow\",\n", " f\"mlops-{model_name}-{config['SageMakerProjectId']}-sagemaker-custom-resource\",\n", "]:\n", " print(\"Deleting stack: {}\".format(stack_name))\n", " cfn.delete_stack(StackName=stack_name)\n", " cfn.get_waiter(\"stack_delete_complete\").wait(StackName=stack_name)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The following code will delete the dashboard."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["cloudwatch.delete_dashboards(DashboardNames=[dashboard_name])\n", "print(\"Dashboard deleted\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The following code will clean up all objects in the artifact bucket and delete the SageMaker project."]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["s3_resource = boto3.resource('s3')\n", "s3_artifact_bucket = s3_resource.Bucket(artifact_bucket)\n", "s3_artifact_bucket.object_versions.delete()\n", "print(\"Artifact bucket objects deleted\")\n", "\n", "sm.delete_project(\n", " ProjectName=PROJECT_NAME\n", ")\n", "print(\"SageMaker Project deleted\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Finally, close this notebook."]}], "metadata": {"instance_type": "ml.t3.medium", "kernelspec": {"display_name": "conda_python3", "language": "python", "name": "conda_python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6"}}, "nbformat": 4, "nbformat_minor": 4} \ No newline at end of file +{"cells":[{"cell_type":"markdown","metadata":{},"source":["# Safe MLOps Deployment Pipeline\n","\n","\n","## Overview\n","\n","In this notebook you will step through an MLOps pipeline to build, train, deploy and monitor an XGBoost regression model for predicting the expected taxi fare using the New York City Taxi [dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/)⇗. This safe pipeline features a [canary deployment](https://docs.aws.amazon.com/wellarchitected/latest/machine-learning-lens/canary-deployment.html) strategy with rollback on error. You will learn how to trigger and monitor the pipeline, inspect the training workflow, use model monitor to set up alerts, and create a canary deployment.\n","\n","
\n"," Note: This notebook assumes prior familiarity with the basics training ML models on Amazon SageMaker. Data preparation and visualization, although present, will be kept to a minimum. If you are not familiar with the basic concepts and features of SageMaker, we recommend reading the SageMaker documentation⇗ and completing the workshops and samples in AWS SageMaker Examples GitHub⇗ and AWS Samples GitHub⇗. \n","
\n","\n","### Contents\n","\n","This notebook has the following key sections:\n","\n","1. [Data Prep](#Data-Prep)\n","2. [Build](#Build)\n","3. [Train Model](#Train-Model)\n","4. [Deploy Dev](#Deploy-Dev)\n","5. [Deploy Prod](#Deploy-Prod)\n","6. [Monitor](#Monitor)\n","6. [Cleanup](#Cleanup)\n","\n","### Architecture\n","\n","The architecture diagram below shows the entire MLOps pipeline at a high level.\n","\n","Use the CloudFormation template provided in this repository (`pipeline.yml`) to build the demo in your own AWS account. If you are currently viewing this notebook from SageMaker in your AWS account, then you have already completed this step. CloudFormation deploys several resources:\n"," \n","1. A customer-managed encryption key in in Amazon KMS for encrypting data and artifacts.\n","1. A secret in Amazon Secrets Manager to securely store your GitHub Access Token.\n","1. Several AWS IAM roles so CloudFormation, SageMaker, and other AWS services can perform actions in your AWS account, following the principle of [least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege)⇗.\n","1. A messaging service in Amazon SNS to notify you when CodeDeploy has successfully deployed the API, and to receive alerts for retraining and drift detection (signing up for these notifications is optional).\n","1. Two Amazon CloudWatch event rules: one which schedules the pipeline to run every month, and one which triggers the pipeline to run when SageMaker Model Monitor detects certain metrics.\n","1. An Amazon SageMaker Jupyter notebook with this workshop content pre-loaded.\n","1. An Amazon S3 bucket for storing model artifacts.\n","1. An AWS CodePipeline instance with several pre-defined stages. \n","\n","Take a moment to look at all of these resources now deployed in your account. \n","\n","![MLOps pipeline architecture](../docs/mlops-architecture.png)\n","\n","In this notebook, you will work through the CodePipeline instance created by the CloudFormation template. It has several stages:\n","\n","1. **Source** - The pipeline is already configured with two sources. If you upload a new dataset to a specific location in the S3 data bucket, this will trigger the pipeline to run. The Git source can be GitHub, or CodeCommit if you don’t supply your access token. If you commit new code to your repository, this will trigger the pipeline to run. \n","1. **Build** - In this stage, CodeBuild configured by the build specification `model/buildspec.yml` will execute `model/run_pipeline.py` to generate AWS CloudFormation templates for creating the AWS Step Function (including AWS Lambda custom resources), and deployment templates used in the following stages based on the data sets and hyperparameters specified for this pipeline run. You will take a closer look at these files later in this notebook. \n","1. **Train** The Step Functions workflow created in the Build stage is run in this stage. The workflow creates a baseline for the model monitor using a SageMaker processing job, and trains an XGBoost model on the taxi ride dataset using a SageMaker training job.\n","1. **Deploy Dev** In this stage, a CloudFormation template created in the build stage (from `assets/deploy-model-dev.yml`) deploys a dev endpoint. This will allow you to run tests on the model and decide if the model is of sufficient quality to deploy into production.\n","1. 
**Deploy Production** The final stage of the pipeline is the only stage which does not run automatically as soon as the previous stage is complete. It waits for a user to manually approve the model which was previously deployed to dev. As soon as the model is approved, a CloudFormation template (packaged from `assets/deploy-model-prod.yml` to include the Lambda functions saved and uploaded as ZIP files in S3) deploys the production endpoint. It configures autoscaling and enables data capture. It creates a model monitoring schedule and sets CloudWatch alarms for certain metrics. It also sets up an AWS CodeDeploy instance which deploys a set of AWS Lambda functions and an Amazon API Gateway to sit in front of the SageMaker endpoint. This stage can make use of canary deployment to safely switch from an old model to a new model."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["# Import the latest sagemaker and boto3 SDKs.\n","import sys\n","\n","!{sys.executable} -m pip install --upgrade pip\n","!{sys.executable} -m pip install -qU awscli boto3 \"sagemaker>=2.1.0<3\" tqdm\n","!{sys.executable} -m pip install -qU \"stepfunctions==2.0.0\"\n","!{sys.executable} -m pip show sagemaker stepfunctions"]},{"cell_type":"markdown","metadata":{},"source":["Restart your SageMaker kernel then continue with this notebook."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["# Replace `None` with the project name when creating SageMaker Project\n","# You can find it from the left panel in Studio\n","\n","PROJECT_NAME = None\n","\n","assert PROJECT_NAME is not None and isinstance(\n"," PROJECT_NAME, str\n","), \"Please specify the project name as string\""]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["import boto3\n","from IPython.core.display import HTML, display\n","\n","\n","def get_provisioned_product_name(project_name):\n"," region = boto3.Session().region_name\n"," sc = boto3.client(\"servicecatalog\")\n"," products = sc.search_provisioned_products(\n"," Filters={\n"," \"SearchQuery\": [\n"," project_name,\n"," ]\n"," }\n"," )\n"," pp = products[\"ProvisionedProducts\"]\n"," if len(pp) != 1:\n"," print(\"Invalid provisioned product name. Open the link below and search manually\")\n"," display(\n"," HTML(\n"," f'Service Catalog'\n"," )\n"," )\n"," raise ValueError(\"Invalid provisioned product\")\n","\n"," return pp[0][\"Name\"]\n","\n","\n","PROVISIONED_PRODUCT_NAME = get_provisioned_product_name(PROJECT_NAME)\n","print(\n"," f\"The associated Service Catalog Provisioned Product Name to this SagaMaker project: {PROVISIONED_PRODUCT_NAME}\"\n",")"]},{"cell_type":"markdown","metadata":{},"source":["In case of any errors, you can examine the Service Catalog console from the above link and find the associated provisioned product name which is something like `example-p-1v7hbpwe594n` and assigns it to `PROVISIONED_PRODUCT_NAME` manually."]},{"cell_type":"markdown","metadata":{},"source":["## Data Prep\n"," \n","In this section of the notebook, you will use the publicly available New York Taxi dataset in preparation for uploading it to S3. 
We have predownloaded the file for you and you can see it in the directory on the left."]},{"cell_type":"markdown","metadata":{},"source":["Load the dataset into a pandas data frame, taking care to parse the dates correctly."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["import pandas as pd\n","\n","df = pd.read_parquet('nyc-tlc.parquet')\n","df.to_csv('nyc-tlc.csv')\n","\n","parse_dates = [\"lpep_dropoff_datetime\", \"lpep_pickup_datetime\"]\n","trip_df = pd.read_csv(\"nyc-tlc.csv\", parse_dates=parse_dates)\n","\n","trip_df.head()"]},{"cell_type":"markdown","metadata":{},"source":["### Data manipulation\n","\n","Instead of the raw date and time features for pick-up and drop-off, let's use these features to calculate the total time of the trip in minutes, which will be easier to work with for our model."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["trip_df[\"duration_minutes\"] = (\n"," trip_df[\"lpep_dropoff_datetime\"] - trip_df[\"lpep_pickup_datetime\"]\n",").dt.seconds / 60"]},{"cell_type":"markdown","metadata":{},"source":["The dataset contains a lot of columns we don't need, so let's select a sample of columns for our machine learning model. Keep only `total_amount` (fare), `duration_minutes`, `passenger_count`, and `trip_distance`."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["cols = [\"total_amount\", \"duration_minutes\", \"passenger_count\", \"trip_distance\"]\n","data_df = trip_df[cols]\n","print(data_df.shape)\n","data_df.head()"]},{"cell_type":"markdown","metadata":{},"source":["Generate some quick statistics for the dataset to understand the quality."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["data_df.describe()"]},{"cell_type":"markdown","metadata":{},"source":["The table above shows some clear outliers, e.g. -400 or 2626 as fare, or 0 passengers. There are many intelligent methods for identifying and removing outliers, but data cleaning is not the focus of this notebook, so just remove the outliers by setting some min and max values which seem more reasonable. Removing the outliers results in a final dataset of 754,671 rows."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["data_df = data_df[\n"," (data_df.total_amount > 0)\n"," & (data_df.total_amount < 200)\n"," & (data_df.duration_minutes > 0)\n"," & (data_df.duration_minutes < 120)\n"," & (data_df.trip_distance > 0)\n"," & (data_df.trip_distance < 121)\n"," & (data_df.passenger_count > 0)\n","].dropna()\n","print(data_df.shape)"]},{"cell_type":"markdown","metadata":{},"source":["### Data visualization\n","\n","Since this notebook will build a regression model for the taxi data, it's a good idea to check if there is any correlation between the variables in our data. 
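One quick check is the pairwise correlation matrix. The optional cell below is a small illustrative sketch (it assumes the `data_df` frame created above) that prints the Pearson correlations between the selected columns."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["# Optional, illustrative sketch: pairwise Pearson correlations between the selected columns.\n","# Assumes the data_df frame created in the cells above.\n","print(data_df.corr())"]},{"cell_type":"markdown","metadata":{},"source":["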
Use scatter plots on a sample of the data to compare trip distance with duration in minutes, and total amount (fare) with duration in minutes."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["import matplotlib.pyplot as plt\n","import seaborn as sns\n","import numpy as np\n","\n","sample_df = data_df.sample(1000)\n","sns.scatterplot(data=sample_df, x=\"duration_minutes\", y=\"trip_distance\")\n","plt.show()"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["sns.scatterplot(data=sample_df, x=\"duration_minutes\", y=\"total_amount\")\n","plt.show()"]},{"cell_type":"markdown","metadata":{},"source":["These scatter plots look fine and show at least some correlation between our variables. \n","\n","### Data splitting and saving\n","\n","We are now ready to split the dataset into train, validation, and test sets. "]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["from sklearn.model_selection import train_test_split\n","\n","train_df, val_df = train_test_split(data_df, test_size=0.20, random_state=42)\n","val_df, test_df = train_test_split(val_df, test_size=0.05, random_state=42)\n","\n","# Reset the index for our test dataframe\n","test_df.reset_index(inplace=True, drop=True)\n","\n","print(\n"," \"Size of\\n train: {},\\n val: {},\\n test: {} \".format(\n"," train_df.shape[0], val_df.shape[0], test_df.shape[0]\n"," )\n",")"]},{"cell_type":"markdown","metadata":{},"source":["Save the train, validation, and test files as CSV locally on this notebook instance. Notice that you save the train file twice - once as the training data file and once as the baseline data file. The baseline data file will be used by [SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html)⇗ to detect data drift. Data drift occurs when the statistical nature of the data that your model receives while in production drifts away from the nature of the baseline data it was trained on, which means the model begins to lose accuracy in its predictions."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["train_cols = [\"total_amount\", \"duration_minutes\", \"passenger_count\", \"trip_distance\"]\n","train_df.to_csv(\"train.csv\", index=False, header=False)\n","val_df.to_csv(\"validation.csv\", index=False, header=False)\n","test_df.to_csv(\"test.csv\", index=False, header=False)\n","\n","# Save test and baseline with headers\n","train_df.to_csv(\"baseline.csv\", index=False, header=True)"]},{"cell_type":"markdown","metadata":{},"source":["Now upload these CSV files to your default SageMaker S3 bucket. 
"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["import sagemaker\n","\n","# Get the session and default bucket\n","session = sagemaker.session.Session()\n","bucket = session.default_bucket()\n","\n","# Specify data prefix and version\n","prefix = \"nyc-tlc/v1\"\n","\n","s3_train_uri = session.upload_data(\"train.csv\", bucket, prefix + \"/data/training\")\n","s3_val_uri = session.upload_data(\"validation.csv\", bucket, prefix + \"/data/validation\")\n","s3_test_uri = session.upload_data(\"test.csv\", bucket, prefix + \"/data/test\")\n","s3_baseline_uri = session.upload_data(\"baseline.csv\", bucket, prefix + \"/data/baseline\")"]},{"cell_type":"markdown","metadata":{},"source":["You will use the datasets which you have prepared and saved in this section to trigger the pipeline to train and deploy a model in the next section."]},{"cell_type":"markdown","metadata":{},"source":["## Build\n","\n","If you navigate to the CodePipeline instance created for this workshop, you will notice that the Source stage is initially in a `Failed` state. This happens because the dataset, which is one of the sources that can trigger the pipeline, has not yet been uploaded to the S3 location expected by the pipeline.\n","\n","![Failed code pipeline](../docs/pipeline_failed.png)\n","\n","### Trigger Build\n","\n","In this section, you will start a model build and deployment pipeline by packaging up the datasets you prepared in the previous section and uploading these to the S3 source location which triggers the CodePipeline instance created for this workshop. \n","\n","\n","First, import some libraries and load some environment variables which you will need. These environment variables have been set through a [lifecycle configuration](https://docs.aws.amazon.com/sagemaker/latest/dg/notebook-lifecycle-config.html)⇗ script attached to this notebook."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["import boto3\n","from botocore.exceptions import ClientError\n","import os\n","import time\n","\n","\n","def get_config(provisioned_product_name):\n"," sc = boto3.client(\"servicecatalog\")\n"," outputs = sc.get_provisioned_product_outputs(ProvisionedProductName=provisioned_product_name)[\n"," \"Outputs\"\n"," ]\n"," config = {}\n"," for out in outputs:\n"," config[out[\"OutputKey\"]] = out[\"OutputValue\"]\n"," return config\n","\n","\n","config = get_config(PROVISIONED_PRODUCT_NAME)\n","region = config[\"Region\"]\n","artifact_bucket = config[\"ArtifactBucket\"]\n","pipeline_name = config[\"PipelineName\"]\n","model_name = config[\"ModelName\"]\n","workflow_pipeline_arn = config[\"WorkflowPipelineARN\"]\n","\n","print(\"region: {}\".format(region))\n","print(\"artifact bucket: {}\".format(artifact_bucket))\n","print(\"pipeline: {}\".format(pipeline_name))\n","print(\"model name: {}\".format(model_name))\n","print(\"workflow: {}\".format(workflow_pipeline_arn))"]},{"cell_type":"markdown","metadata":{},"source":["From the AWS CodePipeline [documentation](https://docs.aws.amazon.com/codepipeline/latest/userguide/tutorials-simple-s3.html)⇗:\n","\n","> When Amazon S3 is the source provider for your pipeline, you may zip your source file or files into a single .zip and upload the .zip to your source bucket. You may also upload a single unzipped file; however, downstream actions that expect a .zip file will fail.\n","\n","To train a model, you need multiple datasets (train, validation, and test) along with a file specifying the hyperparameters. 
In this example, you will create one JSON file which contains the S3 dataset locations and one JSON file which contains the hyperparameter values. Then you compress both files into a zip package to be used as input for the pipeline run. "]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["from io import BytesIO\n","import zipfile\n","import json\n","\n","input_data = {\n"," \"TrainingUri\": s3_train_uri,\n"," \"ValidationUri\": s3_val_uri,\n"," \"TestUri\": s3_test_uri,\n"," \"BaselineUri\": s3_baseline_uri,\n","}\n","\n","hyperparameters = {\"num_round\": 50}\n","\n","zip_buffer = BytesIO()\n","with zipfile.ZipFile(zip_buffer, \"a\") as zf:\n"," zf.writestr(\"inputData.json\", json.dumps(input_data))\n"," zf.writestr(\"hyperparameters.json\", json.dumps(hyperparameters))\n","zip_buffer.seek(0)\n","\n","data_source_key = \"{}/data-source.zip\".format(pipeline_name)"]},{"cell_type":"markdown","metadata":{},"source":["Now upload the zip package to your artifact S3 bucket - this action will trigger the pipeline to train and deploy a model."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["s3 = boto3.client(\"s3\")\n","s3.put_object(Bucket=artifact_bucket, Key=data_source_key, Body=bytearray(zip_buffer.read()))"]},{"cell_type":"markdown","metadata":{},"source":["Click the link below to open the AWS console at the Code Pipeline if you don't have it open in another tab.\n","\n","
\n"," Tip: You may need to wait a minute to see the DataSource stage turn green. The page will refresh automatically.\n","
\n","\n","![Source Green](../docs/datasource-after.png)"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["from IPython.core.display import HTML\n","\n","HTML(\n"," 'Code Pipeline'.format(\n"," region, pipeline_name\n"," )\n",")"]},{"cell_type":"markdown","metadata":{},"source":["### Inspect Build Logs\n","\n","Once the build stage is running, you will see the AWS CodeBuild job turn blue with a status of **In progress**.\n","\n","![Failed code pipeline](../docs/codebuild-inprogress.png)"]},{"cell_type":"markdown","metadata":{},"source":["You can click on the **Details** link displayed in the CodePipeline UI or click the link below to jump directly to the CodeBuild logs.\n","\n","
\n"," Tip: You may need to wait a few seconds for the pipeline to transition into the active (blue) state and for the build to start.\n","
"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["codepipeline = boto3.client(\"codepipeline\")\n","\n","\n","def get_pipeline_stage(pipeline_name, stage_name):\n"," response = codepipeline.get_pipeline_state(name=pipeline_name)\n"," for stage in response[\"stageStates\"]:\n"," if stage[\"stageName\"] == stage_name:\n"," return stage\n","\n","\n","# Get last execution id\n","build_stage = get_pipeline_stage(pipeline_name, \"Build\")\n","if not \"latestExecution\" in build_stage:\n"," raise (Exception(\"Please wait. Build not started\"))\n","\n","build_url = build_stage[\"actionStates\"][0][\"latestExecution\"][\"externalExecutionUrl\"]\n","\n","# Out a link to the code build logs\n","HTML('Code Build Logs'.format(build_url))"]},{"cell_type":"markdown","metadata":{},"source":["The AWS CodeBuild process is responsible for creating a number of AWS CloudFormation templates which we will explore in more detail in the next section. Two of these templates are used to set up the **Train** step by creating the AWS Step Functions worklow and the custom AWS Lambda functions used within this workflow."]},{"cell_type":"markdown","metadata":{},"source":["## Train Model\n","\n","### Inspect Training Job\n","\n","Wait until the pipeline has started running the Train step (see screenshot) before continuing with the next cells in this notebook. \n","\n","![Training in progress](../docs/train-in-progress.png)\n","\n","When the pipeline has started running the train step, you can click on the **Details** link displayed in the CodePipeline UI (see screenshot above) to view the Step Functions workflow which is running the training job. \n","\n","Alternatively, you can click on the Workflow link from the cell output below once it's available."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["from stepfunctions.workflow import Workflow\n","\n","while True:\n"," try:\n"," workflow = Workflow.attach(workflow_pipeline_arn)\n"," break\n"," except ClientError as e:\n"," print(e.response[\"Error\"][\"Message\"])\n"," time.sleep(10)\n","\n","workflow"]},{"cell_type":"markdown","metadata":{},"source":["### Review Build Script\n","\n","While you wait for the training job to complete, let's take a look at the `run.py` code which was used by the AWS CodeBuild process.\n","\n","This script takes all of the input parameters, including the dataset locations and hyperparameters which you saved to JSON files earlier in this notebook, and uses them to generate the templates which the pipeline needs to run the training job. It *does not* create the actual Step Functions instance - it only generates the templates which define the Step Functions workflow, as well as the CloudFormation input templates which CodePipeline uses to instantiate the Step Functions instance.\n","\n","Step-by-step, the script does the following:\n","\n","1. It collects all the input parameters it needs to generate the templates. This includes information about the environment container needed to run the training job, the input and output data locations, IAM roles needed by various components, encryption keys, and more. It then sets up some basic parameters like the AWS region and the function names.\n","1. If the input parameters specify an environment container stored in ECR, it fetches that container. Otherwise, it fetches the URI of the AWS managed environment container needed for the training job.\n","1. 
It reads the input data JSON file which you generated earlier in this notebook (and which was included in the zip source for the pipeline), thereby fetching the locations of the train, validation, and baseline data files. Then it formats more parameters which will be needed later in the script, including version IDs and output data locations.\n","1. It reads the hyperparameter JSON file which you generated earlier in this notebook.\n","1. It defines the Step Functions workflow, starting with the input schema, followed by each step of the workflow (i.e. Create Experiment, Baseline Job, Training Job), and finally combines those steps into a workflow graph. \n","1. The workflow graph is saved to file, along with a file containing all of the input parameters saved according to the schema defined in the workflow.\n","1. It saves parameters to file which will be used by CloudFormation to instantiate the Step Functions workflow."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["!pygmentize ../model/run_pipeline.py"]},{"cell_type":"markdown","metadata":{},"source":["### Training Analytics\n","\n","Once the training and baseline jobs are complete (meaning they are displayed in a green color in the Step Functions workflow, this takes around 5 minutes), you can inspect the experiment metrics. The code below will display all experiments in a table. Note that the baseline processing job won't have RMSE metrics - it calculates metrics based on the training data, but does not train a machine learning model. \n","\n","You will [explore the baseline](#Explore-Baseline) results later in this notebook. "]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["from sagemaker import analytics\n","\n","experiment_name = \"mlops-{}\".format(model_name)\n","model_analytics = analytics.ExperimentAnalytics(experiment_name=experiment_name)\n","analytics_df = model_analytics.dataframe()\n","\n","if analytics_df.shape[0] == 0:\n"," raise (Exception(\"Please wait. No training or baseline jobs\"))\n","\n","pd.set_option(\"display.max_colwidth\", 100) # Increase column width to show full copmontent name\n","cols = [\n"," \"TrialComponentName\",\n"," \"DisplayName\",\n"," \"SageMaker.InstanceType\",\n"," \"train:rmse - Last\",\n"," \"validation:rmse - Last\",\n","] # return the last rmse for training and validation\n","analytics_df[analytics_df.columns & cols].head(2)"]},{"cell_type":"markdown","metadata":{},"source":["## Deploy Dev\n","\n","### Test Dev Deployment\n","\n","When the pipeline has finished training a model, it automatically moves to the next step, where the model is deployed as a SageMaker Endpoint. This endpoint is part of your dev deployment, therefore, in this section, you will run some tests on the endpoint to decide if you want to deploy this model into production.\n","\n","First, run the cell below to fetch the name of the SageMaker Endpoint."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["codepipeline = boto3.client(\"codepipeline\")\n","\n","deploy_dev = get_pipeline_stage(pipeline_name, \"DeployDev\")\n","if not \"latestExecution\" in deploy_dev:\n"," raise (Exception(\"Please wait. 
Deploy dev not started\"))\n","\n","execution_id = deploy_dev[\"latestExecution\"][\"pipelineExecutionId\"]\n","dev_endpoint_name = \"sagemaker-mlops-{}-dev\".format(model_name)\n","\n","print(\"endpoint name: {}\".format(dev_endpoint_name))"]},{"cell_type":"markdown","metadata":{},"source":["If you moved through the previous section very quickly, you will need to wait until the dev endpoint has been successfully deployed and the pipeline is waiting for approval to deploy to production (see screenshot). It can take up to 10 minutes for SageMaker to create an endpoint.\n","\n","![Deploying dev endpoint in code pipeline](../docs/dev-deploy-ready.png)\n","\n","Alternatively, run the code below to check the status of your endpoint. Wait until the status of the endpoint is 'InService'."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["sm = boto3.client(\"sagemaker\")\n","\n","while True:\n"," try:\n"," response = sm.describe_endpoint(EndpointName=dev_endpoint_name)\n"," print(\"Endpoint status: {}\".format(response[\"EndpointStatus\"]))\n"," if response[\"EndpointStatus\"] == \"InService\":\n"," break\n"," except ClientError as e:\n"," print(e.response[\"Error\"][\"Message\"])\n"," time.sleep(10)"]},{"cell_type":"markdown","metadata":{},"source":["Now that your endpoint is ready, let's write some code to run the test data (which you split off from the dataset and saved to file at the start of this notebook) through the endpoint for inference. The code below supports both v1 and v2 of the SageMaker SDK, but we recommend using v2 of the SDK in all of your future projects."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["import numpy as np\n","from tqdm import tqdm\n","\n","from sagemaker.predictor import Predictor\n","from sagemaker.serializers import CSVSerializer\n","\n","\n","def get_predictor(endpoint_name):\n"," xgb_predictor = Predictor(endpoint_name)\n"," xgb_predictor.serializer = CSVSerializer()\n"," return xgb_predictor\n","\n","\n","def predict(predictor, data, rows=500):\n"," split_array = np.array_split(data, round(data.shape[0] / float(rows)))\n"," predictions = \"\"\n"," for array in tqdm(split_array):\n"," predictions = \",\".join([predictions, predictor.predict(array).decode(\"utf-8\")])\n"," return np.fromstring(predictions[1:], sep=\",\")"]},{"cell_type":"markdown","metadata":{},"source":["Now use the `predict` function, which was defined in the code above, to run the test data through the endpoint and generate the predictions."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["dev_predictor = get_predictor(dev_endpoint_name)\n","predictions = predict(dev_predictor, test_df[test_df.columns[1:]].values)"]},{"cell_type":"markdown","metadata":{},"source":["Next, load the predictions into a data frame, and join it with your test data. Then, calculate absolute error as the difference between the actual taxi fare and the predicted taxi fare. 
Display the results in a table, sorted by the highest absolute error values."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["pred_df = pd.DataFrame({\"total_amount_predictions\": predictions})\n","pred_df = test_df.join(pred_df) # Join on all\n","pred_df[\"error\"] = abs(pred_df[\"total_amount\"] - pred_df[\"total_amount_predictions\"])\n","\n","pred_df.sort_values(\"error\", ascending=False).head()"]},{"cell_type":"markdown","metadata":{},"source":["From this table, we note that some short trip distances have large errors because the low predicted fare does not match the high actual fare. This could be the result of a generous tip which we haven't included in this dataset.\n","\n","You can also analyze the results by plotting the absolute error to visualize outliers. In this graph, we see that most of the outliers are cases where the model predicted a much lower fare than the actual fare. There are only a few outliers where the model predicted a higher fare than the actual fare."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["sns.scatterplot(data=pred_df, x=\"total_amount_predictions\", y=\"total_amount\", hue=\"error\")\n","plt.show()"]},{"cell_type":"markdown","metadata":{},"source":["If you want one overall measure of quality for the model, you can calculate the root mean square error (RMSE) for the predicted fares compared to the actual fares. Compare this to the [results calculated on the validation set](#validation-results) at the end of the 'Inspect Training Job' section."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["from math import sqrt\n","from sklearn.metrics import mean_squared_error\n","\n","\n","def rmse(pred_df):\n"," return sqrt(mean_squared_error(pred_df[\"total_amount\"], pred_df[\"total_amount_predictions\"]))\n","\n","\n","print(\"RMSE: {}\".format(rmse(pred_df)))"]},{"cell_type":"markdown","metadata":{},"source":["## Deploy Prod\n","\n","### Approve Deployment to Production\n","\n","If you are happy with the results of the model, you can go ahead and approve the model to be deployed into production. You can do so by clicking the **Review** button in the CodePipeline UI, leaving a comment to explain why you approve this model, and clicking on **Approve**. \n","\n","Alternatively, you can create a Jupyter widget which (when enabled) allows you to comment and approve the model directly from this notebook. Run the cell below to see this in action."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["import ipywidgets as widgets\n","\n","\n","def on_click(obj):\n"," result = {\"summary\": approval_text.value, \"status\": obj.description}\n"," response = codepipeline.put_approval_result(\n"," pipelineName=pipeline_name,\n"," stageName=\"DeployDev\",\n"," actionName=\"ApproveDeploy\",\n"," result=result,\n"," token=approval_action[\"token\"],\n"," )\n"," button_box.close()\n"," print(result)\n","\n","\n","# Create the widget if we are ready for approval\n","deploy_dev = get_pipeline_stage(pipeline_name, \"DeployDev\")\n","if not \"latestExecution\" in deploy_dev[\"actionStates\"][-1]:\n"," raise (Exception(\"Please wait. 
Deploy dev not complete\"))\n","\n","approval_action = deploy_dev[\"actionStates\"][-1][\"latestExecution\"]\n","if approval_action[\"status\"] == \"Succeeded\":\n"," print(\"Dev approved: {}\".format(approval_action[\"summary\"]))\n","elif \"token\" in approval_action:\n"," approval_text = widgets.Text(placeholder=\"Optional approval message\")\n"," approve_btn = widgets.Button(description=\"Approved\", button_style=\"success\", icon=\"check\")\n"," reject_btn = widgets.Button(description=\"Rejected\", button_style=\"danger\", icon=\"close\")\n"," approve_btn.on_click(on_click)\n"," reject_btn.on_click(on_click)\n"," button_box = widgets.HBox([approval_text, approve_btn, reject_btn])\n"," display(button_box)\n","else:\n"," raise (Exception(\"Please wait. No dev approval\"))"]},{"cell_type":"markdown","metadata":{},"source":["### Test Production Deployment\n","\n","Within about a minute after approving the model deployment, you should see the pipeline start on the final step: deploying your model into production. In this section, you will check the deployment status and test the production endpoint after it has been deployed.\n","\n","![Deploy production endpoint in code pipeline](../docs/deploy-production.png)\n","\n","This step of the pipeline uses CloudFormation to deploy a number of resources on your behalf. In particular, it creates:\n","\n","1. A production-ready SageMaker Endpoint for your model, with [data capture](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-data-capture.html)⇗ (used by SageMaker Model Monitor) and [autoscaling](https://docs.aws.amazon.com/sagemaker/latest/dg/endpoint-auto-scaling.html)⇗ enabled.\n","1. A [model monitoring schedule](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-scheduling.html)⇗ which outputs the results to CloudWatch metrics, along with a [CloudWatch Alarm](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)⇗ which will notify you when a violation occurs. \n","1. A CodeDeploy instance which creates a simple app by deploying API Gateway, three Lambda functions, and an alarm to notify of the success or failure of this deployment. The code for the Lambda functions can be found in `api/app.py`, `api/pre_traffic_hook.py`, and `api/post_traffic_hook.py`. These functions update the endpoint to enable data capture, format and submit incoming traffic to the SageMaker endpoint, and capture the data logs.\n","\n","![Components of production deployment](../docs/cloud-formation.png)\n","\n","Let's check how the deployment is progressing. Use the code below to fetch the execution ID of the deployment step. Then generate a table which lists the resources created by the CloudFormation stack and their creation status. You can re-run the cell after a few minutes to see how the steps are progressing."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["deploy_prd = get_pipeline_stage(pipeline_name, \"DeployPrd\")\n","if not \"latestExecution\" in deploy_prd or not \"latestExecution\" in deploy_prd[\"actionStates\"][0]:\n"," raise (Exception(\"Please wait. 
Deploy prd not started\"))\n","\n","execution_id = deploy_prd[\"latestExecution\"][\"pipelineExecutionId\"]"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["from datetime import datetime, timedelta\n","from dateutil.tz import tzlocal\n","\n","\n","def get_event_dataframe(events):\n"," stack_cols = [\n"," \"LogicalResourceId\",\n"," \"ResourceStatus\",\n"," \"ResourceStatusReason\",\n"," \"Timestamp\",\n"," ]\n"," stack_event_df = pd.DataFrame(events)[stack_cols].fillna(\"\")\n"," stack_event_df[\"TimeAgo\"] = datetime.now(tzlocal()) - stack_event_df[\"Timestamp\"]\n"," return stack_event_df.drop(\"Timestamp\", axis=1)\n","\n","\n","cfn = boto3.client(\"cloudformation\")\n","\n","stack_name = stack_name = \"{}-deploy-prd\".format(pipeline_name)\n","print(\"stack name: {}\".format(stack_name))\n","\n","# Get latest stack events\n","while True:\n"," try:\n"," response = cfn.describe_stack_events(StackName=stack_name)\n"," break\n"," except ClientError as e:\n"," print(e.response[\"Error\"][\"Message\"])\n"," time.sleep(10)\n","\n","get_event_dataframe(response[\"StackEvents\"]).head()"]},{"cell_type":"markdown","metadata":{},"source":["The resource of most interest to us is the endpoint. This takes on average 10 minutes to deploy. In the meantime, you can take a look at the Python code used for the application. \n","\n","The `app.py` is the main entry point invoking the Amazon SageMaker endpoint. It returns results along with a custom header for the endpoint we invoked."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["!pygmentize ../api/app.py"]},{"cell_type":"markdown","metadata":{},"source":["The `pre_traffic_hook.py` lambda is invoked prior to deployment and confirms the endpoint has data capture enabled."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["!pygmentize ../api/pre_traffic_hook.py"]},{"cell_type":"markdown","metadata":{},"source":["The `post_traffic_hook.py` lambda is invoked to perform any final checks, in this case to verify that we have received log data from data capature."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["!pygmentize ../api/post_traffic_hook.py"]},{"cell_type":"markdown","metadata":{},"source":["Use the code below to fetch the name of the endpoint, then run a loop to wait for the endpoint to be fully deployed. You need the status to be 'InService'."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["prd_endpoint_name = \"mlops-{}-prd\".format(model_name)\n","print(\"prod endpoint: {}\".format(prd_endpoint_name))"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["sm = boto3.client(\"sagemaker\")\n","\n","while True:\n"," try:\n"," response = sm.describe_endpoint(EndpointName=prd_endpoint_name)\n"," print(\"Endpoint status: {}\".format(response[\"EndpointStatus\"]))\n"," # Wait until the endpoint is in service with data capture enabled\n"," if (\n"," response[\"EndpointStatus\"] == \"InService\"\n"," and \"DataCaptureConfig\" in response\n"," and response[\"DataCaptureConfig\"][\"EnableCapture\"]\n"," ):\n"," break\n"," except ClientError as e:\n"," print(e.response[\"Error\"][\"Message\"])\n"," time.sleep(10)"]},{"cell_type":"markdown","metadata":{},"source":["When the endpoint status is 'InService', you can continue. Earlier in this notebook, you created some code to send data to the dev endpoint. 
Reuse this code now to send a sample of the test data to the production endpoint. Since data capture is enabled on this endpoint, you want to send single records at a time, so the model monitor can map these records to the baseline. \n","\n","You will [inspect the model monitor](#Inspect-Model-Monitor) later in this notebook. For now, just check if you can send data to the endpoint and receive predictions in return."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["prd_predictor = get_predictor(prd_endpoint_name)\n","sample_values = test_df[test_df.columns[1:]].sample(100).values\n","predictions = predict(prd_predictor, sample_values, rows=1)\n","predictions"]},{"cell_type":"markdown","metadata":{},"source":["### Test REST API\n","\n","Although you already tested the SageMaker endpoint in the previous section, it is also a good idea to test the application created with API Gateway. \n","\n","![Traffic shift between endpoints](../docs/lambda-deploy-create.png)\n","\n","Follow the link below to open the Lambda Deployment where you can see the in-progress and completed deployments. You can also click to expand the **SAM template** to see the packaged CloudFormation template used in the deployment."]},{"cell_type":"code","execution_count":null,"metadata":{"scrolled":true},"outputs":[],"source":["HTML(\n"," 'Lambda Deployment'.format(\n"," region, model_name\n"," )\n",")"]},{"cell_type":"markdown","metadata":{},"source":["Run the code below to confirm that the endpoint is in service. It will complete once the REST API is available."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["def get_stack_status(stack_name):\n"," response = cfn.describe_stacks(StackName=stack_name)\n"," if response[\"Stacks\"]:\n"," stack = response[\"Stacks\"][0]\n"," outputs = None\n"," if \"Outputs\" in stack:\n"," outputs = dict([(o[\"OutputKey\"], o[\"OutputValue\"]) for o in stack[\"Outputs\"]])\n"," return stack[\"StackStatus\"], outputs\n","\n","\n","outputs = None\n","while True:\n"," try:\n"," status, outputs = get_stack_status(stack_name)\n"," response = sm.describe_endpoint(EndpointName=prd_endpoint_name)\n"," print(\"Endpoint status: {}\".format(response[\"EndpointStatus\"]))\n"," if outputs:\n"," break\n"," elif status.endswith(\"FAILED\"):\n"," raise (Exception(\"Stack status: {}\".format(status)))\n"," except ClientError as e:\n"," print(e.response[\"Error\"][\"Message\"])\n"," time.sleep(10)\n","\n","if outputs:\n"," print(\"deployment application: {}\".format(outputs[\"DeploymentApplication\"]))\n"," print(\"rest api: {}\".format(outputs[\"RestApi\"]))"]},{"cell_type":"markdown","metadata":{},"source":["If you are performing an update on your production deployment as a result of running [Trigger Retraining](#Trigger-Retraining) you will then be able to expand the Lambda Deployment tab to reveal the resources. Click on the **ApiFunctionAliaslive** link to see the Lambda Deployment in progress. \n","\n","![Traffic shift between endpoints](../docs/lambda-deploy-update.png)\n","\n","This page will be updated to list the deployment events. 
It also has a link to the Deployment Application which you can access in the output of the next cell."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["HTML(\n"," 'CodeDeploy application'.format(\n"," region, outputs[\"DeploymentApplication\"]\n"," )\n",")"]},{"cell_type":"markdown","metadata":{},"source":["CodeDeploy will perform a canary deployment and send 10% of the traffic to the new endpoint over a 5-minute period.\n","\n","![Traffic shift between endpoints](../docs/code-deploy.gif)"]},{"cell_type":"markdown","metadata":{},"source":["We can invoke the REST API and inspect the headers being returned to see which endpoint we are hitting. You will occasionally see the cell below show a different endpoint that settles to the new version once the stack is complete. "]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["%%time\n","\n","from urllib import request\n","\n","headers = {\"Content-type\": \"text/csv\"}\n","payload = test_df[test_df.columns[1:]].head(1).to_csv(header=False, index=False).encode(\"utf-8\")\n","rest_api = outputs[\"RestApi\"]\n","\n","while True:\n"," try:\n"," resp = request.urlopen(request.Request(rest_api, data=payload, headers=headers))\n"," print(\n"," \"Response code: %d: endpoint: %s\"\n"," % (resp.getcode(), resp.getheader(\"x-sagemaker-endpoint\"))\n"," )\n"," status, outputs = get_stack_status(stack_name)\n"," if status.endswith(\"COMPLETE\"):\n"," print(\"Deployment complete\\n\")\n"," break\n"," elif status.endswith(\"FAILED\"):\n"," raise (Exception(\"Stack status: {}\".format(status)))\n"," except ClientError as e:\n"," print(e.response[\"Error\"][\"Message\"])\n"," time.sleep(10)"]},{"cell_type":"markdown","metadata":{},"source":["## Monitor\n","\n","### Inspect Model Monitor\n","\n","When you prepared the datasets for model training at the start of this notebook, you saved a baseline dataset (a copy of the train dataset). Then, when you approved the model for deployment into production, the pipeline set up an SageMaker Endpoint with data capture enabled and a model monitoring schedule. In this section, you will take a closer look at the model monitor results.\n","\n","To start off, fetch the latest production deployment execution ID."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["deploy_prd = get_pipeline_stage(pipeline_name, \"DeployPrd\")\n","if not \"latestExecution\" in deploy_prd:\n"," raise (Exception(\"Please wait. Deploy prod not complete\"))\n","\n","execution_id = deploy_prd[\"latestExecution\"][\"pipelineExecutionId\"]"]},{"cell_type":"markdown","metadata":{},"source":["Under the hood, SageMaker model monitor runs in SageMaker processing jobs. Use the execution ID to fetch the names of the processing job and the schedule."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["processing_job_name = \"mlops-{}-pbl\".format(model_name)\n","schedule_name = \"mlops-{}-pms\".format(model_name)\n","\n","print(\"processing job name: {}\".format(processing_job_name))\n","print(\"schedule name: {}\".format(schedule_name))"]},{"cell_type":"markdown","metadata":{},"source":["### Explore Baseline\n","\n","Now fetch the baseline results from the processing job. This cell will throw an exception if the processing job is not complete - if that happens, just wait several minutes and try again. 
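Alternatively, the optional cell below is a small illustrative sketch that polls the job status first, so the baseline-loading cell that follows should succeed on the first try; it assumes the `processing_job_name` value defined above and the `sm` SageMaker client created earlier."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["# Optional, illustrative sketch: wait for the baseline processing job to finish.\n","# Assumes processing_job_name (defined above) and the sm boto3 SageMaker client.\n","import time\n","\n","while True:\n","    status = sm.describe_processing_job(ProcessingJobName=processing_job_name)[\"ProcessingJobStatus\"]\n","    print(\"Processing job status: {}\".format(status))\n","    if status in [\"Completed\", \"Failed\", \"Stopped\"]:\n","        break\n","    time.sleep(30)"]},{"cell_type":"markdown","metadata":{},"source":["Once the job reports `Completed`, run the next cell to load the baseline results.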
"]},{"cell_type":"code","execution_count":null,"metadata":{"scrolled":true},"outputs":[],"source":["import sagemaker\n","from sagemaker.model_monitor import BaseliningJob, MonitoringExecution\n","from sagemaker.s3 import S3Downloader\n","\n","sagemaker_session = sagemaker.Session()\n","baseline_job = BaseliningJob.from_processing_name(sagemaker_session, processing_job_name)\n","status = baseline_job.describe()[\"ProcessingJobStatus\"]\n","if status != \"Completed\":\n"," raise (Exception(\"Please wait. Processing job not complete, status: {}\".format(status)))\n","\n","baseline_results_uri = baseline_job.outputs[0].destination"]},{"cell_type":"markdown","metadata":{},"source":["SageMaker model monitor generates two types of files. Take a look at the statistics file first. It calculates various statistics for each feature of the dataset, including the mean, standard deviation, minimum value, maximum value, and more. "]},{"cell_type":"code","execution_count":null,"metadata":{"scrolled":true},"outputs":[],"source":["import pandas as pd\n","import json\n","\n","baseline_statistics = baseline_job.baseline_statistics().body_dict\n","schema_df = pd.json_normalize(baseline_statistics[\"features\"])\n","schema_df[\n"," [\n"," \"name\",\n"," \"numerical_statistics.mean\",\n"," \"numerical_statistics.std_dev\",\n"," \"numerical_statistics.min\",\n"," \"numerical_statistics.max\",\n"," ]\n","].head()"]},{"cell_type":"markdown","metadata":{},"source":["Now look at the suggested [constraints files](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html)⇗. As the name implies, these are constraints which SageMaker model monitor recommends. If the live data which is sent to your production SageMaker Endpoint violates these constraints, this indicates data drift, and model monitor can raise an alert to trigger retraining. Of course, you can set different constraints based on the statistics which you viewed previously."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["baseline_constraints = baseline_job.suggested_constraints().body_dict\n","constraints_df = pd.json_normalize(baseline_constraints[\"features\"])\n","constraints_df.head()"]},{"cell_type":"markdown","metadata":{},"source":["### View data capture\n","\n","When the \"Deploy Production\" stage of the MLOps pipeline deploys a SageMaker endpoint, it also enables data capture. This means the incoming requests to the endpoint, as well as the results from the ML model, are stored in an S3 location. Model monitor can analyze this data and compare it to the baseline to ensure that no constraints are violated. \n","\n","Use the code below to check how many files have been created by the data capture, and view the latest file in detail. Note, data capture relies on data being sent to the production endpoint. 
If you don't see any files yet, wait several minutes and try again."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["bucket = sagemaker_session.default_bucket()\n","data_capture_logs_uri = \"s3://{}/mlops-{}/datacapture/{}\".format(\n","    bucket, model_name, prd_endpoint_name\n",")\n","\n","capture_files = S3Downloader.list(data_capture_logs_uri)\n","print(\"Found {} files\".format(len(capture_files)))\n","\n","if capture_files:\n","    # Get the first line of the most recent file\n","    event = json.loads(S3Downloader.read_file(capture_files[-1]).split(\"\\n\")[0])\n","    print(\"\\nLast file:\\n{}\".format(json.dumps(event, indent=2)))"]},{"cell_type":"markdown","metadata":{},"source":["### View monitoring schedule\n","\n","There are some useful functions for plotting and rendering distribution statistics or constraint violations provided in a `utils` file in the [SageMaker Examples GitHub](https://github.com/aws/amazon-sagemaker-examples/tree/master/sagemaker_model_monitor/visualization)⇗. Grab a copy of this code to use in this notebook. "]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["!wget -O utils.py --quiet https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/master/sagemaker_model_monitor/visualization/utils.py\n","import utils as mu"]},{"cell_type":"markdown","metadata":{},"source":["The [minimum scheduled run time](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-scheduling.html)⇗ for model monitor is one hour, which means you will need to wait at least an hour to see any results. Use the code below to check the schedule status and list the next run. If you are completing this notebook as part of a workshop, your host will have activities which you can complete while you wait. "]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["sm = boto3.client(\"sagemaker\")\n","\n","response = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)\n","print(\"Schedule Status: {}\".format(response[\"MonitoringScheduleStatus\"]))\n","\n","now = datetime.now(tzlocal())\n","next_hour = (now + timedelta(hours=1)).replace(minute=0)\n","scheduled_diff = (next_hour - now).seconds // 60\n","print(\"Next schedule in {} minutes\".format(scheduled_diff))"]},{"cell_type":"markdown","metadata":{},"source":["While you wait, you can take a look at the CloudFormation template that serves as the base for the production deployment performed by CodeDeploy. \n","\n","Alternatively, you can jump ahead to [Trigger Retraining](#Trigger-Retraining), which will kick off another run of the code pipeline while you wait."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["!cat ../assets/deploy-model-prd.yml"]},{"cell_type":"markdown","metadata":{},"source":["A couple of minutes after the model monitoring schedule has run, you can use the code below to fetch the latest schedule status. A completed schedule run may have found violations. 
"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["processing_job_arn = None\n","\n","while processing_job_arn is None:\n"," try:\n"," response = sm.list_monitoring_executions(MonitoringScheduleName=schedule_name)\n"," except ClientError as e:\n"," print(e.response[\"Error\"][\"Message\"])\n"," for mon in response[\"MonitoringExecutionSummaries\"]:\n"," status = mon[\"MonitoringExecutionStatus\"]\n"," now = datetime.now(tzlocal())\n"," created_diff = (now - mon[\"CreationTime\"]).seconds // 60\n"," print(\"Schedule status: {}, Created: {} minutes ago\".format(status, created_diff))\n"," if status in [\"Completed\", \"CompletedWithViolations\"]:\n"," processing_job_arn = mon[\"ProcessingJobArn\"]\n"," break\n"," if status == \"InProgress\":\n"," break\n"," else:\n"," raise (Exception(\"Please wait. No Schedules executing\"))\n"," time.sleep(10)"]},{"cell_type":"markdown","metadata":{},"source":["### View monitoring results\n","\n","Once the model monitoring schedule has had a chance to run at least once, you can take a look at the results. First, load the monitoring execution results from the latest scheduled run."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["if processing_job_arn:\n"," execution = MonitoringExecution.from_processing_arn(\n"," sagemaker_session=sagemaker.Session(), processing_job_arn=processing_job_arn\n"," )\n"," exec_inputs = {inp[\"InputName\"]: inp for inp in execution.describe()[\"ProcessingInputs\"]}\n"," exec_results_uri = execution.output.destination\n","\n"," print(\"Monitoring Execution results: {}\".format(exec_results_uri))"]},{"cell_type":"markdown","metadata":{},"source":["Take a look at the files which have been saved in the S3 output location. If violations were found, you should see a constraint violations file in addition to the statistics and constraints file which you viewed before."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["!aws s3 ls $exec_results_uri/"]},{"cell_type":"markdown","metadata":{},"source":["Now, fetch the monitoring statistics and violations. Then use the utils code to visualize the results in a table. It will highlight any baseline drift found by the model monitor. Drift can happen for categorical features (for inferred string styles) or for numerical features (e.g. total fare amount)."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["# Get the baseline and monitoring statistics & violations\n","baseline_statistics = baseline_job.baseline_statistics().body_dict\n","execution_statistics = execution.statistics().body_dict\n","violations = execution.constraint_violations().body_dict[\"violations\"]"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["mu.show_violation_df(\n"," baseline_statistics=baseline_statistics,\n"," latest_statistics=execution_statistics,\n"," violations=violations,\n",")"]},{"cell_type":"markdown","metadata":{},"source":["### Trigger Retraining\n","\n","The CodePipeline instance is configured with [CloudWatch Events](https://docs.aws.amazon.com/codepipeline/latest/userguide/create-cloudtrail-S3-source.html)⇗ to start the pipeline for retraining when the drift detection triggers specific metric alarms.\n","\n","You can simulate drift by putting a metric value above the threshold of `0.2` directly into CloudWatch. This will trigger the alarm, and start the code pipeline.\n","\n","
\n"," Tip: This alarm is configured only for the latest production endpoint, so re-training will only occur if you are putting metrics against the latest endpoint.\n","
\n","\n","![Metric graph in CloudWatch](../docs/cloudwatch-alarm.png)\n","\n","Run the code below to trigger the metric alarm. The cell output will be a link to CloudWatch, where you can see the alarm (similar to the screenshot above), and a link to CodePipeline which you will see run again. Note that it can take a couple of minutes for everything to trigger."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["from datetime import datetime\n","import random\n","\n","cloudwatch = boto3.client(\"cloudwatch\")\n","\n","# Define the metric name and threshold\n","metric_name = \"feature_baseline_drift_total_amount\"\n","metric_threshold = 0.2\n","\n","# Put a new metric to trigger an alaram\n","def put_drift_metric(value):\n"," print(\"Putting metric: {}\".format(value))\n"," response = cloudwatch.put_metric_data(\n"," Namespace=\"aws/sagemaker/Endpoints/data-metrics\",\n"," MetricData=[\n"," {\n"," \"MetricName\": metric_name,\n"," \"Dimensions\": [\n"," {\"Name\": \"MonitoringSchedule\", \"Value\": schedule_name},\n"," {\"Name\": \"Endpoint\", \"Value\": prd_endpoint_name},\n"," ],\n"," \"Timestamp\": datetime.now(),\n"," \"Value\": value,\n"," \"Unit\": \"None\",\n"," },\n"," ],\n"," )\n","\n","\n","def get_drift_stats():\n"," response = cloudwatch.get_metric_statistics(\n"," Namespace=\"aws/sagemaker/Endpoints/data-metrics\",\n"," MetricName=metric_name,\n"," Dimensions=[\n"," {\"Name\": \"MonitoringSchedule\", \"Value\": schedule_name},\n"," {\"Name\": \"Endpoint\", \"Value\": prd_endpoint_name},\n"," ],\n"," StartTime=datetime.now() - timedelta(minutes=2),\n"," EndTime=datetime.now(),\n"," Period=1,\n"," Statistics=[\"Average\"],\n"," Unit=\"None\",\n"," )\n"," if \"Datapoints\" in response and len(response[\"Datapoints\"]) > 0:\n"," return response[\"Datapoints\"][0][\"Average\"]\n"," return 0\n","\n","\n","print(\"Simluate drift on endpoint: {}\".format(prd_endpoint_name))\n","\n","while True:\n"," put_drift_metric(round(random.uniform(metric_threshold, 1.0), 4))\n"," drift_stats = get_drift_stats()\n"," print(\"Average drift amount: {}\".format(get_drift_stats()))\n"," if drift_stats > metric_threshold:\n"," break\n"," time.sleep(1)"]},{"cell_type":"markdown","metadata":{},"source":["Click through to the Alarm and CodePipeline Execution history with the links below."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["# Output a html link to the cloudwatch dashboard\n","metric_alarm_name = \"mlops-{}-metric-gt-threshold\".format(model_name)\n","HTML(\n"," \"\"\"CloudWatch Alarm triggers\n"," Code Pipeline Execution\"\"\".format(\n"," region, metric_alarm_name, pipeline_name\n"," )\n",")"]},{"cell_type":"markdown","metadata":{},"source":["Once the pipeline is running again you can jump back up to [Inspect Training Job](#Inspect-Training-Job)"]},{"cell_type":"markdown","metadata":{},"source":["### Create a CloudWatch dashboard\n","\n","Finally, use the code below to create a CloudWatch dashboard to visualize the key performance metrics and alarms which you have created during this demo. The cell will output a link to the dashboard. 
This dashboard shows 9 charts in three rows, where the first row displays Lambda metrics, the second row displays SageMaker metrics, and the third row (shown in the screenshot below) displays the alarms set up for the pipeline.\n","\n","![Graphs in CloudWatch dashboard](../docs/cloudwatch-dashboard.png)"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["from string import Template\n","\n","sts = boto3.client(\"sts\")\n","account_id = sts.get_caller_identity().get(\"Account\")\n","dashboard_name = \"mlops-{0}-{1}\".format(model_name, config[\"SageMakerProjectId\"])\n","\n","with open(\"dashboard.json\") as f:\n","    dashboard_body = Template(f.read()).substitute(\n","        region=region, account_id=account_id, model_name=model_name\n","    )\n","    response = cloudwatch.put_dashboard(DashboardName=dashboard_name, DashboardBody=dashboard_body)\n","\n","# Output an HTML link to the CloudWatch dashboard\n","HTML(\n","    '<a target=\"_blank\" href=\"https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#dashboards:name={1}\">CloudWatch Dashboard</a>'.format(\n","        region, dashboard_name\n","    )\n",")"]},{"cell_type":"markdown","metadata":{},"source":["Congratulations! You have made it to the end of this notebook, and have automated a safe MLOps pipeline using a wide range of AWS services. \n","\n","You can use the other notebook in this repository [workflow.ipynb](workflow.ipynb) to implement your own ML model and deploy it as part of this pipeline. Or, if you are finished with the content, follow the instructions in the next section to clean up the resources you have deployed."]},{"cell_type":"markdown","metadata":{},"source":["## Cleanup\n","\n","Execute the following cell to delete the stacks created in the pipeline. For a model name of **nyctaxi** these would be:\n","\n","1. *nyctaxi*-deploy-prd\n","2. *nyctaxi*-deploy-dev\n","3. *nyctaxi*-workflow\n","4. 
sagemaker-custom-resource"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["cfn = boto3.client(\"cloudformation\")\n","\n","# Delete the prod and then dev stack\n","for stack_name in [\n"," f\"{pipeline_name}-deploy-prd\",\n"," f\"{pipeline_name}-deploy-dev\",\n"," f\"{pipeline_name}-workflow\",\n"," f\"mlops-{model_name}-{config['SageMakerProjectId']}-sagemaker-custom-resource\",\n","]:\n"," print(\"Deleting stack: {}\".format(stack_name))\n"," cfn.delete_stack(StackName=stack_name)\n"," cfn.get_waiter(\"stack_delete_complete\").wait(StackName=stack_name)"]},{"cell_type":"markdown","metadata":{},"source":["The following code will delete the dashboard."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["cloudwatch.delete_dashboards(DashboardNames=[dashboard_name])\n","print(\"Dashboard deleted\")"]},{"cell_type":"markdown","metadata":{},"source":["The following code will clean up all objects in the artifact bucket and delete the SageMaker project."]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["s3_resource = boto3.resource('s3')\n","s3_artifact_bucket = s3_resource.Bucket(artifact_bucket)\n","s3_artifact_bucket.object_versions.delete()\n","print(\"Artifact bucket objects deleted\")\n","\n","sm.delete_project(\n"," ProjectName=PROJECT_NAME\n",")\n","print(\"SageMaker Project deleted\")"]},{"cell_type":"markdown","metadata":{},"source":["Finally, close this notebook."]}],"metadata":{"instance_type":"ml.t3.medium","kernelspec":{"display_name":"conda_python3","language":"python","name":"conda_python3"},"language_info":{"codemirror_mode":{"name":"ipython","version":3},"file_extension":".py","mimetype":"text/x-python","name":"python","nbconvert_exporter":"python","pygments_lexer":"ipython3","version":"3.7.6"}},"nbformat":4,"nbformat_minor":4} diff --git a/notebook/nyc-tlc.parquet b/notebook/nyc-tlc.parquet new file mode 100644 index 0000000..185b82e Binary files /dev/null and b/notebook/nyc-tlc.parquet differ diff --git a/pipeline.yml b/pipeline.yml index 0c530b6..d82f049 100644 --- a/pipeline.yml +++ b/pipeline.yml @@ -183,7 +183,7 @@ Resources: RepositoryDescription: !Sub SageMaker safe deployment pipeline for project ${SageMakerProjectName} with id ${SageMakerProjectId}, prefix ${ProjectPrefix} and model name ${ModelName} Code: S3: - Bucket: "S3_BUCKET_NAME" + Bucket: !Sub "${AWS::AccountId}-pipelineyaml-bucket" Key: "project.zip" BranchName: !Ref GitBranch @@ -966,4 +966,4 @@ Outputs: KMSKey: Value: !Ref KMSKey NotificationTopic: - Value: !Ref NotificationTopic + Value: !Ref NotificationTopic \ No newline at end of file diff --git a/pipeline_bucket.yml b/pipeline_bucket.yml new file mode 100644 index 0000000..0e019ae --- /dev/null +++ b/pipeline_bucket.yml @@ -0,0 +1,6 @@ +Resources: + S3Bucket: + Type: 'AWS::S3::Bucket' + DeletionPolicy: Retain + Properties: + BucketName: !Sub "${AWS::AccountId}-pipelineyaml-bucket"