diff --git a/doc/source/examples/argo_workflows_batch.nblink b/doc/source/examples/argo_workflows_batch.nblink new file mode 100644 index 0000000000..d8e5f4d000 --- /dev/null +++ b/doc/source/examples/argo_workflows_batch.nblink @@ -0,0 +1,3 @@ +{ + "path": "../../../examples/batch/argo-workflows-batch/README.ipynb" +} diff --git a/doc/source/examples/assets/kubeflow-pipeline.jpg b/doc/source/examples/assets/kubeflow-pipeline.jpg new file mode 100755 index 0000000000..f793407067 Binary files /dev/null and b/doc/source/examples/assets/kubeflow-pipeline.jpg differ diff --git a/doc/source/examples/assets/seldon-kubeflow-batch.gif b/doc/source/examples/assets/seldon-kubeflow-batch.gif new file mode 100644 index 0000000000..905d4dd3c5 Binary files /dev/null and b/doc/source/examples/assets/seldon-kubeflow-batch.gif differ diff --git a/doc/source/examples/kubeflow_pipelines_batch.nblink b/doc/source/examples/kubeflow_pipelines_batch.nblink new file mode 100644 index 0000000000..25d13a2fe4 --- /dev/null +++ b/doc/source/examples/kubeflow_pipelines_batch.nblink @@ -0,0 +1,3 @@ +{ + "path": "../../../examples/batch/kubeflow-pipelines-batch/README.ipynb" +} diff --git a/doc/source/examples/notebooks.rst b/doc/source/examples/notebooks.rst index 4b5ec66ee2..ba2cb6d22e 100644 --- a/doc/source/examples/notebooks.rst +++ b/doc/source/examples/notebooks.rst @@ -71,6 +71,15 @@ Advanced Machine Learning Insights Tabular, Text and Image Model Explainers Outlier Detection on CIFAR10 +Batch Processing with Seldon Core +----- + +.. toctree:: + :titlesonly: + + Batch Processing with Argo Workflows + Batch Processing with Kubeflow Pipelines + MLOps: Scaling and Monitoring and Observability ----- diff --git a/doc/source/images/batch-processor.jpg b/doc/source/images/batch-processor.jpg new file mode 100755 index 0000000000..c47987072d Binary files /dev/null and b/doc/source/images/batch-processor.jpg differ diff --git a/doc/source/images/batch-workflow-manager-integration.jpg b/doc/source/images/batch-workflow-manager-integration.jpg new file mode 100644 index 0000000000..1345b217cb Binary files /dev/null and b/doc/source/images/batch-workflow-manager-integration.jpg differ diff --git a/doc/source/images/batch-workflow-managers.jpg b/doc/source/images/batch-workflow-managers.jpg new file mode 100644 index 0000000000..8d7cfab730 Binary files /dev/null and b/doc/source/images/batch-workflow-managers.jpg differ diff --git a/doc/source/images/stream-processing-knative.jpg b/doc/source/images/stream-processing-knative.jpg new file mode 100755 index 0000000000..95381aa0be Binary files /dev/null and b/doc/source/images/stream-processing-knative.jpg differ diff --git a/doc/source/index.rst b/doc/source/index.rst index 2dbc38fd91..fec7e767d8 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -64,20 +64,38 @@ Documentation Index .. toctree:: :maxdepth: 1 - :caption: Language Wrappers (Production) + :caption: Production + + Supported API Protocols + CI/CD MLOps at Scale + Metrics with Prometheus + Payload Logging with ELK + Distributed Tracing with Jaeger + Replica Scaling + Custom Inference Servers + +.. toctree:: + :maxdepth: 1 + :caption: Batch Processing with Seldon + + Overview of Batch Processing + +.. toctree:: + :maxdepth: 1 + :caption: Language Wrappers - Python Language Wrapper [Production] + Python Language Wrapper .. 
toctree::
   :maxdepth: 1
   :caption: Incubating Projects

-   Java Language Wrapper [Incubating]
+   Java Language Wrapper
+   Metadata
    R Language Wrapper [ALPHA]
    NodeJS Language Wrapper [ALPHA]
    Go Language Wrapper [ALPHA]
    Stream Processing with KNative
-   Metadata [Incubating]

 .. toctree::
    :maxdepth: 1
@@ -86,18 +104,6 @@ Documentation Index
    Ambassador Ingress
    Istio Ingress

-.. toctree::
-   :maxdepth: 1
-   :caption: Production
-
-   Supported API Protocols
-   CI/CD MLOps at Scale
-   Metrics with Prometheus
-   Payload Logging with ELK
-   Distributed Tracing with Jaeger
-   Replica Scaling
-   Custom Inference Servers
-
 .. toctree::
    :maxdepth: 1
    :caption: Advanced Inference
diff --git a/doc/source/python/api/seldon_core.rst b/doc/source/python/api/seldon_core.rst
index dabc42b612..19e02a0e19 100644
--- a/doc/source/python/api/seldon_core.rst
+++ b/doc/source/python/api/seldon_core.rst
@@ -24,6 +24,14 @@ seldon\_core.api\_tester module
    :undoc-members:
    :show-inheritance:

+seldon\_core.batch\_processor module
+------------------------------------
+
+.. automodule:: seldon_core.batch_processor
+   :members:
+   :undoc-members:
+   :show-inheritance:
+
 seldon\_core.flask\_utils module
 --------------------------------
diff --git a/doc/source/servers/batch.md b/doc/source/servers/batch.md
new file mode 100644
index 0000000000..b6bc821cac
--- /dev/null
+++ b/doc/source/servers/batch.md
@@ -0,0 +1,149 @@
+# Batch Processing with Seldon Core
+
+Seldon Core provides a command line component that enables highly parallelizable batch processing against horizontally scalable Seldon Core model deployments on Kubernetes.
+
+For stream processing with Seldon Core please see [Stream Processing with KNative Eventing](../streaming/knative_eventing.md).
+
+![](../images/batch-processor.jpg)
+
+## Horizontally Scalable Workers and Replicas
+
+The parallelizable batch processor worker allows for high throughput because it can leverage Seldon Core's horizontally scaled replicas as well as autoscaling, giving users the flexibility to optimize their configuration as required.
+
+The diagram below shows a standard workflow where data is downloaded from and uploaded to an object store, and the Seldon model is created at the start of the job and deleted once it finishes successfully.
+
+![](../images/batch-workflow-manager-integration.jpg)
+
+## Integration with ETL & Workflow Managers
+
+The Seldon Batch component has been built to be modular and flexible so that it can be integrated with any workflow manager.
+
+This allows you to leverage Seldon for a wide range of batch applications, including jobs that run on a schedule (e.g. once a day or once a month) as well as jobs that are triggered programmatically.
+
+![](../images/batch-workflow-managers.jpg)
+
+## Hands-on Examples
+
+We have provided a set of examples that show you how you can use the Seldon batch processing component:
+
+* [Batch Processing with Argo Workflows](../examples/argo_workflows_batch.html)
+* [Batch Processing with Kubeflow Pipelines](../examples/kubeflow_pipelines_batch.html)
+
+## High Level Implementation Details
+
+### CLI Parameters
+
+To get more insight into each of the options available, you can interact with the batch processor component as follows:
+
+```bash
+$ seldon-batch-processor --help
+
+Usage: seldon-batch-processor [OPTIONS]
+
+  Command line interface for Seldon Batch Processor, which can be used to
+  send requests through configurable parallel workers to Seldon Core models.
+  It is recommended that the respective Seldon Core model is also optimized
+  with number of replicas to distribute and scale out the batch processing
+  work. The processor is able to process data from local filestore input
+  file in various formats supported by the SeldonClient module. It is also
+  suggested to use the batch processor component integrated with an ETL
+  Workflow Manager such as Kubeflow, Argo Pipelines, Airflow, etc. which
+  would allow for extra setup / teardown steps such as downloading the data
+  from object store or starting a seldon core model with replicas. See the
+  Seldon Core examples folder for implementations of this batch module with
+  Seldon Core.
+
+Options:
+  -d, --deployment-name TEXT      The name of the SeldonDeployment to send the
+                                  requests to  [required]
+
+  -g, --gateway-type [ambassador|istio|seldon]
+                                  The gateway type for the seldon model, which
+                                  can be through the ingress provider
+                                  (istio/ambassador) or directly through the
+                                  service (seldon)
+
+  -n, --namespace TEXT            The Kubernetes namespace where the
+                                  SeldonDeployment is deployed in
+
+  -h, --host TEXT                 The hostname for the seldon model to send
+                                  the request to, which can be the ingress of
+                                  the Seldon model or the service itself
+
+  -t, --transport [rest|grpc]     The transport type of the SeldonDeployment
+                                  model which can be REST or GRPC
+
+  -a, --data-type [data|json|str]
+                                  Whether to use json, strData or Seldon Data
+                                  type for the payload to send to the
+                                  SeldonDeployment which aligns with the
+                                  SeldonClient format
+
+  -p, --payload-type [ndarray|tensor|tftensor]
+                                  The payload type expected by the
+                                  SeldonDeployment and hence the expected
+                                  format for the data in the input file which
+                                  can be an array
+
+  -w, --workers INTEGER           The number of parallel request processor
+                                  workers to run for parallel processing
+
+  -r, --retries INTEGER           The number of retries for each request
+                                  before marking an error
+
+  -i, --input-data-path PATH      The local filestore path where the input
+                                  file with the data to process is located
+
+  -o, --output-data-path PATH     The local filestore path where the output
+                                  file should be written with the outputs of
+                                  the batch processing
+
+  -m, --method [predict]          The method of the SeldonDeployment to send
+                                  the request to which currently only supports
+                                  the predict method
+
+  -l, --log-level [debug|info|warning|error]
+                                  The log level for the batch processor
+  -b, --benchmark                 If true the batch processor will print the
+                                  elapsed time taken to run the process
+
+  -u, --batch-id TEXT             Unique batch ID to identify all datapoints
+                                  processed in this batch, if not provided is
+                                  auto generated
+
+  --help                          Show this message and exit.
+
+```
+
+### Identifiers
+
+Each data point that is sent to the Seldon Core model contains the following identifiers in the request metadata:
+* Batch ID - A unique identifier which can be provided through CLI or is automatically generated
+* Batch Instance ID - A generated unique identifier for each datapoint processed
+* Batch Index - The local ordered descending index for the datapoint relative to the input file location
+
+These identifiers are added on each request as follows:
+
+```
+seldon_request = {
+    <data-type>: <data>,
+    "meta": {
+        "tags": {
+            "batch_id": <batch-id>,
+            "batch_instance_id": <batch-instance-id>,
+            "batch_index": <batch-index>
+        }
+    }
+}
+```
+
+This allows each response to be identified and matched against the corresponding request in the input data.
+
+### Performance
+
+The module is implemented using Python's threading system.
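+
+To make this concrete, the sketch below illustrates the general worker-pool pattern the processor follows: a fixed number of threads consume lines from a shared queue, each sending one request per line. This is only an illustrative outline (`run_batch` and `send_request` are hypothetical names), not the actual `seldon_core.batch_processor` source:
+
+```python
+import queue
+import threading
+
+def run_batch(lines, num_workers, send_request):
+    """Illustrative worker pool: each thread sends one request per input line."""
+    tasks, results = queue.Queue(), queue.Queue()
+    for index, line in enumerate(lines):
+        tasks.put((index, line))
+
+    def worker():
+        while True:
+            try:
+                index, payload = tasks.get_nowait()
+            except queue.Empty:
+                return
+            # Most of the wall-clock time is spent here, blocked on network
+            # I/O, which is why plain threads scale well for this workload.
+            results.put((index, send_request(payload)))
+
+    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
+    for thread in threads:
+        thread.start()
+    for thread in threads:
+        thread.join()
+    return sorted((results.get() for _ in range(results.qsize())),
+                  key=lambda pair: pair[0])
+```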
+
+Benchmarking was carried out using the vanilla Python requests module to compare the performance of threading, Twisted and AsyncIO. The results showed better performance with AsyncIO; however, given that the logic in the worker is quite minimal (i.e. sending a request) and most of the time is spent waiting for the response, the implementation with Python's native threading performs efficiently enough to scale easily to thousands of workers.
+
+Currently the implementation uses the Seldon Client, which does not yet apply several optimizations that would further increase processing performance, such as re-using a requests.py session. Even without these optimizations the worker still achieves highly concurrent performance, and they will be introduced as adoption of (and feedback on) this component grows.
+
diff --git a/doc/source/streaming/knative_eventing.md b/doc/source/streaming/knative_eventing.md
index 8283d1298b..29945c9f57 100644
--- a/doc/source/streaming/knative_eventing.md
+++ b/doc/source/streaming/knative_eventing.md
@@ -4,6 +4,8 @@ Seldon has an integration with KNative eventing that allows for real time proces

 This allow Seldon Core users to connect SeldonDeployments through triggers that will receive any relevant Cloudevents.

+![](../images/stream-processing-knative.jpg)
+
 ## Triggers

 The way that KNative Eventing works is by creating triggers that send any relevant Cloudevents that match a specific setup into the relevant addressable location.
diff --git a/examples/batch/argo-workflows-batch/.gitignore b/examples/batch/argo-workflows-batch/.gitignore
new file mode 100644
index 0000000000..ea76a1cd33
--- /dev/null
+++ b/examples/batch/argo-workflows-batch/.gitignore
@@ -0,0 +1,2 @@
+assets/input-data.txt
+assets/output-data.txt
diff --git a/examples/batch/argo-workflows-batch/README.ipynb b/examples/batch/argo-workflows-batch/README.ipynb
new file mode 100644
index 0000000000..954415729b
--- /dev/null
+++ b/examples/batch/argo-workflows-batch/README.ipynb
@@ -0,0 +1,421 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Batch processing with Argo Workflows\n",
+    "\n",
+    "In this notebook we will dive into how you can run batch processing with Argo Workflows and Seldon Core.\n",
+    "\n",
+    "Dependencies:\n",
+    "\n",
+    "* Seldon core installed as per the docs with an ingress\n",
+    "* Minio running in your cluster to use as local (s3) object storage\n",
+    "* Argo Workflows installed in cluster (and argo CLI for commands)\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Setup\n",
+    "\n",
+    "### Install Seldon Core\n",
+    "Use the notebook to [set-up Seldon Core with Ambassador or Istio Ingress](https://docs.seldon.io/projects/seldon-core/en/latest/examples/seldon_core_setup.html).\n",
+    "\n",
+    "Note: If running with KIND you need to make sure to follow [these steps](https://github.com/argoproj/argo/issues/2376#issuecomment-595593237) as a workaround to the known `/.../docker.sock` issue.\n",
+    "\n",
+    "### Set up Minio in your cluster\n",
+    "Use the notebook to [set-up Minio in your cluster](https://docs.seldon.io/projects/seldon-core/en/latest/examples/minio_setup.html).\n",
+    "\n",
+    "### Copy the Minio Secret to namespace\n",
+    "\n",
+    "We need to re-use the minio secret for the batch job; this can be done by simply copying the minio secret created in the `minio-system` namespace.\n",
+    "\n",
+    "The command below just copies the secret with the name \"minio\" from the minio-system namespace to the default namespace."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!kubectl get secret minio -n minio-system -o json | jq '{apiVersion,data,kind,metadata,type} | .metadata |= {\"annotations\", \"name\"}' | kubectl apply -n default -f -"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Install Argo Workflows\n",
+    "You can follow the instructions from the official [Argo Workflows Documentation](https://github.com/argoproj/argo/blob/master/docs/getting-started.md#1-download-the-argo-cli).\n",
+    "\n",
+    "You also need to make sure that argo has permissions to create seldon deployments - for this you can just create a default-admin rolebinding as follows:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!kubectl create rolebinding default-admin --clusterrole=admin --serviceaccount=default:default"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Create some input for our model\n",
+    "\n",
+    "We will create a file containing the inputs that will be sent to our model"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 38,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "mkdir -p assets/"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "with open(\"assets/input-data.txt\", \"w\") as f:\n",
+    "    for i in range(10000):\n",
+    "        f.write('[[1, 2, 3, 4]]\\n')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Check the contents of the file"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "10000 assets/input-data.txt\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n"
+     ]
+    }
+   ],
+   "source": [
+    "!wc -l assets/input-data.txt\n",
+    "!head assets/input-data.txt"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Upload the file to our minio"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\u001b[m\u001b[32;1mBucket created successfully `minio-seldon/data`.\u001b[0m\n",
+      "...-data.txt:  146.48 KiB / 146.48 KiB ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 2.03 MiB/s 0s\u001b[0m\u001b[0m\u001b[m\u001b[32;1m"
+     ]
+    }
+   ],
+   "source": [
+    "!mc mb minio-seldon/data\n",
+    "!mc cp assets/input-data.txt minio-seldon/data/"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Create Argo Workflow\n",
+    "\n",
+    "To make it easy to create our Argo workflow, we have packaged it as a helm chart.\n",
+    "\n",
+    "Before we dive into the full contents of the helm chart, let's first give it a try with some of the settings.\n",
+    "\n",
+    "We will run a batch job that will set up a Seldon Deployment with 10 replicas and 100 batch client workers to send requests."
+ ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": { + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Name: seldon-batch-process\r\n", + "Namespace: default\r\n", + "ServiceAccount: default\r\n", + "Status: Pending\r\n", + "Created: Mon Jun 08 11:57:56 +0100 (1 second ago)\r\n" + ] + } + ], + "source": [ + "!helm template seldon-batch-workflow helm-charts/seldon-batch-workflow/ \\\n", + " --set workflow.name=seldon-batch-process \\\n", + " --set seldonDeployment.name=sklearn \\\n", + " --set seldonDeployment.replicas=10 \\\n", + " --set batchWorker.workers=100 \\\n", + " --set batchWorker.payloadType=ndarray \\\n", + " --set batchWorker.dataType=data \\\n", + " | argo submit -" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "NAME STATUS AGE DURATION PRIORITY\r\n", + "seldon-batch-process Running 3s 3s 0\r\n" + ] + } + ], + "source": [ + "!argo list" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Name: seldon-batch-process\r\n", + "Namespace: default\r\n", + "ServiceAccount: default\r\n", + "Status: Succeeded\r\n", + "Created: Mon Jun 08 11:57:56 +0100 (1 minute ago)\r\n", + "Started: Mon Jun 08 11:57:57 +0100 (1 minute ago)\r\n", + "Finished: Mon Jun 08 11:59:33 +0100 (10 seconds ago)\r\n", + "Duration: 1 minute 36 seconds\r\n", + "\r\n", + "\u001b[39mSTEP\u001b[0m PODNAME DURATION MESSAGE\r\n", + " \u001b[32m✔\u001b[0m seldon-batch-process (seldon-batch-process) \r\n", + " ├---\u001b[32m✔\u001b[0m create-seldon-resource (create-seldon-resource-template) seldon-batch-process-3626514072 1s \r\n", + " ├---\u001b[32m✔\u001b[0m wait-seldon-resource (wait-seldon-resource-template) seldon-batch-process-2052519094 32s \r\n", + " ├---\u001b[32m✔\u001b[0m download-object-store (download-object-store-template) seldon-batch-process-1257652469 3s \r\n", + " ├---\u001b[32m✔\u001b[0m process-batch-inputs (process-batch-inputs-template) seldon-batch-process-2033515954 50s \r\n", + " └---\u001b[32m✔\u001b[0m upload-object-store (upload-object-store-template) seldon-batch-process-2123074048 3s \r\n" + ] + } + ], + "source": [ + "!argo get seldon-batch-process" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[35mcreate-seldon-resource\u001b[0m:\ttime=\"2020-06-08T10:57:58Z\" level=info msg=\"Starting Workflow Executor\" version=v2.8.0-rc4+8f69617.dirty\r\n", + "\u001b[35mcreate-seldon-resource\u001b[0m:\ttime=\"2020-06-08T10:57:58Z\" level=info msg=\"Creating a docker executor\"\r\n", + "\u001b[35mcreate-seldon-resource\u001b[0m:\ttime=\"2020-06-08T10:57:58Z\" level=info msg=\"Executor (version: v2.8.0-rc4+8f69617.dirty, build_date: 2020-05-12T15:17:15Z) initialized (pod: default/seldon-batch-process-3626514072) with template:\\n{\\\"name\\\":\\\"create-seldon-resource-template\\\",\\\"arguments\\\":{},\\\"inputs\\\":{},\\\"outputs\\\":{},\\\"metadata\\\":{},\\\"resource\\\":{\\\"action\\\":\\\"create\\\",\\\"manifest\\\":\\\"apiVersion: machinelearning.seldon.io/v1\\\\nkind: SeldonDeployment\\\\nmetadata:\\\\n name: \\\\\\\"sklearn\\\\\\\"\\\\n namespace: default\\\\n ownerReferences:\\\\n - apiVersion: argoproj.io/v1alpha1\\\\n blockOwnerDeletion: true\\\\n kind: Workflow\\\\n name: 
\\\\\\\"seldon-batch-process\\\\\\\"\\\\n uid: \\\\\\\"15014702-d5a7-4dc6-a219-373b7013b744\\\\\\\"\\\\nspec:\\\\n name: \\\\\\\"sklearn\\\\\\\"\\\\n predictors:\\\\n - graph:\\\\n children: []\\\\n implementation: SKLEARN_SERVER\\\\n modelUri: gs://seldon-models/sklearn/iris\\\\n name: classifier\\\\n name: default\\\\n replicas: 10\\\\n \\\\n\\\"}}\"\r\n", + "\u001b[35mcreate-seldon-resource\u001b[0m:\ttime=\"2020-06-08T10:57:58Z\" level=info msg=\"Loading manifest to /tmp/manifest.yaml\"\r\n", + "\u001b[35mcreate-seldon-resource\u001b[0m:\ttime=\"2020-06-08T10:57:58Z\" level=info msg=\"kubectl create -f /tmp/manifest.yaml -o json\"\r\n", + "\u001b[35mcreate-seldon-resource\u001b[0m:\ttime=\"2020-06-08T10:57:58Z\" level=info msg=default/SeldonDeployment.machinelearning.seldon.io/sklearn\r\n", + "\u001b[35mcreate-seldon-resource\u001b[0m:\ttime=\"2020-06-08T10:57:58Z\" level=info msg=\"No output parameters\"\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 0 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 1 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 2 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 3 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 4 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 5 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 6 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 7 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 8 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tWaiting for deployment \"sklearn-default-0-classifier\" rollout to finish: 9 of 10 updated replicas are available...\r\n", + "\u001b[32mwait-seldon-resource\u001b[0m:\tdeployment \"sklearn-default-0-classifier\" successfully rolled out\r\n", + "\u001b[34mdownload-object-store\u001b[0m:\tAdded `minio-local` successfully.\r\n", + "\u001b[34mdownload-object-store\u001b[0m:\t`minio-local/data/input-data.txt` -> `/assets/input-data.txt`\r\n", + "\u001b[34mdownload-object-store\u001b[0m:\tTotal: 0 B, Transferred: 146.48 KiB, Speed: 9.99 MiB/s\r\n", + "\u001b[39mprocess-batch-inputs\u001b[0m:\tElapsed time: 47.03067970275879\r\n", + "\u001b[31mupload-object-store\u001b[0m:\tAdded `minio-local` successfully.\r\n", + "\u001b[31mupload-object-store\u001b[0m:\t`/assets/output-data.txt` -> `minio-local/data/output-data-15014702-d5a7-4dc6-a219-373b7013b744.txt`\r\n", + "\u001b[31mupload-object-store\u001b[0m:\tTotal: 0 B, Transferred: 2.75 MiB, Speed: 81.57 MiB/s\r\n" + ] + } + ], + "source": [ + "!argo logs -w seldon-batch-process || argo logs seldon-batch-process 
# The 2nd command is for argo 2.8+" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Check output in object store\n", + "\n", + "We can now visualise the output that we obtained in the object store.\n", + "\n", + "First we can check that the file is present:" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Workflow ID is 15014702-d5a7-4dc6-a219-373b7013b744\n" + ] + } + ], + "source": [ + "import json\n", + "wf_arr = !argo get seldon-batch-process -o json\n", + "wf = json.loads(\"\".join(wf_arr))\n", + "WF_ID = wf[\"metadata\"][\"uid\"]\n", + "print(f\"Workflow ID is {WF_ID}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[m\u001b[32m[2020-06-08 11:59:31 BST] \u001b[0m\u001b[33m 2.7MiB \u001b[0m\u001b[1moutput-data-15014702-d5a7-4dc6-a219-373b7013b744.txt\u001b[0m\r\n", + "\u001b[0m" + ] + } + ], + "source": [ + "!mc ls minio-seldon/data/output-data-\"$WF_ID\".txt" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we can output the contents of the file created using the `mc head` command." + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "...3b744.txt: 2.75 MiB / 2.75 MiB ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 24.68 MiB/s 0s\u001b[0m\u001b[0m\u001b[m\u001b[32;1m{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], \"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 2.0, \"batch_instance_id\": \"0340bbe4-a977-11ea-9ca4-ea5c4746b555\"}}}}\n", + "{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], \"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 6.0, \"batch_instance_id\": \"03411da0-a977-11ea-9ca4-ea5c4746b555\"}}}}\n", + "{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], \"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 9.0, \"batch_instance_id\": \"03415234-a977-11ea-9ca4-ea5c4746b555\"}}}}\n", + "{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], \"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 3.0, \"batch_instance_id\": \"03410f72-a977-11ea-9ca4-ea5c4746b555\"}}}}\n", + "{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], \"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 1.0, \"batch_instance_id\": \"0340b8ce-a977-11ea-9ca4-ea5c4746b555\"}}}}\n", + "{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], \"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 0.0, \"batch_instance_id\": \"0340b432-a977-11ea-9ca4-ea5c4746b555\"}}}}\n", + "{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], 
\"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 8.0, \"batch_instance_id\": \"034123b8-a977-11ea-9ca4-ea5c4746b555\"}}}}\n", + "{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], \"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 7.0, \"batch_instance_id\": \"0341208e-a977-11ea-9ca4-ea5c4746b555\"}}}}\n", + "{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], \"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 5.0, \"batch_instance_id\": \"03411a9e-a977-11ea-9ca4-ea5c4746b555\"}}}}\n", + "{\"data\": {\"names\": [\"t:0\", \"t:1\", \"t:2\"], \"ndarray\": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, \"meta\": {\"tags\": {\"tags\": {\"batch_id\": \"03403156-a977-11ea-88f0-ea5c4746b555\", \"batch_index\": 4.0, \"batch_instance_id\": \"03411288-a977-11ea-9ca4-ea5c4746b555\"}}}}\n" + ] + } + ], + "source": [ + "!mc cp minio-seldon/data/output-data-\"$WF_ID\".txt assets/output-data.txt\n", + "!head assets/output-data.txt" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Workflow 'seldon-batch-process' deleted\r\n" + ] + } + ], + "source": [ + "!argo delete seldon-batch-process" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/batch/argo-workflows-batch/README.md b/examples/batch/argo-workflows-batch/README.md new file mode 100644 index 0000000000..b0f661d061 --- /dev/null +++ b/examples/batch/argo-workflows-batch/README.md @@ -0,0 +1,239 @@ +## Batch processing with Argo Worfklows + +In this notebook we will dive into how you can run batch processing with Argo Workflows and Seldon Core. + +Dependencies: + +* Seldon core installed as per the docs with an ingress +* Minio running in your cluster to use as local (s3) object storage +* Argo Workfklows installed in cluster (and argo CLI for commands) + + +## Setup + +### Install Seldon Core +Use the notebook to [set-up Seldon Core with Ambassador or Istio Ingress](https://docs.seldon.io/projects/seldon-core/en/latest/examples/seldon_core_setup.html). + +Note: If running with KIND you need to make sure do follow [these steps](https://github.com/argoproj/argo/issues/2376#issuecomment-595593237) as workaround to the `/.../docker.sock` known issue. + +### Set up Minio in your cluster +Use the notebook to [set-up Minio in your cluster](https://docs.seldon.io/projects/seldon-core/en/latest/examples/minio_setup.html). 
+
+### Copy the Minio Secret to namespace
+
+We need to re-use the minio secret for the batch job; this can be done by simply copying the minio secret created in the `minio-system` namespace.
+
+The command below just copies the secret with the name "minio" from the minio-system namespace to the default namespace.
+
+
+```python
+!kubectl get secret minio -n minio-system -o json | jq '{apiVersion,data,kind,metadata,type} | .metadata |= {"annotations", "name"}' | kubectl apply -n default -f -
+```
+
+### Install Argo Workflows
+You can follow the instructions from the official [Argo Workflows Documentation](https://github.com/argoproj/argo/blob/master/docs/getting-started.md#1-download-the-argo-cli).
+
+You also need to make sure that argo has permissions to create seldon deployments - for this you can just create a default-admin rolebinding as follows:
+
+
+```python
+!kubectl create rolebinding default-admin --clusterrole=admin --serviceaccount=default:default
+```
+
+## Create some input for our model
+
+We will create a file containing the inputs that will be sent to our model
+
+
+```python
+mkdir -p assets/
+```
+
+
+```python
+with open("assets/input-data.txt", "w") as f:
+    for i in range(10000):
+        f.write('[[1, 2, 3, 4]]\n')
+```
+
+### Check the contents of the file
+
+
+```python
+!wc -l assets/input-data.txt
+!head assets/input-data.txt
+```
+
+    10000 assets/input-data.txt
+    [[1, 2, 3, 4]]
+    [[1, 2, 3, 4]]
+    [[1, 2, 3, 4]]
+    [[1, 2, 3, 4]]
+    [[1, 2, 3, 4]]
+    [[1, 2, 3, 4]]
+    [[1, 2, 3, 4]]
+    [[1, 2, 3, 4]]
+    [[1, 2, 3, 4]]
+    [[1, 2, 3, 4]]
+
+
+### Upload the file to our minio
+
+
+```python
+!mc mb minio-seldon/data
+!mc cp assets/input-data.txt minio-seldon/data/
+```
+
+    Bucket created successfully `minio-seldon/data`.
+    ...-data.txt:  146.48 KiB / 146.48 KiB ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 2.03 MiB/s 0s
+
+### Create Argo Workflow
+
+To make it easy to create our Argo workflow, we have packaged it as a helm chart.
+
+Before we dive into the full contents of the helm chart, let's first give it a try with some of the settings.
+
+We will run a batch job that will set up a Seldon Deployment with 10 replicas and 100 batch client workers to send requests.
+ + +```python +!helm template seldon-batch-workflow helm-charts/seldon-batch-workflow/ \ + --set workflow.name=seldon-batch-process \ + --set seldonDeployment.name=sklearn \ + --set seldonDeployment.replicas=10 \ + --set batchWorker.workers=100 \ + --set batchWorker.payloadType=ndarray \ + --set batchWorker.dataType=data \ + | argo submit - +``` + + Name: seldon-batch-process + Namespace: default + ServiceAccount: default + Status: Pending + Created: Mon Jun 08 11:57:56 +0100 (1 second ago) + + + +```python +!argo list +``` + + NAME STATUS AGE DURATION PRIORITY + seldon-batch-process Running 3s 3s 0 + + + +```python +!argo get seldon-batch-process +``` + + Name: seldon-batch-process + Namespace: default + ServiceAccount: default + Status: Succeeded + Created: Mon Jun 08 11:57:56 +0100 (1 minute ago) + Started: Mon Jun 08 11:57:57 +0100 (1 minute ago) + Finished: Mon Jun 08 11:59:33 +0100 (10 seconds ago) + Duration: 1 minute 36 seconds + + STEP PODNAME DURATION MESSAGE + ✔ seldon-batch-process (seldon-batch-process) + ├---✔ create-seldon-resource (create-seldon-resource-template) seldon-batch-process-3626514072 1s + ├---✔ wait-seldon-resource (wait-seldon-resource-template) seldon-batch-process-2052519094 32s + ├---✔ download-object-store (download-object-store-template) seldon-batch-process-1257652469 3s + ├---✔ process-batch-inputs (process-batch-inputs-template) seldon-batch-process-2033515954 50s + └---✔ upload-object-store (upload-object-store-template) seldon-batch-process-2123074048 3s + + + +```python +!argo logs -w seldon-batch-process || argo logs seldon-batch-process # The 2nd command is for argo 2.8+ +``` + + create-seldon-resource: time="2020-06-08T10:57:58Z" level=info msg="Starting Workflow Executor" version=v2.8.0-rc4+8f69617.dirty + create-seldon-resource: time="2020-06-08T10:57:58Z" level=info msg="Creating a docker executor" + create-seldon-resource: time="2020-06-08T10:57:58Z" level=info msg="Executor (version: v2.8.0-rc4+8f69617.dirty, build_date: 2020-05-12T15:17:15Z) initialized (pod: default/seldon-batch-process-3626514072) with template:\n{\"name\":\"create-seldon-resource-template\",\"arguments\":{},\"inputs\":{},\"outputs\":{},\"metadata\":{},\"resource\":{\"action\":\"create\",\"manifest\":\"apiVersion: machinelearning.seldon.io/v1\\nkind: SeldonDeployment\\nmetadata:\\n name: \\\"sklearn\\\"\\n namespace: default\\n ownerReferences:\\n - apiVersion: argoproj.io/v1alpha1\\n blockOwnerDeletion: true\\n kind: Workflow\\n name: \\\"seldon-batch-process\\\"\\n uid: \\\"15014702-d5a7-4dc6-a219-373b7013b744\\\"\\nspec:\\n name: \\\"sklearn\\\"\\n predictors:\\n - graph:\\n children: []\\n implementation: SKLEARN_SERVER\\n modelUri: gs://seldon-models/sklearn/iris\\n name: classifier\\n name: default\\n replicas: 10\\n \\n\"}}" + create-seldon-resource: time="2020-06-08T10:57:58Z" level=info msg="Loading manifest to /tmp/manifest.yaml" + create-seldon-resource: time="2020-06-08T10:57:58Z" level=info msg="kubectl create -f /tmp/manifest.yaml -o json" + create-seldon-resource: time="2020-06-08T10:57:58Z" level=info msg=default/SeldonDeployment.machinelearning.seldon.io/sklearn + create-seldon-resource: time="2020-06-08T10:57:58Z" level=info msg="No output parameters" + wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 0 of 10 updated replicas are available... + wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 1 of 10 updated replicas are available... 
+ wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 2 of 10 updated replicas are available... + wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 3 of 10 updated replicas are available... + wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 4 of 10 updated replicas are available... + wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 5 of 10 updated replicas are available... + wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 6 of 10 updated replicas are available... + wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 7 of 10 updated replicas are available... + wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 8 of 10 updated replicas are available... + wait-seldon-resource: Waiting for deployment "sklearn-default-0-classifier" rollout to finish: 9 of 10 updated replicas are available... + wait-seldon-resource: deployment "sklearn-default-0-classifier" successfully rolled out + download-object-store: Added `minio-local` successfully. + download-object-store: `minio-local/data/input-data.txt` -> `/assets/input-data.txt` + download-object-store: Total: 0 B, Transferred: 146.48 KiB, Speed: 9.99 MiB/s + process-batch-inputs: Elapsed time: 47.03067970275879 + upload-object-store: Added `minio-local` successfully. + upload-object-store: `/assets/output-data.txt` -> `minio-local/data/output-data-15014702-d5a7-4dc6-a219-373b7013b744.txt` + upload-object-store: Total: 0 B, Transferred: 2.75 MiB, Speed: 81.57 MiB/s + + +## Check output in object store + +We can now visualise the output that we obtained in the object store. + +First we can check that the file is present: + + +```python +import json +wf_arr = !argo get seldon-batch-process -o json +wf = json.loads("".join(wf_arr)) +WF_ID = wf["metadata"]["uid"] +print(f"Workflow ID is {WF_ID}") +``` + + Workflow ID is 15014702-d5a7-4dc6-a219-373b7013b744 + + + +```python +!mc ls minio-seldon/data/output-data-"$WF_ID".txt +``` + + [2020-06-08 11:59:31 BST]  2.7MiB output-data-15014702-d5a7-4dc6-a219-373b7013b744.txt +  + +Now we can output the contents of the file created using the `mc head` command. 
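+
+Note that since the requests are handled by parallel workers, the lines in the output file are not guaranteed to follow the input order; each line instead carries a `batch_index` tag in its metadata. As a small illustrative snippet (not part of the example itself), once the file has been copied locally as in the next cell, the original order can be restored like this:
+
+```python
+import json
+
+# Parallel workers finish at different times, so the output lines are written
+# out of order; batch_index (a float) records each datapoint's original
+# position in the input file.
+with open("assets/output-data.txt") as f:
+    results = [json.loads(line) for line in f]
+
+results.sort(key=lambda r: r["meta"]["tags"]["tags"]["batch_index"])
+```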
+ + +```python +!mc cp minio-seldon/data/output-data-"$WF_ID".txt assets/output-data.txt +!head assets/output-data.txt +``` + + ...3b744.txt: 2.75 MiB / 2.75 MiB ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 24.68 MiB/s 0s{"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 2.0, "batch_instance_id": "0340bbe4-a977-11ea-9ca4-ea5c4746b555"}}}} + {"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 6.0, "batch_instance_id": "03411da0-a977-11ea-9ca4-ea5c4746b555"}}}} + {"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 9.0, "batch_instance_id": "03415234-a977-11ea-9ca4-ea5c4746b555"}}}} + {"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 3.0, "batch_instance_id": "03410f72-a977-11ea-9ca4-ea5c4746b555"}}}} + {"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 1.0, "batch_instance_id": "0340b8ce-a977-11ea-9ca4-ea5c4746b555"}}}} + {"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 0.0, "batch_instance_id": "0340b432-a977-11ea-9ca4-ea5c4746b555"}}}} + {"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 8.0, "batch_instance_id": "034123b8-a977-11ea-9ca4-ea5c4746b555"}}}} + {"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 7.0, "batch_instance_id": "0341208e-a977-11ea-9ca4-ea5c4746b555"}}}} + {"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 5.0, "batch_instance_id": "03411a9e-a977-11ea-9ca4-ea5c4746b555"}}}} + {"data": {"names": ["t:0", "t:1", "t:2"], "ndarray": [[0.0006985194531162841, 0.003668039039435755, 0.9956334415074478]]}, "meta": {"tags": {"tags": {"batch_id": "03403156-a977-11ea-88f0-ea5c4746b555", "batch_index": 4.0, "batch_instance_id": "03411288-a977-11ea-9ca4-ea5c4746b555"}}}} + + + +```python +!argo delete seldon-batch-process +``` + + Workflow 'seldon-batch-process' deleted + + + +```python + +``` diff --git a/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/Chart.yaml b/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/Chart.yaml new file mode 100644 index 0000000000..bf39237512 --- /dev/null +++ b/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/Chart.yaml @@ -0,0 +1,9 @@ 
+apiVersion: v1 +description: Seldon Batch Workflow +keywords: +- kubernetes +- machine-learning +name: seldon-batch-workflow +sources: +- https://github.com/SeldonIO/seldon-core +version: 0.1 diff --git a/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/README.md b/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/README.md new file mode 100644 index 0000000000..47e2829975 --- /dev/null +++ b/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/README.md @@ -0,0 +1,3 @@ +# Seldon Batch Workflow + +This chart creates a batch workflow which leverages the seldon batch processor functionality. diff --git a/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/templates/workflow.yaml b/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/templates/workflow.yaml new file mode 100644 index 0000000000..c52748a39d --- /dev/null +++ b/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/templates/workflow.yaml @@ -0,0 +1,145 @@ +--- +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + {{- if eq .Values.workflow.useNameAsGenerateName false }} + name: {{ .Values.workflow.name }} + {{- else }} + generateName: {{ .Values.workflow.name }} + {{- end }} + namespace: {{ .Values.workflow.namespace }} +spec: + entrypoint: seldon-batch-process + volumeClaimTemplates: + - metadata: + name: "{{ .Values.pvc.name }}" + ownerReferences: + - apiVersion: argoproj.io/v1alpha1 + blockOwnerDeletion: true + kind: Workflow + name: "{{`{{workflow.name}}`}}" + uid: "{{`{{workflow.uid}}`}}" + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: "{{ .Values.pvc.storage }}" + templates: + - name: seldon-batch-process + steps: + - - name: create-seldon-resource + template: create-seldon-resource-template + - - name: wait-seldon-resource + template: wait-seldon-resource-template + - - name: download-object-store + template: download-object-store-template + - - name: process-batch-inputs + template: process-batch-inputs-template + - - name: upload-object-store + template: upload-object-store-template + + - name: create-seldon-resource-template + resource: + action: create + manifest: | + apiVersion: machinelearning.seldon.io/v1 + kind: SeldonDeployment + metadata: + name: "{{ .Values.seldonDeployment.name }}" + namespace: {{ .Values.workflow.namespace }} + ownerReferences: + - apiVersion: argoproj.io/v1alpha1 + blockOwnerDeletion: true + kind: Workflow + name: "{{`{{workflow.name}}`}}" + uid: "{{`{{workflow.uid}}`}}" + spec: + name: "{{ .Values.seldonDeployment.name }}" + predictors: + - graph: + children: [] + implementation: {{ .Values.seldonDeployment.server }} + modelUri: {{ .Values.seldonDeployment.modelUri }} + name: classifier + name: default + replicas: {{ .Values.seldonDeployment.replicas }} + + - name: wait-seldon-resource-template + script: + image: bitnami/kubectl:1.17 + command: [bash] + source: | + sleep {{ .Values.seldonDeployment.waitTime }} + kubectl rollout status \ + deploy/$(kubectl get deploy -l seldon-deployment-id="{{ .Values.seldonDeployment.name }}" -o jsonpath='{.items[0].metadata.name}') + + - name: download-object-store-template + script: + image: minio/mc:RELEASE.2020-04-17T08-55-48Z + env: + - name: MINIO_SERVER_ACCESS_KEY + valueFrom: + secretKeyRef: + name: {{ .Values.minio.secret.name }} + key: {{ .Values.minio.secret.keyName.accesskey }} + - name: MINIO_SERVER_ACCESS_SECRET + valueFrom: + secretKeyRef: + name: {{ .Values.minio.secret.name }} + key: {{ 
.Values.minio.secret.keyName.secretkey }}
+        - name: MINIO_SERVER_HOST
+          value: {{ .Values.minio.endpoint }}
+      volumeMounts:
+        - name: "{{ .Values.pvc.name }}"
+          mountPath: /assets
+      command: [sh]
+      source: |
+        mc config host add minio-local $MINIO_SERVER_HOST $MINIO_SERVER_ACCESS_KEY $MINIO_SERVER_ACCESS_SECRET
+        mc cp minio-local/{{ .Values.minio.inputDataPath }} /assets/input-data.txt
+
+  - name: process-batch-inputs-template
+    script:
+      image: {{ .Values.seldonDeployment.image }}
+      volumeMounts:
+        - name: "{{ .Values.pvc.name }}"
+          mountPath: /assets
+      command: [bash]
+      source: |
+        seldon-batch-processor \
+          --deployment-name "{{ .Values.seldonDeployment.name }}" \
+          {{- if eq .Values.batchWorker.enableBenchmark true }}
+          --benchmark \
+          {{- end }}
+          --namespace "{{ .Values.workflow.namespace }}" \
+          --host "{{ .Values.batchWorker.host }}" \
+          --workers "{{ .Values.batchWorker.workers }}" \
+          --data-type "{{ .Values.batchWorker.dataType }}" \
+          --payload-type "{{ .Values.batchWorker.payloadType }}" \
+          --retries "{{ .Values.batchWorker.retries }}" \
+          --input-data-path "/assets/input-data.txt" \
+          --output-data-path "/assets/output-data.txt"
+
+  - name: upload-object-store-template
+    script:
+      image: minio/mc:RELEASE.2020-04-17T08-55-48Z
+      volumeMounts:
+        - name: "{{ .Values.pvc.name }}"
+          mountPath: /assets
+      command: [sh]
+      env:
+        - name: MINIO_SERVER_ACCESS_KEY
+          valueFrom:
+            secretKeyRef:
+              name: {{ .Values.minio.secret.name }}
+              key: {{ .Values.minio.secret.keyName.accesskey }}
+        - name: MINIO_SERVER_ACCESS_SECRET
+          valueFrom:
+            secretKeyRef:
+              name: {{ .Values.minio.secret.name }}
+              key: {{ .Values.minio.secret.keyName.secretkey }}
+        - name: MINIO_SERVER_HOST
+          value: {{ .Values.minio.endpoint }}
+      source: |
+        mc config host add minio-local $MINIO_SERVER_HOST $MINIO_SERVER_ACCESS_KEY $MINIO_SERVER_ACCESS_SECRET
+        mc cp /assets/output-data.txt minio-local/{{ .Values.minio.outputDataPath }}
+
diff --git a/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/values.yaml b/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/values.yaml
new file mode 100644
index 0000000000..5b2fc3b909
--- /dev/null
+++ b/examples/batch/argo-workflows-batch/helm-charts/seldon-batch-workflow/values.yaml
@@ -0,0 +1,56 @@
+workflow:
+  # Name of the overarching argo workflow
+  name: seldon-batch-process
+  # If true the name above is used as generateName, so a randomly generated suffix is appended to the workflow name
+  useNameAsGenerateName: false
+  # Namespace where to create the workflow and all resources in batch job
+  namespace: default
+pvc:
+  # Name of the persistent volume claim to be created
+  name: seldon-pvc
+  # Size of the storage volume to be created for the batch job
+  storage: 2Mi
+# Seldon deployment to be created for batch processing
+seldonDeployment:
+  # Name to use for the seldon deployment which by default appends generated workflow ID
+  name: seldon-{{workflow.uid}}
+  # Image to use for the batch client
+  image: seldonio/seldon-core-s2i-python37:1.1.1-rc
+  # Prepackaged model server to use [see https://docs.seldon.io/projects/seldon-core/en/latest/servers/overview.html]
+  server: SKLEARN_SERVER
+  # The URL for the model that is to be used
+  modelUri: gs://seldon-models/sklearn/iris
+  # The number of seldon deployment replicas to launch
+  replicas: 10
+  # Time to wait before checking the deployment status, to ensure the kubernetes cluster has registered the create request
+  waitTime: 5
+# The batch worker is the component that will send the requests from the files
+batchWorker:
+  # Endpoint for the batch client to contact the seldon deployment
+  host: istio-ingressgateway.istio-system.svc.cluster.local
+  # Number of parallel batch client workers to process the data
+  workers: 100
+  # Number of times client will try to send a request if it fails
+  retries: 3
+  # The payload type to convert if choosing "data" as the data-type param
+  payloadType: "ndarray"
+  # The type of data that is expected in the input data (json, str, data)
+  dataType: "data"
+  # Whether to enable benchmarking on the batch processor worker
+  enableBenchmark: true
+minio:
+  # The location of the minio endpoint
+  endpoint: http://minio.minio-system.svc.cluster.local:9000
+  # This is the secret that should contain the values to access minio
+  secret:
+    # The name of the secret which by default is "minio" but you can create a different one
+    name: minio
+    keyName:
+      # The key name inside that secret to find the access key to authenticate minio
+      accesskey: accesskey
+      # The key name inside that secret to find the secret key to authenticate minio
+      secretkey: secretkey
+  # The name of the file inside of minio that will contain the batch data to process
+  inputDataPath: data/input-data.txt
+  # The name of the file inside of minio where the processed output data will be written
+  outputDataPath: data/output-data-{{workflow.uid}}.txt
diff --git a/examples/batch/kubeflow-pipelines-batch/.gitignore b/examples/batch/kubeflow-pipelines-batch/.gitignore
new file mode 100644
index 0000000000..ea76a1cd33
--- /dev/null
+++ b/examples/batch/kubeflow-pipelines-batch/.gitignore
@@ -0,0 +1,2 @@
+assets/input-data.txt
+assets/output-data.txt
diff --git a/examples/batch/kubeflow-pipelines-batch/README.ipynb b/examples/batch/kubeflow-pipelines-batch/README.ipynb
new file mode 100644
index 0000000000..24f75d1909
--- /dev/null
+++ b/examples/batch/kubeflow-pipelines-batch/README.ipynb
@@ -0,0 +1,431 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Batch processing with Kubeflow Pipelines\n",
+    "In this notebook we will dive into how you can run batch processing with Kubeflow Pipelines and Seldon Core.\n",
+    "\n",
+    "Dependencies:\n",
+    "* Seldon core installed as per the docs with [Istio Ingress](https://docs.seldon.io/projects/seldon-core/en/latest/workflow/install.html#install-seldon-core-with-helm)\n",
+    "* Kubeflow Pipelines installed (installation instructions in this notebook)\n",
+    "\n",
+    "![](assets/kubeflow-pipeline.jpg)\n",
+    "\n",
+    "## Kubeflow Pipelines Setup\n",
+    "\n",
+    "Set up the pipeline in your current cluster:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%bash\n",
+    "export PIPELINE_VERSION=0.5.1\n",
+    "kubectl apply -k github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION\n",
+    "kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io\n",
+    "kubectl apply -k github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We also install the Python library so we can create our pipeline:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "pip install kfp==0.5.1"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Add Batch Data\n",
+    "\n",
+    "In order to run our batch job we will need to create some batch data that can be processed.\n",
+    "\n",
+    "This batch dataset will be pushed to a Minio instance so that it can be downloaded by the batch job (we need to install Minio first).\n",
+    "\n",
+    "### Install Minio"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%bash \n",
+    "helm install minio stable/minio \\\n",
+    "    --set accessKey=minioadmin \\\n",
+    "    --set secretKey=minioadmin \\\n",
+    "    --set image.tag=RELEASE.2020-04-15T19-42-18Z"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Forward the Minio port so you can access it\n",
+    "\n",
+    "You can do this by running the following command in your terminal:\n",
+    "```\n",
+    "kubectl port-forward svc/minio 9000:9000\n",
+    "```\n",
+    "\n",
+    "### Configure local minio client"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!mc config host add minio-local http://localhost:9000 minioadmin minioadmin"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Create some input for our model\n",
+    "\n",
+    "We will create a file containing the inputs that will be sent to our model"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "with open(\"assets/input-data.txt\", \"w\") as f:\n",
+    "    for i in range(10000):\n",
+    "        f.write('[[1, 2, 3, 4]]\\n')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Check the contents of the file"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "10000 assets/input-data.txt\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n",
+      "[[1, 2, 3, 4]]\n"
+     ]
+    }
+   ],
+   "source": [
+    "!wc -l assets/input-data.txt\n",
+    "!head assets/input-data.txt"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Upload the file to our minio"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!mc mb minio-local/data\n",
+    "!mc cp assets/input-data.txt minio-local/data/"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Create Kubeflow Pipeline\n",
+    "\n",
+    "We are now able to create a kubeflow pipeline that will allow us to enter the batch parameters through the UI.\n",
+    "\n",
+    "We will also be able to add extra steps that will download the data from Minio."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "mkdir -p assets/"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We use the pipeline syntax to create the kubeflow pipeline, as outlined below:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 40,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Overwriting assets/seldon-batch-pipeline.py\n"
+     ]
+    }
+   ],
+   "source": [
+    "%%writefile assets/seldon-batch-pipeline.py\n",
+    "\n",
+    "import kfp.dsl as dsl\n",
+    "import yaml\n",
+    "from kubernetes import client as k8s\n",
+    "\n",
+    "@dsl.pipeline(\n",
+    "    name='SeldonBatch',\n",
+    "    description='A batch processing pipeline for seldon models'\n",
+    ")\n",
+    "def nlp_pipeline(\n",
+    "        namespace=\"kubeflow\",\n",
+    "        seldon_server=\"SKLEARN_SERVER\",\n",
+    "        model_path=\"gs://seldon-models/sklearn/iris\",\n",
+    "        gateway_endpoint=\"istio-ingressgateway.istio-system.svc.cluster.local\",\n",
+    "        retries=3,\n",
+    "        replicas=10,\n",
+    "        workers=100,\n",
+    "        input_path=\"data/input-data.txt\",\n",
+    "        output_path=\"data/output-data.txt\"):\n",
+    "    \"\"\"\n",
+    "    Batch processing pipeline for a Seldon model\n",
+    "    \"\"\"\n",
+    "\n",
+    "    vop = dsl.VolumeOp(\n",
+    "        name='seldon-batch-pvc',\n",
+    "        resource_name=\"seldon-batch-pvc\",\n",
+    "        modes=dsl.VOLUME_MODE_RWO,\n",
+    "        size=\"2Mi\"\n",
+    "    )\n",
+    "\n",
+    "    seldon_deployment_yaml = f\"\"\"\n",
+    "apiVersion: machinelearning.seldon.io/v1\n",
+    "kind: SeldonDeployment\n",
+    "metadata:\n",
+    "  name: \"{{{{workflow.name}}}}\"\n",
+    "  namespace: \"{namespace}\"\n",
+    "spec:\n",
+    "  name: \"{{{{workflow.name}}}}\"\n",
+    "  predictors:\n",
+    "    - graph:\n",
+    "        children: []\n",
+    "        implementation: \"{seldon_server}\"\n",
+    "        modelUri: \"{model_path}\"\n",
+    "        name: classifier\n",
+    "      name: default\n",
+    "    \"\"\"\n",
+    "\n",
+    "    deploy_step = dsl.ResourceOp(\n",
+    "        name=\"deploy_seldon\",\n",
+    "        action=\"create\",\n",
+    "        k8s_resource=yaml.safe_load(seldon_deployment_yaml))\n",
+    "\n",
+    "    scale_and_wait = dsl.ContainerOp(\n",
+    "        name=\"scale_and_wait_seldon\",\n",
+    "        image=\"bitnami/kubectl:1.17\",\n",
+    "        command=\"bash\",\n",
+    "        arguments=[\n",
+    "            \"-c\",\n",
+    "            f\"sleep 10 && kubectl scale --namespace {namespace} --replicas={replicas} sdep/{{{{workflow.name}}}} && sleep 2 && kubectl rollout status deploy/$(kubectl get deploy -l seldon-deployment-id={{{{workflow.name}}}} -o jsonpath='{{.items[0].metadata.name}}')\"\n",
+    "        ])\n",
+    "\n",
+    "    download_from_object_store = dsl.ContainerOp(\n",
+    "        name=\"download-from-object-store\",\n",
+    "        image=\"minio/mc:RELEASE.2020-04-17T08-55-48Z\",\n",
+    "        command=\"sh\",\n",
+    "        arguments=[\n",
+    "            \"-c\",\n",
+    "            f\"mc config host add minio-local http://minio.default.svc.cluster.local:9000 minioadmin minioadmin && mc cp minio-local/{input_path} /assets/input-data.txt\"\n",
+    "        ],\n",
+    "        pvolumes={ \"/assets\": vop.volume })\n",
+    "\n",
+    "    batch_process_step = dsl.ContainerOp(\n",
+    "        name='batch-processor',\n",
+    "        image='seldonio/seldon-core-s2i-python37:1.1.1-rc',\n",
+    "        command=\"seldon-batch-processor\",\n",
+    "        arguments=[\n",
+    "            \"--deployment-name\", \"{{workflow.name}}\",\n",
+    "            \"--namespace\", namespace,\n",
+    "            \"--host\", gateway_endpoint,\n",
+    "            \"--retries\", retries,\n",
+    "            \"--input-data-path\", \"/assets/input-data.txt\",\n",
+    "            \"--output-data-path\", \"/assets/output-data.txt\",\n",
+    "            \"--benchmark\"\n",
+    "        ],\n",
+    "        pvolumes={ \"/assets\": vop.volume }\n",
vop.volume }\n", + " )\n", + " \n", + " upload_to_object_store = dsl.ContainerOp(\n", + " name=\"upload-to-object-store\",\n", + " image=\"minio/mc:RELEASE.2020-04-17T08-55-48Z\",\n", + " command=\"sh\",\n", + " arguments=[\n", + " \"-c\",\n", + " f\"mc config host add minio-local http://minio.default.svc.cluster.local:9000 minioadmin minioadmin && mc cp /assets/output-data.txt minio-local/{output_path}\" \n", + " ],\n", + " pvolumes={ \"/assets\": vop.volume })\n", + " \n", + " delete_step = dsl.ResourceOp(\n", + " name=\"delete_seldon\",\n", + " action=\"delete\",\n", + " k8s_resource=yaml.safe_load(seldon_deployment_yaml))\n", + " \n", + " scale_and_wait.after(deploy_step)\n", + " download_from_object_store.after(scale_and_wait)\n", + " batch_process_step.after(download_from_object_store)\n", + " upload_to_object_store.after(batch_process_step)\n", + " delete_step.after(upload_to_object_store)\n", + "\n", + "if __name__ == '__main__':\n", + " import kfp.compiler as compiler\n", + " compiler.Compiler().compile(nlp_pipeline, __file__ + '.tar.gz')\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Trigger the creation\n", + "We will run the Python file, which triggers the creation of the pipeline that we can then upload via the UI:" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "/home/alejandro/miniconda3/lib/python3.7/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as \"Integer\" based on the value \"3\".\r\n", + " warnings.warn('Missing type name was inferred as \"{}\" based on the value \"{}\".'.format(type_name, str(value)))\r\n", + "/home/alejandro/miniconda3/lib/python3.7/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as \"Integer\" based on the value \"10\".\r\n", + " warnings.warn('Missing type name was inferred as \"{}\" based on the value \"{}\".'.format(type_name, str(value)))\r\n", + "/home/alejandro/miniconda3/lib/python3.7/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as \"Integer\" based on the value \"100\".\r\n", + " warnings.warn('Missing type name was inferred as \"{}\" based on the value \"{}\".'.format(type_name, str(value)))\r\n" + ] + } + ], + "source": [ + "!python assets/seldon-batch-pipeline.py" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Check that the pipeline has been created:" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "input-data.txt\t\t seldon-batch-pipeline.py.tar.gz\r\n", + "kubeflow-pipeline.jpg\t seldon-kubeflow-batch.gif\r\n", + "seldon-batch-pipeline.py\r\n" + ] + } + ], + "source": [ + "!ls assets/" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Open the Kubeflow Pipelines UI\n", + "\n", + "We can now open the UI by port-forwarding it with the following command:\n", + "\n", + "```\n", + "kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8000:80\n", + "```\n", + "\n", + "And we can open it locally in our browser via [http://localhost:8000](http://localhost:8000)\n", + "\n", + "Now we can follow the standard steps to create and deploy the Kubeflow pipeline\n", + "\n", + "![](assets/seldon-kubeflow-batch.gif)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {},
"outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/batch/kubeflow-pipelines-batch/README.md b/examples/batch/kubeflow-pipelines-batch/README.md new file mode 100644 index 0000000000..7b58a55140 --- /dev/null +++ b/examples/batch/kubeflow-pipelines-batch/README.md @@ -0,0 +1,280 @@ +## Batch processing with Kubeflow Pipelines +In this notebook we will dive into how you can run batch processing with Kubeflow Pipelines and Seldon Core. + +Dependencies: +* Seldon core installed as per the docs with [Istio Ingress](https://docs.seldon.io/projects/seldon-core/en/latest/workflow/install.html#install-seldon-core-with-helm) +* Kubeflow Pipelines installed (installation instructions in this notebook) + +![](assets/kubeflow-pipeline.jpg) + +## Kubeflow Pipelines Setup + +Setup the pipeline in your current cluster: + + +```bash +%%bash +export PIPELINE_VERSION=0.5.1 +kubectl apply -k github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION +kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io +kubectl apply -k github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION +``` + +We also install the Python Library so we can create our pipeline: + + +```python +pip install kfp==0.5.1 +``` + +## Add Batch Data + +In order to run our batch job we will need to create some batch data that can be used to process. + +This batch dataset will be pushed to a minio instance so it can be downloaded from Minio (which we need to install first) + +### Install Minio + + +```bash +%%bash +helm install minio stable/minio \ + --set accessKey=minioadmin \ + --set secretKey=minioadmin \ + --set image.tag=RELEASE.2020-04-15T19-42-18Z +``` + +### Forward the Minio port so you can access it + +You can do this by runnning the following command in your terminal: +``` +kubectl port-forward svc/minio 9000:9000 + ``` + +### Configure local minio client + + +```python +!mc config host add minio-local http://localhost:9000 minioadmin minioadmin +``` + +### Create some input for our model + +We will create a file that will contain the inputs that will be sent to our model + + +```python +with open("assets/input-data.txt", "w") as f: + for i in range(10000): + f.write('[[1, 2, 3, 4]]\n') +``` + +Check the contents of the file + + +```python +!wc -l assets/input-data.txt +!head assets/input-data.txt +``` + + 10000 assets/input-data.txt + [[1, 2, 3, 4]] + [[1, 2, 3, 4]] + [[1, 2, 3, 4]] + [[1, 2, 3, 4]] + [[1, 2, 3, 4]] + [[1, 2, 3, 4]] + [[1, 2, 3, 4]] + [[1, 2, 3, 4]] + [[1, 2, 3, 4]] + [[1, 2, 3, 4]] + + +### Upload the file to our minio + + +```python +!mc mb minio-local/data +!mc cp assets/input-data.txt minio-local/data/ +``` + +## Create Kubeflow Pipeline + +We are now able to create a kubeflow pipeline that will allow us to enter the batch parameters through the UI. + +We will also be able to add extra steps that will download the data from a Minio client. 
+ + +```python +!mkdir -p assets/ +``` + +We use the pipeline syntax to create the Kubeflow pipeline, as outlined below: + + +```python +%%writefile assets/seldon-batch-pipeline.py + +import kfp.dsl as dsl +import yaml +from kubernetes import client as k8s + +@dsl.pipeline( + name='SeldonBatch', + description='A batch processing pipeline for seldon models' +) +def nlp_pipeline( + namespace="kubeflow", + seldon_server="SKLEARN_SERVER", + model_path="gs://seldon-models/sklearn/iris", + gateway_endpoint="istio-ingressgateway.istio-system.svc.cluster.local", + retries=3, + replicas=10, + workers=100, + input_path="data/input-data.txt", + output_path="data/output-data.txt"): + """ + Pipeline + """ + + vop = dsl.VolumeOp( + name='seldon-batch-pvc', + resource_name="seldon-batch-pvc", + modes=dsl.VOLUME_MODE_RWO, + size="2Mi" + ) + + seldon_deployment_yaml = f""" +apiVersion: machinelearning.seldon.io/v1 +kind: SeldonDeployment +metadata: + name: "{{{{workflow.name}}}}" + namespace: "{namespace}" +spec: + name: "{{{{workflow.name}}}}" + predictors: + - graph: + children: [] + implementation: "{seldon_server}" + modelUri: "{model_path}" + name: classifier + name: default + """ + + deploy_step = dsl.ResourceOp( + name="deploy_seldon", + action="create", + k8s_resource=yaml.safe_load(seldon_deployment_yaml)) + + scale_and_wait = dsl.ContainerOp( + name="scale_and_wait_seldon", + image="bitnami/kubectl:1.17", + command="bash", + arguments=[ + "-c", + f"sleep 10 && kubectl scale --namespace {namespace} --replicas={replicas} sdep/{{{{workflow.name}}}} && sleep 2 && kubectl rollout status deploy/$(kubectl get deploy -l seldon-deployment-id={{{{workflow.name}}}} -o jsonpath='{{.items[0].metadata.name}}')" + ]) + + download_from_object_store = dsl.ContainerOp( + name="download-from-object-store", + image="minio/mc:RELEASE.2020-04-17T08-55-48Z", + command="sh", + arguments=[ + "-c", + f"mc config host add minio-local http://minio.default.svc.cluster.local:9000 minioadmin minioadmin && mc cp minio-local/{input_path} /assets/input-data.txt" + ], + pvolumes={ "/assets": vop.volume }) + + + batch_process_step = dsl.ContainerOp( + name='batch-processor', + image='seldonio/seldon-core-s2i-python37:1.1.1-rc', + command="seldon-batch-processor", + arguments=[ + "--deployment-name", "{{workflow.name}}", + "--namespace", namespace, + "--host", gateway_endpoint, + "--retries", retries, + "--input-data-path", "/assets/input-data.txt", + "--output-data-path", "/assets/output-data.txt", + "--benchmark" + ], + pvolumes={ "/assets": vop.volume } + ) + + upload_to_object_store = dsl.ContainerOp( + name="upload-to-object-store", + image="minio/mc:RELEASE.2020-04-17T08-55-48Z", + command="sh", + arguments=[ + "-c", + f"mc config host add minio-local http://minio.default.svc.cluster.local:9000 minioadmin minioadmin && mc cp /assets/output-data.txt minio-local/{output_path}" + ], + pvolumes={ "/assets": vop.volume }) + + delete_step = dsl.ResourceOp( + name="delete_seldon", + action="delete", + k8s_resource=yaml.safe_load(seldon_deployment_yaml)) + + scale_and_wait.after(deploy_step) + download_from_object_store.after(scale_and_wait) + batch_process_step.after(download_from_object_store) + upload_to_object_store.after(batch_process_step) + delete_step.after(upload_to_object_store) + +if __name__ == '__main__': + import kfp.compiler as compiler + compiler.Compiler().compile(nlp_pipeline, __file__ + '.tar.gz') + +``` + + Overwriting assets/seldon-batch-pipeline.py + + +## Trigger the creation +We will run the Python file, which 
triggers the creation of the pipeline that we can then upload via the UI: + + +```python +!python assets/seldon-batch-pipeline.py +``` + + /home/alejandro/miniconda3/lib/python3.7/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as "Integer" based on the value "3". + warnings.warn('Missing type name was inferred as "{}" based on the value "{}".'.format(type_name, str(value))) + /home/alejandro/miniconda3/lib/python3.7/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as "Integer" based on the value "10". + warnings.warn('Missing type name was inferred as "{}" based on the value "{}".'.format(type_name, str(value))) + /home/alejandro/miniconda3/lib/python3.7/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as "Integer" based on the value "100". + warnings.warn('Missing type name was inferred as "{}" based on the value "{}".'.format(type_name, str(value))) + + +Check that the pipeline has been created: + + +```python +!ls assets/ +``` + + input-data.txt seldon-batch-pipeline.py.tar.gz + kubeflow-pipeline.jpg seldon-kubeflow-batch.gif + seldon-batch-pipeline.py + + +## Open the Kubeflow Pipelines UI + +We can now open the UI by port-forwarding it with the following command: + +``` +kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8000:80 +``` + +And we can open it locally in our browser via [http://localhost:8000](http://localhost:8000) + +Now we can follow the standard steps to create and deploy the Kubeflow pipeline + +![](assets/seldon-kubeflow-batch.gif) + + +```python + +``` diff --git a/examples/batch/kubeflow-pipelines-batch/assets/kubeflow-pipeline.jpg b/examples/batch/kubeflow-pipelines-batch/assets/kubeflow-pipeline.jpg new file mode 100755 index 0000000000..f793407067 Binary files /dev/null and b/examples/batch/kubeflow-pipelines-batch/assets/kubeflow-pipeline.jpg differ diff --git a/examples/batch/kubeflow-pipelines-batch/assets/seldon-batch-pipeline.py b/examples/batch/kubeflow-pipelines-batch/assets/seldon-batch-pipeline.py new file mode 100644 index 0000000000..9d6edebcb3 --- /dev/null +++ b/examples/batch/kubeflow-pipelines-batch/assets/seldon-batch-pipeline.py @@ -0,0 +1,124 @@ +import kfp.dsl as dsl +import yaml +from kubernetes import client as k8s + + +@dsl.pipeline( + name="SeldonBatch", description="A batch processing pipeline for seldon models" +) +def nlp_pipeline( + namespace="kubeflow", + seldon_server="SKLEARN_SERVER", + model_path="gs://seldon-models/sklearn/iris", + gateway_endpoint="istio-ingressgateway.istio-system.svc.cluster.local", + retries=3, + replicas=10, + workers=100, + input_path="data/input-data.txt", + output_path="data/output-data.txt", +): + """ + Pipeline + """ + + vop = dsl.VolumeOp( + name="seldon-batch-pvc", + resource_name="seldon-batch-pvc", + modes=dsl.VOLUME_MODE_RWO, + size="2Mi", + ) + + seldon_deployment_yaml = f""" +apiVersion: machinelearning.seldon.io/v1 +kind: SeldonDeployment +metadata: + name: "{{{{workflow.name}}}}" + namespace: "{namespace}" +spec: + name: "{{{{workflow.name}}}}" + predictors: + - graph: + children: [] + implementation: "{seldon_server}" + modelUri: "{model_path}" + name: classifier + name: default + """ + + deploy_step = dsl.ResourceOp( + name="deploy_seldon", + action="create", + k8s_resource=yaml.safe_load(seldon_deployment_yaml), + ) + + scale_and_wait = dsl.ContainerOp( + name="scale_and_wait_seldon", + image="bitnami/kubectl:1.17", + command="bash", + arguments=[ + "-c",
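+            # Explanatory comment (added): wait briefly for the SeldonDeployment
+            # to be reconciled, scale it to the requested replica count, then
+            # block until the underlying Deployment (matched via its
+            # seldon-deployment-id label) finishes rolling out.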
f"sleep 10 && kubectl scale --namespace {namespace} --replicas={replicas} sdep/{{{{workflow.name}}}} && sleep 2 && kubectl rollout status deploy/$(kubectl get deploy -l seldon-deployment-id={{{{workflow.name}}}} -o jsonpath='{{.items[0].metadata.name'}})", + ], + ) + + download_from_object_store = dsl.ContainerOp( + name="download-from-object-store", + image="minio/mc:RELEASE.2020-04-17T08-55-48Z", + command="sh", + arguments=[ + "-c", + f"mc config host add minio-local http://minio.default.svc.cluster.local:9000 minioadmin minioadmin && mc cp minio-local/{input_path} /assets/input-data.txt", + ], + pvolumes={"/assets": vop.volume}, + ) + + batch_process_step = dsl.ContainerOp( + name="data_downloader", + image="seldonio/seldon-core-s2i-python37:1.1.1-rc", + command="seldon-batch-processor", + arguments=[ + "--deployment-name", + "{{workflow.name}}", + "--namespace", + namespace, + "--host", + gateway_endpoint, + "--retries", + retries, + "--input-data-path", + "/assets/input-data.txt", + "--output-data-path", + "/assets/output-data.txt", + "--benchmark", + ], + pvolumes={"/assets": vop.volume}, + ) + + upload_to_object_store = dsl.ContainerOp( + name="upload-to-object-store", + image="minio/mc:RELEASE.2020-04-17T08-55-48Z", + command="sh", + arguments=[ + "-c", + f"mc config host add minio-local http://minio.default.svc.cluster.local:9000 minioadmin minioadmin && mc cp /assets/output-data.txt minio-local/{output_path}", + ], + pvolumes={"/assets": vop.volume}, + ) + + delete_step = dsl.ResourceOp( + name="delete_seldon", + action="delete", + k8s_resource=yaml.safe_load(seldon_deployment_yaml), + ) + + scale_and_wait.after(deploy_step) + download_from_object_store.after(scale_and_wait) + batch_process_step.after(download_from_object_store) + upload_to_object_store.after(batch_process_step) + delete_step.after(upload_to_object_store) + + +if __name__ == "__main__": + import kfp.compiler as compiler + + compiler.Compiler().compile(nlp_pipeline, __file__ + ".tar.gz") diff --git a/examples/batch/kubeflow-pipelines-batch/assets/seldon-batch-pipeline.py.tar.gz b/examples/batch/kubeflow-pipelines-batch/assets/seldon-batch-pipeline.py.tar.gz new file mode 100644 index 0000000000..b5942f6f95 Binary files /dev/null and b/examples/batch/kubeflow-pipelines-batch/assets/seldon-batch-pipeline.py.tar.gz differ diff --git a/examples/batch/kubeflow-pipelines-batch/assets/seldon-kubeflow-batch.gif b/examples/batch/kubeflow-pipelines-batch/assets/seldon-kubeflow-batch.gif new file mode 100644 index 0000000000..905d4dd3c5 Binary files /dev/null and b/examples/batch/kubeflow-pipelines-batch/assets/seldon-kubeflow-batch.gif differ diff --git a/python/seldon_core/batch_processor.py b/python/seldon_core/batch_processor.py new file mode 100644 index 0000000000..25d08ec794 --- /dev/null +++ b/python/seldon_core/batch_processor.py @@ -0,0 +1,409 @@ +import click +import json +import requests +from queue import Queue +from threading import Thread +from seldon_core.seldon_client import SeldonClient +import numpy as np +import os +import uuid +import time + +CHOICES_GATEWAY_TYPE = ["ambassador", "istio", "seldon"] +CHOICES_TRANSPORT = ["rest", "grpc"] +CHOICES_PAYLOAD_TYPE = ["ndarray", "tensor", "tftensor"] +CHOICES_DATA_TYPE = ["data", "json", "str"] +CHOICES_METHOD = ["predict"] +CHOICES_LOG_LEVEL = ["debug", "info", "warning", "error"] + + +def start_multithreaded_batch_worker( + deployment_name: str, + gateway_type: str, + namespace: str, + host: str, + transport: str, + data_type: str, + payload_type: str, + 
+    workers: int, + retries: int, + input_data_path: str, + output_data_path: str, + method: str, + log_level: str, + benchmark: bool, + batch_id: str, +) -> None: + """ + Starts the multithreaded batch worker, which consists of three worker types connected + by two queues: the input file worker reads the input file and puts each line on the + input queue; the request workers (the number of parallel workers is specified by the + workers param) read from the input queue, send the requests and put the responses on + the output queue; and the output file worker writes all the responses to the output + file in a thread-safe approach. + + All parameters are defined and explained in detail in the run_cli function. + """ + start_time = time.time() + + q_in = Queue(workers * 2) + q_out = Queue(workers * 2) + + sc = SeldonClient( + gateway=gateway_type, + transport=transport, + deployment_name=deployment_name, + payload_type=payload_type, + gateway_endpoint=host, + namespace=namespace, + client_return_type="dict", + ) + + Thread( + target=_start_input_file_worker, args=(q_in, input_data_path), daemon=True + ).start() + + for _ in range(workers): + Thread( + target=_start_request_worker, + args=(q_in, q_out, data_type, sc, retries, batch_id), + daemon=True, + ).start() + + Thread( + target=_start_output_file_worker, args=(q_out, output_data_path), daemon=True + ).start() + + q_in.join() + q_out.join() + + if benchmark: + print(f"Elapsed time: {time.time() - start_time}") + + +def _start_input_file_worker(q_in: Queue, input_data_path: str) -> None: + """ + Runs logic for the input file worker which reads the input file from the filestore + and puts all of the lines into the input queue so they can be processed. + + Parameters + --- + q_in + The queue to put all the data into for further processing + input_data_path + The local file to read the data from to be processed + """ + with open(input_data_path, "r") as input_data_file: + for enum_idx, line in enumerate(input_data_file): + unique_id = str(uuid.uuid1()) + q_in.put((enum_idx, unique_id, line)) + + +def _start_output_file_worker(q_out: Queue, output_data_path: str) -> None: + """ + Runs logic for the output file worker which receives all the processed output + from the request workers through the queue and appends it to the output file in a + thread-safe manner. + + Parameters + --- + q_out + The queue to read the results from + output_data_path + The local file to write the results into + """ + output_data_file = open(output_data_path, "w") + while True: + line = q_out.get() + output_data_file.write(f"{line}\n") + q_out.task_done() + + +def _start_request_worker( + q_in: Queue, + q_out: Queue, + data_type: str, + sc: SeldonClient, + retries: int, + batch_id: str, +) -> None: + """ + Runs logic for the worker that sends requests from the queue until the queue + is completely empty. The worker marks each task as done when it finishes processing, + which frees space in the bounded queue so new tasks can be enqueued.
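+
+    Both queues are bounded to twice the number of workers, so the file reader
+    and the request workers apply back-pressure instead of loading the whole
+    input file into memory at once.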
+ + Parameters + --- + q_in + Queue to read the input data from + q_out + Queue to put the resulting responses into + data_type + The json/str/data type to send the requests as + sc + An initialised Seldon Client used to send the requests + retries + The number of attempts to try for each request + batch_id + The unique identifier for the batch which is passed to all requests + """ + while True: + batch_idx, batch_instance_id, input_raw = q_in.get() + str_output = _send_batch_predict( + batch_idx, batch_instance_id, input_raw, data_type, sc, retries, batch_id + ) + # Put the response on the output queue, then mark the input task as + # done to free space in the bounded queue for new tasks + q_out.put(str_output) + q_in.task_done() + + +def _send_batch_predict( + batch_idx: int, + batch_instance_id: str, + input_raw: str, + data_type: str, + sc: SeldonClient, + retries: int, + batch_id: str, +) -> str: + """ + Send a request using the Seldon Client with batch context, including the + unique ID of the batch and the batch enumerated index as metadata. This + function also uses the unique batch instance ID as the request ID so the request + can be traced back individually in the Seldon Request Logger context. Each request + will be attempted for the number of retries, and will return the string + serialised result. + + Parameters + --- + batch_idx + The enumerated index given to the batch datapoint in order of local dataset + batch_instance_id + The unique ID of the batch datapoint created with the python uuid function + input_raw + The raw input as a string, to be parsed into the respective format + data_type + The data type to send, which can be str, json or data + sc + The instance of SeldonClient to use to send the requests to the seldon model + retries + The number of times to retry the request + batch_id + The unique identifier for the batch which is passed to all requests + + Returns + --- + A string serialised result of the response (or equivalent data with error info) + """ + + predict_kwargs = {} + meta = { + "tags": { + "batch_id": batch_id, + "batch_instance_id": batch_instance_id, + "batch_index": batch_idx, + } + } + predict_kwargs["meta"] = meta + predict_kwargs["headers"] = {"Seldon-Puid": batch_instance_id} + try: + data = json.loads(input_raw) + + # TODO: Add functionality to send "raw" payload + if data_type == "data": + # TODO: Update client to avoid requiring a numpy array + data_np = np.array(data) + predict_kwargs["data"] = data_np + elif data_type == "str": + predict_kwargs["str_data"] = data + elif data_type == "json": + predict_kwargs["json_data"] = data + + str_output = None + for i in range(retries): + try: + # TODO: Add functionality for explainer + # as explainer currently doesn't support meta + # TODO: Optimize client to share session for requests + seldon_payload = sc.predict(**predict_kwargs) + assert seldon_payload.success + str_output = json.dumps(seldon_payload.response) + break + except requests.exceptions.RequestException: + if i == (retries - 1): + raise + + except Exception as e: + error_resp = { + "status": {"info": "FAILURE", "reason": str(e), "status": 1}, + "meta": meta, + } + str_output = json.dumps(error_resp) + + return str_output + + +@click.command() +@click.option( + "--deployment-name", + "-d", + envvar="SELDON_BATCH_DEPLOYMENT_NAME", + required=True, + help="The name of the SeldonDeployment to send the requests to", +) +@click.option( + "--gateway-type", + "-g", + envvar="SELDON_BATCH_GATEWAY_TYPE", + type=click.Choice(CHOICES_GATEWAY_TYPE), + default="istio", + help="The gateway type for the seldon model, 
which can be through the ingress provider (istio/ambassador) or directly through the service (seldon)", +) +@click.option( + "--namespace", + "-n", + envvar="SELDON_BATCH_NAMESPACE", + default="default", + help="The Kubernetes namespace where the SeldonDeployment is deployed", +) +@click.option( + "--host", + "-h", + envvar="SELDON_BATCH_HOST", + default="istio-ingressgateway.istio-system.svc.cluster.local:80", + help="The hostname for the seldon model to send the request to, which can be the ingress of the Seldon model or the service itself", +) +@click.option( + "--transport", + "-t", + envvar="SELDON_BATCH_TRANSPORT", + type=click.Choice(CHOICES_TRANSPORT), + default="rest", + help="The transport type of the SeldonDeployment model which can be REST or GRPC", +) +@click.option( + "--data-type", + "-a", + envvar="SELDON_BATCH_DATA_TYPE", + type=click.Choice(CHOICES_DATA_TYPE), + default="data", + help="Whether to use json, strData or Seldon Data type for the payload to send to the SeldonDeployment which aligns with the SeldonClient format", +) +@click.option( + "--payload-type", + "-p", + envvar="SELDON_BATCH_PAYLOAD_TYPE", + type=click.Choice(CHOICES_PAYLOAD_TYPE), + default="ndarray", + help="The payload type expected by the SeldonDeployment and hence the expected format for the data in the input file which can be an array", +) +@click.option( + "--workers", + "-w", + envvar="SELDON_BATCH_WORKERS", + type=int, + default=1, + help="The number of parallel request processor workers to run for parallel processing", +) +@click.option( + "--retries", + "-r", + envvar="SELDON_BATCH_RETRIES", + type=int, + default=3, + help="The number of retries for each request before marking an error", +) +@click.option( + "--input-data-path", + "-i", + envvar="SELDON_BATCH_INPUT_DATA_PATH", + type=click.Path(), + default="/assets/input-data.txt", + help="The local filestore path where the input file with the data to process is located", +) +@click.option( + "--output-data-path", + "-o", + envvar="SELDON_BATCH_OUTPUT_DATA_PATH", + type=click.Path(), + default="/assets/output-data.txt", + help="The local filestore path where the output file should be written with the outputs of the batch processing", +) +@click.option( + "--method", + "-m", + envvar="SELDON_BATCH_METHOD", + type=click.Choice(CHOICES_METHOD), + default="predict", + help="The method of the SeldonDeployment to send the request to which currently only supports the predict method", +) +@click.option( + "--log-level", + "-l", + envvar="SELDON_BATCH_LOG_LEVEL", + type=click.Choice(CHOICES_LOG_LEVEL), + default="info", + help="The log level for the batch processor", +) +@click.option( + "--benchmark", + "-b", + envvar="SELDON_BATCH_BENCHMARK", + is_flag=True, + help="If true the batch processor will print the elapsed time taken to run the process", +) +@click.option( + "--batch-id", + "-u", + envvar="SELDON_BATCH_ID", + default=str(uuid.uuid1()), + type=str, + help="Unique batch ID to identify all datapoints processed in this batch; if not provided, one is auto-generated", +) +def run_cli( + deployment_name: str, + gateway_type: str, + namespace: str, + host: str, + transport: str, + data_type: str, + payload_type: str, + workers: int, + retries: int, + input_data_path: str, + output_data_path: str, + method: str, + log_level: str, + benchmark: bool, + batch_id: str, +): + """ + Command line interface for Seldon Batch Processor, which can be used to send requests + through configurable parallel workers to Seldon Core models. 
It is recommended that the + respective Seldon Core model is also scaled to multiple replicas to distribute + the batch processing work. The processor reads its input from a local + filestore input file in any of the formats supported by the SeldonClient module. It is also + suggested to use the batch processor component integrated with an ETL workflow manager + such as Kubeflow Pipelines, Argo Workflows, Airflow, etc., which allows for extra setup / teardown + steps such as downloading the data from an object store or starting a Seldon Core model with replicas. + See the Seldon Core examples folder for implementations of this batch module with Seldon Core. + """ + start_multithreaded_batch_worker( + deployment_name, + gateway_type, + namespace, + host, + transport, + data_type, + payload_type, + workers, + retries, + input_data_path, + output_data_path, + method, + log_level, + benchmark, + batch_id, + ) diff --git a/python/setup.py b/python/setup.py index 5abc0a5674..497c192e0b 100644 --- a/python/setup.py +++ b/python/setup.py @@ -53,6 +53,7 @@ "seldon-core-tester = seldon_core.microservice_tester:main", "seldon-core-microservice-tester = seldon_core.microservice_tester:main", "seldon-core-api-tester = seldon_core.api_tester:main", + "seldon-batch-processor = seldon_core.batch_processor:run_cli", ] }, zip_safe=False, diff --git a/testing/.gitignore b/testing/.gitignore new file mode 100644 index 0000000000..ba4f3f7536 --- /dev/null +++ b/testing/.gitignore @@ -0,0 +1,2 @@ +scripts/output-data.txt +scripts/input-data.txt diff --git a/testing/scripts/test_batch_processor.py b/testing/scripts/test_batch_processor.py new file mode 100644 index 0000000000..a818f58dd4 --- /dev/null +++ b/testing/scripts/test_batch_processor.py @@ -0,0 +1,61 @@ +from seldon_e2e_utils import ( + wait_for_rollout, + initial_rest_request, + rest_request_ambassador, + retry_run, + create_random_data, + wait_for_status, + rest_request, + API_ISTIO_GATEWAY, +) +from subprocess import run +import time +import logging +import json +import requests +import uuid +from seldon_core.batch_processor import start_multithreaded_batch_worker + + +class TestBatchWorker(object): + def test_batch_worker(self, namespace): + spec = "../../servers/sklearnserver/samples/iris.yaml" + retry_run(f"kubectl apply -f {spec} -n {namespace}") + wait_for_status("sklearn", namespace) + wait_for_rollout("sklearn", namespace) + time.sleep(1) + + batch_size = 1000 + input_data_path = "input-data.txt" + output_data_path = "output-data.txt" + + with open(input_data_path, "w") as f: + for i in range(batch_size): + f.write("[[1,2,3,4]]\n") + + start_multithreaded_batch_worker( + "sklearn", + "istio", + namespace, + API_ISTIO_GATEWAY, + "rest", + "data", + "ndarray", + 100, + 3, + input_data_path, + output_data_path, + "predict", + "debug", + True, + str(uuid.uuid1()), + ) + + with open(output_data_path, "r") as f: + for line in f: + output = json.loads(line) + # Ensure all requests are successful + assert output.get("data", {}).get("ndarray", False) + + logging.info("Success for test_batch_worker") + run(f"kubectl delete -f {spec} -n {namespace}", shell=True)
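+
+        # For reference, a hypothetical equivalent shell invocation of the
+        # same job (not executed by this test; flags as defined in run_cli):
+        #   seldon-batch-processor --deployment-name sklearn --namespace <ns> \
+        #     --gateway-type istio --host <istio-gateway-host> --workers 100 \
+        #     --retries 3 --data-type data --payload-type ndarray \
+        #     --input-data-path input-data.txt --output-data-path output-data.txt \
+        #     --benchmark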