From 8c9b33c6a381913c9bd9f1e16d2f1a4ba669071e Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 16 Sep 2022 19:02:25 +0100 Subject: [PATCH] Create TFJob and PyTorchJob from Function APIs in the Training SDK (#1659) * Create TFJob and PyTorchJob from Function APIs in the Training SDK * Install SDK in tests * Use Kubeflow Group Const * Use Final for constants Add Client info in example * Modify packages_to_install doc * Add Tensorflow GPU Image --- .github/workflows/integration-tests.yaml | 2 +- .../create-pytorchjob-from-func.ipynb | 777 ++++++++++++++++++ .../kubeflow/training/api/mpi_job_client.py | 209 +++-- .../kubeflow/training/api/mpi_job_watch.py | 29 +- .../kubeflow/training/api/mx_job_client.py | 209 +++-- .../kubeflow/training/api/mx_job_watch.py | 29 +- .../training/api/py_torch_job_client.py | 304 +++++-- .../training/api/py_torch_job_watch.py | 29 +- .../kubeflow/training/api/tf_job_client.py | 333 +++++--- .../kubeflow/training/api/tf_job_watch.py | 29 +- .../training/api/xgboost_job_client.py | 209 +++-- .../training/api/xgboost_job_watch.py | 29 +- .../kubeflow/training/constants/constants.py | 73 +- sdk/python/kubeflow/training/utils/utils.py | 108 ++- sdk/python/requirements.txt | 7 - sdk/python/setup.py | 80 +- 16 files changed, 1920 insertions(+), 536 deletions(-) create mode 100644 sdk/python/examples/create-pytorchjob-from-func.ipynb delete mode 100644 sdk/python/requirements.txt diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index 609adfd8a4..b3024afa91 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -35,4 +35,4 @@ jobs: - name: Run tests run: | pip install pytest - python3 -m pip install -r sdk/python/requirements.txt; pytest sdk/python/test --log-cli-level=info + python3 -m pip install -e sdk/python; pytest sdk/python/test --log-cli-level=info diff --git a/sdk/python/examples/create-pytorchjob-from-func.ipynb b/sdk/python/examples/create-pytorchjob-from-func.ipynb new file mode 100644 index 0000000000..945160fe17 --- /dev/null +++ b/sdk/python/examples/create-pytorchjob-from-func.ipynb @@ -0,0 +1,777 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "90d43b56-97e5-45e2-8e67-4488ed31d2df", + "metadata": { + "tags": [] + }, + "source": [ + "# Run PyTorchJob From Function\n", + "\n", + "In this Notebook we are going to create [Kubeflow PyTorchJob](https://www.kubeflow.org/docs/components/training/pytorch/).\n", + "\n", + "The PyTorchJob will run distributive training using [DistributedDataParallel strategy](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html)." + ] + }, + { + "cell_type": "markdown", + "id": "a8bb6564-fde3-4c28-841c-012122643dd9", + "metadata": {}, + "source": [ + "## Install Kubeflow Python SDKs\n", + "\n", + "You need to install PyTorch packages and Kubeflow SDKs to run this Notebook." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d49f072e-2221-48bb-9f6d-561713d1a45c", + "metadata": {}, + "outputs": [], + "source": [ + "!pip install torch==1.12.1\n", + "!pip install torchvision==0.13.1\n", + "\n", + "# TODO (andreyvelich): Change to release version when SDK with the new APIs is published.\n", + "!pip install git+https://github.com/kubeflow/training-operator.git#subdirectory=sdk/python" + ] + }, + { + "cell_type": "markdown", + "id": "e9331a05-9127-4b3a-8077-31157e267827", + "metadata": {}, + "source": [ + "## Create Train Script for CNN Model\n", + "\n", + "This is simple **Convolutional Neural Network (CNN)** model for recognizing different picture of clothing using [Fashion MNIST Dataset](https://github.com/zalandoresearch/fashion-mnist)." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "69f21f33-5c64-452c-90c4-977fc0dadb3b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def train_pytorch_model():\n", + " import logging\n", + " import os\n", + " from torchvision import transforms, datasets\n", + " import torch\n", + " from torch import nn\n", + " import torch.nn.functional as F\n", + " import torch.distributed as dist\n", + "\n", + " logging.basicConfig(\n", + " format=\"%(asctime)s %(levelname)-8s %(message)s\",\n", + " datefmt=\"%Y-%m-%dT%H:%M:%SZ\",\n", + " level=logging.DEBUG,\n", + " )\n", + "\n", + " # Create PyTorch CNN Model.\n", + " class Net(nn.Module):\n", + " def __init__(self):\n", + " super(Net, self).__init__()\n", + " self.conv1 = nn.Conv2d(1, 20, 5, 1)\n", + " self.conv2 = nn.Conv2d(20, 50, 5, 1)\n", + " self.fc1 = nn.Linear(4 * 4 * 50, 500)\n", + " self.fc2 = nn.Linear(500, 10)\n", + "\n", + " def forward(self, x):\n", + " x = F.relu(self.conv1(x))\n", + " x = F.max_pool2d(x, 2, 2)\n", + " x = F.relu(self.conv2(x))\n", + " x = F.max_pool2d(x, 2, 2)\n", + " x = x.view(-1, 4 * 4 * 50)\n", + " x = F.relu(self.fc1(x))\n", + " x = self.fc2(x)\n", + " return F.log_softmax(x, dim=1)\n", + "\n", + " # Get dist parameters.\n", + " # Kubeflow Training Operator automatically set appropriate RANK and WORLD_SIZE based on the configuration.\n", + " RANK = int(os.environ[\"RANK\"])\n", + " WORLD_SIZE = int(os.environ[\"WORLD_SIZE\"])\n", + " \n", + " model = Net()\n", + " # Attach model to DistributedDataParallel strategy.\n", + " dist.init_process_group(backend=\"gloo\", rank=RANK, world_size=WORLD_SIZE)\n", + " Distributor = nn.parallel.DistributedDataParallel\n", + " model = Distributor(model)\n", + "\n", + " # Split batch size for each worker.\n", + " batch_size = int(128 / WORLD_SIZE)\n", + "\n", + " # Get Fashion MNIST DataSet.\n", + " train_loader = torch.utils.data.DataLoader(\n", + " datasets.FashionMNIST(\n", + " \"./data\",\n", + " train=True,\n", + " download=True,\n", + " transform=transforms.Compose([transforms.ToTensor()]),\n", + " ),\n", + " batch_size=batch_size,\n", + " )\n", + "\n", + " # Start Training.\n", + " logging.info(f\"Start training for RANK: {RANK}. WORLD_SIZE: {WORLD_SIZE}\")\n", + " for epoch in range(1):\n", + " model.train()\n", + " optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)\n", + "\n", + " for batch_idx, (data, target) in enumerate(train_loader):\n", + " optimizer.zero_grad()\n", + " output = model(data)\n", + " loss = F.nll_loss(output, target)\n", + " loss.backward()\n", + " optimizer.step()\n", + " if batch_idx % 10 == 0:\n", + " logging.info(\n", + " \"Train Epoch: {} [{}/{} ({:.0f}%)]\\tloss={:.4f}\".format(\n", + " epoch,\n", + " batch_idx * len(data),\n", + " len(train_loader.dataset),\n", + " 100.0 * batch_idx / len(train_loader),\n", + " loss.item(),\n", + " )\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "8cfe8739-1f94-476a-80e3-dd6e3237d9ed", + "metadata": { + "execution": { + "iopub.execute_input": "2022-09-01T19:32:37.813779Z", + "iopub.status.busy": "2022-09-01T19:32:37.812759Z", + "iopub.status.idle": "2022-09-01T19:32:37.827050Z", + "shell.execute_reply": "2022-09-01T19:32:37.825186Z", + "shell.execute_reply.started": "2022-09-01T19:32:37.813690Z" + } + }, + "source": [ + "## Run Training Locally in the Notebook\n", + "\n", + "We are going to download Fashion MNIST Dataset and start local training." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "9e2c6fd8-d0ba-4bc6-ac90-d4cf09751ace", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2022-09-12T18:21:28Z INFO Added key: store_based_barrier_key:1 to store for rank: 0\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz\n", + "Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "851b228ae0324915882f834224abe134", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + " 0%| | 0/26421880 [00:00read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e + ) def get_log_queue_pool(streams): @@ -52,8 +53,13 @@ def get_log_queue_pool(streams): class MPIJobClient(object): - def __init__(self, config_file=None, context=None, # pylint: disable=too-many-arguments - client_configuration=None, persist_config=True): + def __init__( + self, + config_file=None, + context=None, # pylint: disable=too-many-arguments + client_configuration=None, + persist_config=True, + ): """ MPIJob client constructor :param config_file: kubeconfig file, defaults to ~/.kube/config @@ -66,7 +72,8 @@ def __init__(self, config_file=None, context=None, # pylint: disable=too-many-a config_file=config_file, context=context, client_configuration=client_configuration, - persist_config=persist_config) + persist_config=persist_config, + ) else: config.load_incluster_config() @@ -86,20 +93,24 @@ def create(self, mpijob, namespace=None): try: outputs = self.custom_api.create_namespaced_custom_object( - constants.MPIJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MPIJOB_VERSION, namespace, constants.MPIJOB_PLURAL, - mpijob) + mpijob, + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->create_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) return outputs - def get(self, name=None, namespace=None, watch=False, - timeout_seconds=600): # pylint: disable=inconsistent-return-statements + def get( + self, name=None, namespace=None, watch=False, timeout_seconds=600 + ): # pylint: disable=inconsistent-return-statements """ Get the mpijob :param name: existing mpijob name, if not defined, the get all mpijobs in the namespace. @@ -114,17 +125,17 @@ def get(self, name=None, namespace=None, watch=False, if name: if watch: mpijob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + name=name, namespace=namespace, timeout_seconds=timeout_seconds + ) else: thread = self.custom_api.get_namespaced_custom_object( - constants.MPIJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MPIJOB_VERSION, namespace, constants.MPIJOB_PLURAL, name, - async_req=True) + async_req=True, + ) mpijob = None try: @@ -134,24 +145,28 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to get MPIJob {0} in namespace {1}. Exception: \ - {2} ".format(name, namespace, e)) + {2} ".format( + name, namespace, e + ) + ) return mpijob else: if watch: - mpijob_watch( - namespace=namespace, - timeout_seconds=timeout_seconds) + mpijob_watch(namespace=namespace, timeout_seconds=timeout_seconds) else: thread = self.custom_api.list_namespaced_custom_object( - constants.MPIJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MPIJOB_VERSION, namespace, constants.MPIJOB_PLURAL, - async_req=True) + async_req=True, + ) mpijobs = None try: @@ -161,11 +176,16 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to list MPIJobs in namespace {0}. \ - Exception: {1} ".format(namespace, e)) + Exception: {1} ".format( + namespace, e + ) + ) return mpijobs def patch(self, name, mpijob, namespace=None): @@ -181,16 +201,19 @@ def patch(self, name, mpijob, namespace=None): try: outputs = self.custom_api.patch_namespaced_custom_object( - constants.MPIJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MPIJOB_VERSION, namespace, constants.MPIJOB_PLURAL, name, - mpijob) + mpijob, + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->patch_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) return outputs @@ -206,23 +229,29 @@ def delete(self, name, namespace=None): try: return self.custom_api.delete_namespaced_custom_object( - group=constants.MPIJOB_GROUP, + group=constants.KUBEFLOW_GROUP, version=constants.MPIJOB_VERSION, namespace=namespace, plural=constants.MPIJOB_PLURAL, name=name, - body=client.V1DeleteOptions()) + body=client.V1DeleteOptions(), + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->delete_namespaced_custom_object:\ - %s\n" % e) - - def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements - namespace=None, - timeout_seconds=600, - polling_interval=30, - watch=False, - status_callback=None): + %s\n" + % e + ) + + def wait_for_job( + self, + name, # pylint: disable=inconsistent-return-statements + namespace=None, + timeout_seconds=600, + polling_interval=30, + watch=False, + status_callback=None, + ): """Wait for the specified job to finish. :param name: Name of the TfJob. @@ -240,9 +269,8 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements if watch: mpijob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + name=name, namespace=namespace, timeout_seconds=timeout_seconds + ) else: return self.wait_for_condition( name, @@ -250,14 +278,18 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements namespace=namespace, timeout_seconds=timeout_seconds, polling_interval=polling_interval, - status_callback=status_callback) - - def wait_for_condition(self, name, - expected_condition, - namespace=None, - timeout_seconds=600, - polling_interval=30, - status_callback=None): + status_callback=status_callback, + ) + + def wait_for_condition( + self, + name, + expected_condition, + namespace=None, + timeout_seconds=600, + polling_interval=30, + status_callback=None, + ): """Waits until any of the specified conditions occur. :param name: Name of the job. @@ -296,7 +328,9 @@ def wait_for_condition(self, name, raise RuntimeError( "Timeout waiting for MPIJob {0} in namespace {1} to enter one of the " - "conditions {2}.".format(name, namespace, expected_condition), mpijob) + "conditions {2}.".format(name, namespace, expected_condition), + mpijob, + ) def get_job_status(self, name, namespace=None): """Returns MPIJob status, such as Running, Failed or Succeeded. @@ -332,8 +366,14 @@ def is_job_succeeded(self, name, namespace=None): mpijob_status = self.get_job_status(name, namespace=namespace) return mpijob_status.lower() == "succeeded" - def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=inconsistent-return-statements - replica_type=None, replica_index=None): + def get_pod_names( + self, + name, + namespace=None, + master=False, # pylint: disable=inconsistent-return-statements + replica_type=None, + replica_index=None, + ): """ Get pod names of MPIJob. :param name: mpijob name @@ -348,15 +388,17 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i if namespace is None: namespace = utils.get_default_target_namespace() - labels = utils.get_job_labels(name, master=master, - replica_type=replica_type, - replica_index=replica_index) + labels = utils.get_job_labels( + name, master=master, replica_type=replica_type, replica_index=replica_index + ) try: resp = self.core_api.list_namespaced_pod( - namespace, label_selector=utils.to_selector(labels)) + namespace, label_selector=utils.to_selector(labels) + ) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e + ) pod_names = [] for pod in resp.items: @@ -364,13 +406,22 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i pod_names.append(pod.metadata.name) if not pod_names: - logging.warning("Not found Pods of the MPIJob %s with the labels %s.", name, labels) + logging.warning( + "Not found Pods of the MPIJob %s with the labels %s.", name, labels + ) else: return set(pod_names) - def get_logs(self, name, namespace=None, master=True, - replica_type=None, replica_index=None, - follow=False, container="mpi"): + def get_logs( + self, + name, + namespace=None, + master=True, + replica_type=None, + replica_index=None, + follow=False, + container="mpi", + ): """ Get training logs of the MPIJob. By default only get the logs of Pod that has labels 'job-role: master'. @@ -389,16 +440,27 @@ def get_logs(self, name, namespace=None, master=True, if namespace is None: namespace = utils.get_default_target_namespace() - pod_names = list(self.get_pod_names(name, namespace=namespace, - master=master, - replica_type=replica_type, - replica_index=replica_index)) + pod_names = list( + self.get_pod_names( + name, + namespace=namespace, + master=master, + replica_type=replica_type, + replica_index=replica_index, + ) + ) if pod_names: if follow: log_streams = [] for pod in pod_names: - log_streams.append(k8s_watch.Watch().stream(self.core_api.read_namespaced_pod_log, - name=pod, namespace=namespace, container=container)) + log_streams.append( + k8s_watch.Watch().stream( + self.core_api.read_namespaced_pod_log, + name=pod, + namespace=namespace, + container=container, + ) + ) finished = [False for _ in log_streams] # create thread and queue per stream, for non-blocking iteration @@ -424,11 +486,18 @@ def get_logs(self, name, namespace=None, master=True, else: for pod in pod_names: try: - pod_logs = self.core_api.read_namespaced_pod_log(pod, namespace, container=container) + pod_logs = self.core_api.read_namespaced_pod_log( + pod, namespace, container=container + ) logging.info("The logs of Pod %s:\n %s", pod, pod_logs) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" + % e + ) else: - raise RuntimeError("Not found Pods of the MPIJob {} " - "in namespace {}".format(name, namespace)) + raise RuntimeError( + "Not found Pods of the MPIJob {} " + "in namespace {}".format(name, namespace) + ) + diff --git a/sdk/python/kubeflow/training/api/mpi_job_watch.py b/sdk/python/kubeflow/training/api/mpi_job_watch.py index 7c2eda6482..d540284c8c 100644 --- a/sdk/python/kubeflow/training/api/mpi_job_watch.py +++ b/sdk/python/kubeflow/training/api/mpi_job_watch.py @@ -20,8 +20,9 @@ from kubeflow.training.utils import utils tbl = utils.TableLogger( - header="{:<30.30} {:<20.20} {:<30.30}".format('NAME', 'STATE', 'TIME'), - column_format="{:<30.30} {:<20.20} {:<30.30}") + header="{:<30.30} {:<20.20} {:<30.30}".format("NAME", "STATE", "TIME"), + column_format="{:<30.30} {:<20.20} {:<30.30}", +) @retrying.retry(wait_fixed=1000, stop_max_attempt_number=20) @@ -33,26 +34,30 @@ def watch(name=None, namespace=None, timeout_seconds=600): stream = k8s_watch.Watch().stream( client.CustomObjectsApi().list_namespaced_custom_object, - constants.MPIJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MPIJOB_VERSION, namespace, constants.MPIJOB_PLURAL, - timeout_seconds=timeout_seconds) + timeout_seconds=timeout_seconds, + ) for event in stream: - mpijob = event['object'] - mpijob_name = mpijob['metadata']['name'] + mpijob = event["object"] + mpijob_name = mpijob["metadata"]["name"] if name and name != mpijob_name: continue else: - status = '' - update_time = '' - last_condition = mpijob.get('status', {}).get('conditions', [{}])[-1] - status = last_condition.get('type', '') - update_time = last_condition.get('lastTransitionTime', '') + status = "" + update_time = "" + last_condition = mpijob.get("status", {}).get("conditions", [{}])[-1] + status = last_condition.get("type", "") + update_time = last_condition.get("lastTransitionTime", "") tbl(mpijob_name, status, update_time) if name == mpijob_name: - if status in [constants.JOB_STATUS_SUCCEEDED, constants.JOB_STATUS_FAILED]: + if status in [ + constants.JOB_STATUS_SUCCEEDED, + constants.JOB_STATUS_FAILED, + ]: break diff --git a/sdk/python/kubeflow/training/api/mx_job_client.py b/sdk/python/kubeflow/training/api/mx_job_client.py index c7bf2a7bac..c200ba3471 100644 --- a/sdk/python/kubeflow/training/api/mx_job_client.py +++ b/sdk/python/kubeflow/training/api/mx_job_client.py @@ -25,7 +25,7 @@ from .mx_job_watch import watch as mxjob_watch -logging.basicConfig(format='%(message)s') +logging.basicConfig(format="%(message)s") logging.getLogger().setLevel(logging.INFO) @@ -39,7 +39,8 @@ def wrap_log_stream(q, stream): return except Exception as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e + ) def get_log_queue_pool(streams): @@ -52,8 +53,13 @@ def get_log_queue_pool(streams): class MXJobClient(object): - def __init__(self, config_file=None, context=None, # pylint: disable=too-many-arguments - client_configuration=None, persist_config=True): + def __init__( + self, + config_file=None, + context=None, # pylint: disable=too-many-arguments + client_configuration=None, + persist_config=True, + ): """ MXJob client constructor :param config_file: kubeconfig file, defaults to ~/.kube/config @@ -66,7 +72,8 @@ def __init__(self, config_file=None, context=None, # pylint: disable=too-many-a config_file=config_file, context=context, client_configuration=client_configuration, - persist_config=persist_config) + persist_config=persist_config, + ) else: config.load_incluster_config() @@ -86,20 +93,24 @@ def create(self, mxjob, namespace=None): try: outputs = self.custom_api.create_namespaced_custom_object( - constants.MXJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MXJOB_VERSION, namespace, constants.MXJOB_PLURAL, - mxjob) + mxjob, + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->create_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) return outputs - def get(self, name=None, namespace=None, watch=False, - timeout_seconds=600): # pylint: disable=inconsistent-return-statements + def get( + self, name=None, namespace=None, watch=False, timeout_seconds=600 + ): # pylint: disable=inconsistent-return-statements """ Get the mxjob :param name: existing mxjob name, if not defined, the get all mxjobs in the namespace. @@ -114,17 +125,17 @@ def get(self, name=None, namespace=None, watch=False, if name: if watch: mxjob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + name=name, namespace=namespace, timeout_seconds=timeout_seconds + ) else: thread = self.custom_api.get_namespaced_custom_object( - constants.MXJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MXJOB_VERSION, namespace, constants.MXJOB_PLURAL, name, - async_req=True) + async_req=True, + ) mxjob = None try: @@ -134,24 +145,28 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to get MXJob {0} in namespace {1}. Exception: \ - {2} ".format(name, namespace, e)) + {2} ".format( + name, namespace, e + ) + ) return mxjob else: if watch: - mxjob_watch( - namespace=namespace, - timeout_seconds=timeout_seconds) + mxjob_watch(namespace=namespace, timeout_seconds=timeout_seconds) else: thread = self.custom_api.list_namespaced_custom_object( - constants.MXJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MXJOB_VERSION, namespace, constants.MXJOB_PLURAL, - async_req=True) + async_req=True, + ) mxjobs = None try: @@ -161,11 +176,16 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to list MXJobs in namespace {0}. \ - Exception: {1} ".format(namespace, e)) + Exception: {1} ".format( + namespace, e + ) + ) return mxjobs def patch(self, name, mxjob, namespace=None): @@ -181,16 +201,19 @@ def patch(self, name, mxjob, namespace=None): try: outputs = self.custom_api.patch_namespaced_custom_object( - constants.MXJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MXJOB_VERSION, namespace, constants.MXJOB_PLURAL, name, - mxjob) + mxjob, + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->patch_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) return outputs @@ -206,23 +229,29 @@ def delete(self, name, namespace=None): try: return self.custom_api.delete_namespaced_custom_object( - group=constants.MXJOB_GROUP, + group=constants.KUBEFLOW_GROUP, version=constants.MXJOB_VERSION, namespace=namespace, plural=constants.MXJOB_PLURAL, name=name, - body=client.V1DeleteOptions()) + body=client.V1DeleteOptions(), + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->delete_namespaced_custom_object:\ - %s\n" % e) - - def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements - namespace=None, - timeout_seconds=600, - polling_interval=30, - watch=False, - status_callback=None): + %s\n" + % e + ) + + def wait_for_job( + self, + name, # pylint: disable=inconsistent-return-statements + namespace=None, + timeout_seconds=600, + polling_interval=30, + watch=False, + status_callback=None, + ): """Wait for the specified job to finish. :param name: Name of the TfJob. @@ -239,10 +268,7 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements namespace = utils.get_default_target_namespace() if watch: - mxjob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + mxjob_watch(name=name, namespace=namespace, timeout_seconds=timeout_seconds) else: return self.wait_for_condition( name, @@ -250,14 +276,18 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements namespace=namespace, timeout_seconds=timeout_seconds, polling_interval=polling_interval, - status_callback=status_callback) - - def wait_for_condition(self, name, - expected_condition, - namespace=None, - timeout_seconds=600, - polling_interval=30, - status_callback=None): + status_callback=status_callback, + ) + + def wait_for_condition( + self, + name, + expected_condition, + namespace=None, + timeout_seconds=600, + polling_interval=30, + status_callback=None, + ): """Waits until any of the specified conditions occur. :param name: Name of the job. @@ -296,7 +326,9 @@ def wait_for_condition(self, name, raise RuntimeError( "Timeout waiting for MXJob {0} in namespace {1} to enter one of the " - "conditions {2}.".format(name, namespace, expected_condition), mxjob) + "conditions {2}.".format(name, namespace, expected_condition), + mxjob, + ) def get_job_status(self, name, namespace=None): """Returns MXJob status, such as Running, Failed or Succeeded. @@ -332,8 +364,14 @@ def is_job_succeeded(self, name, namespace=None): mxjob_status = self.get_job_status(name, namespace=namespace) return mxjob_status.lower() == "succeeded" - def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=inconsistent-return-statements - replica_type=None, replica_index=None): + def get_pod_names( + self, + name, + namespace=None, + master=False, # pylint: disable=inconsistent-return-statements + replica_type=None, + replica_index=None, + ): """ Get pod names of MXJob. :param name: mxjob name @@ -348,16 +386,18 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i if namespace is None: namespace = utils.get_default_target_namespace() - labels = utils.get_job_labels(name, master=master, - replica_type=replica_type, - replica_index=replica_index) + labels = utils.get_job_labels( + name, master=master, replica_type=replica_type, replica_index=replica_index + ) try: resp = self.core_api.list_namespaced_pod( - namespace, label_selector=utils.to_selector(labels)) + namespace, label_selector=utils.to_selector(labels) + ) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e + ) pod_names = [] for pod in resp.items: @@ -365,13 +405,22 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i pod_names.append(pod.metadata.name) if not pod_names: - logging.warning("Not found Pods of the MXJob %s with the labels %s.", name, labels) + logging.warning( + "Not found Pods of the MXJob %s with the labels %s.", name, labels + ) else: return set(pod_names) - def get_logs(self, name, namespace=None, master=True, - replica_type=None, replica_index=None, - follow=False, container="mxnet"): + def get_logs( + self, + name, + namespace=None, + master=True, + replica_type=None, + replica_index=None, + follow=False, + container="mxnet", + ): """ Get training logs of the MXJob. By default only get the logs of Pod that has labels 'job-role: master'. @@ -390,16 +439,27 @@ def get_logs(self, name, namespace=None, master=True, if namespace is None: namespace = utils.get_default_target_namespace() - pod_names = list(self.get_pod_names(name, namespace=namespace, - master=master, - replica_type=replica_type, - replica_index=replica_index)) + pod_names = list( + self.get_pod_names( + name, + namespace=namespace, + master=master, + replica_type=replica_type, + replica_index=replica_index, + ) + ) if pod_names: if follow: log_streams = [] for pod in pod_names: - log_streams.append(k8s_watch.Watch().stream(self.core_api.read_namespaced_pod_log, - name=pod, namespace=namespace, container=container)) + log_streams.append( + k8s_watch.Watch().stream( + self.core_api.read_namespaced_pod_log, + name=pod, + namespace=namespace, + container=container, + ) + ) finished = [False for _ in log_streams] # create thread and queue per stream, for non-blocking iteration @@ -425,11 +485,18 @@ def get_logs(self, name, namespace=None, master=True, else: for pod in pod_names: try: - pod_logs = self.core_api.read_namespaced_pod_log(pod, namespace, container=container) + pod_logs = self.core_api.read_namespaced_pod_log( + pod, namespace, container=container + ) logging.info("The logs of Pod %s:\n %s", pod, pod_logs) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" + % e + ) else: - raise RuntimeError("Not found Pods of the MXJob {} " - "in namespace {}".format(name, namespace)) + raise RuntimeError( + "Not found Pods of the MXJob {} " + "in namespace {}".format(name, namespace) + ) + diff --git a/sdk/python/kubeflow/training/api/mx_job_watch.py b/sdk/python/kubeflow/training/api/mx_job_watch.py index 5dd05cc97c..e97bfd73fd 100644 --- a/sdk/python/kubeflow/training/api/mx_job_watch.py +++ b/sdk/python/kubeflow/training/api/mx_job_watch.py @@ -20,8 +20,9 @@ from kubeflow.training.utils import utils tbl = utils.TableLogger( - header="{:<30.30} {:<20.20} {:<30.30}".format('NAME', 'STATE', 'TIME'), - column_format="{:<30.30} {:<20.20} {:<30.30}") + header="{:<30.30} {:<20.20} {:<30.30}".format("NAME", "STATE", "TIME"), + column_format="{:<30.30} {:<20.20} {:<30.30}", +) @retrying.retry(wait_fixed=1000, stop_max_attempt_number=20) @@ -33,26 +34,30 @@ def watch(name=None, namespace=None, timeout_seconds=600): stream = k8s_watch.Watch().stream( client.CustomObjectsApi().list_namespaced_custom_object, - constants.MXJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.MXJOB_VERSION, namespace, constants.MXJOB_PLURAL, - timeout_seconds=timeout_seconds) + timeout_seconds=timeout_seconds, + ) for event in stream: - mxjob = event['object'] - mxjob_name = mxjob['metadata']['name'] + mxjob = event["object"] + mxjob_name = mxjob["metadata"]["name"] if name and name != mxjob_name: continue else: - status = '' - update_time = '' - last_condition = mxjob.get('status', {}).get('conditions', [{}])[-1] - status = last_condition.get('type', '') - update_time = last_condition.get('lastTransitionTime', '') + status = "" + update_time = "" + last_condition = mxjob.get("status", {}).get("conditions", [{}])[-1] + status = last_condition.get("type", "") + update_time = last_condition.get("lastTransitionTime", "") tbl(mxjob_name, status, update_time) if name == mxjob_name: - if status in [constants.JOB_STATUS_SUCCEEDED, constants.JOB_STATUS_FAILED]: + if status in [ + constants.JOB_STATUS_SUCCEEDED, + constants.JOB_STATUS_FAILED, + ]: break diff --git a/sdk/python/kubeflow/training/api/py_torch_job_client.py b/sdk/python/kubeflow/training/api/py_torch_job_client.py index 7d922b5f71..8dba410e70 100644 --- a/sdk/python/kubeflow/training/api/py_torch_job_client.py +++ b/sdk/python/kubeflow/training/api/py_torch_job_client.py @@ -15,26 +15,31 @@ import multiprocessing import time import logging - +from typing import Callable, List, Dict, Any from kubernetes import client, config from kubeflow.training.constants import constants from kubeflow.training.utils import utils +from kubeflow.training import models +from kubeflow.training.api.py_torch_job_watch import watch as pytorchjob_watch -from .py_torch_job_watch import watch as pytorchjob_watch - -logging.basicConfig(format='%(message)s') +logging.basicConfig(format="%(message)s") logging.getLogger().setLevel(logging.INFO) class PyTorchJobClient(object): - def __init__(self, config_file=None, context=None, # pylint: disable=too-many-arguments - client_configuration=None, persist_config=True): + def __init__( + self, + config_file=None, + context=None, # pylint: disable=too-many-arguments + client_configuration=None, + persist_config=True, + ): """ PyTorchJob client constructor :param config_file: kubeconfig file, defaults to ~/.kube/config - :param context: kubernetes context - :param client_configuration: kubernetes configuration object + :param context: Kubernetes context + :param client_configuration: configuration for Kubernetes client :param persist_config: """ if config_file or not utils.is_running_in_k8s(): @@ -42,40 +47,116 @@ def __init__(self, config_file=None, context=None, # pylint: disable=too-many-a config_file=config_file, context=context, client_configuration=client_configuration, - persist_config=persist_config) + persist_config=persist_config, + ) else: config.load_incluster_config() self.custom_api = client.CustomObjectsApi() self.core_api = client.CoreV1Api() - def create(self, pytorchjob, namespace=None): + def create(self, pytorchjob, namespace=utils.get_default_target_namespace()): """ Create the PyTorchJob - :param pytorchjob: pytorchjob object + :param pytorchjob: PyTorchJob object :param namespace: defaults to current or default namespace - :return: created pytorchjob """ - if namespace is None: - namespace = utils.set_pytorchjob_namespace(pytorchjob) - try: - outputs = self.custom_api.create_namespaced_custom_object( - constants.PYTORCHJOB_GROUP, + self.custom_api.create_namespaced_custom_object( + constants.KUBEFLOW_GROUP, constants.PYTORCHJOB_VERSION, namespace, constants.PYTORCHJOB_PLURAL, - pytorchjob) + pytorchjob, + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->create_namespaced_custom_object:\ - %s\n" % e) - - return outputs + %s\n" + % e + ) + + logging.info("PyTorchJob {} has been created".format(pytorchjob.metadata.name)) + + def create_pytorchjob_from_func( + self, + name: str, + func: Callable, + parameters: Dict[str, Any] = None, + base_image: str = constants.PYTORCHJOB_BASE_IMAGE, + namespace: str = utils.get_default_target_namespace(), + num_worker_replicas: int = None, + packages_to_install: List[str] = None, + pip_index_url: str = "https://pypi.org/simple", + ): + """Create PyTorchJob from the function. + + Args: + name: Name for the PyTorchJob. + func: Function that PyTorchJob uses to train the model. This function + must be Callable. Optionally, this function might have one dict + argument to define input parameters for the function. + parameters: Dict of input parameters that training function might receive. + base_image: Image to use when executing the training function. + namespace: Namespace for the PyTorchJob. + num_worker_replicas: Number of Worker replicas for the PyTorchJob. + If number of Worker replicas is 1, PyTorchJob uses only + Master replica. + packages_to_install: List of Python packages to install in addition + to the base image packages. These packages are installed before + executing the objective function. + pip_index_url: The PyPI url from which to install Python packages. + """ - def get(self, name=None, namespace=None, watch=False, - timeout_seconds=600): # pylint: disable=inconsistent-return-statements + # Check if at least one worker replica is set. + if num_worker_replicas is None: + raise ValueError("At least one Worker replica for PyTorchJob must be set") + + # Check if function is callable. + if not callable(func): + raise ValueError( + f"Training function must be callable, got function type: {type(func)}" + ) + + # Get PyTorchJob Pod template spec. + pod_template_spec = utils.get_pod_template_spec( + func=func, + parameters=parameters, + base_image=base_image, + container_name="pytorch", + packages_to_install=packages_to_install, + pip_index_url=pip_index_url, + ) + + # Create PyTorchJob template. + pytorchjob = models.KubeflowOrgV1PyTorchJob( + api_version=f"{constants.KUBEFLOW_GROUP}/{constants.PYTORCHJOB_VERSION}", + kind=constants.PYTORCHJOB_KIND, + metadata=client.V1ObjectMeta(name=name, namespace=namespace), + spec=models.KubeflowOrgV1PyTorchJobSpec( + run_policy=models.V1RunPolicy(clean_pod_policy=None), + pytorch_replica_specs={}, + ), + ) + + # Add Master and Worker replicas to the PyTorchJob. + pytorchjob.spec.pytorch_replica_specs["Master"] = models.V1ReplicaSpec( + replicas=1, template=pod_template_spec, + ) + + # If number of Worker replicas is 1, PyTorchJob uses only Master replica. + if num_worker_replicas != 1: + pytorchjob.spec.pytorch_replica_specs["Worker"] = models.V1ReplicaSpec( + replicas=num_worker_replicas, template=pod_template_spec, + ) + + # Create PyTorchJob + self.create(pytorchjob=pytorchjob, namespace=namespace) + + def get( + self, name=None, namespace=None, watch=False, timeout_seconds=600 + ): # pylint: disable=inconsistent-return-statements """ Get the pytorchjob :param name: existing pytorchjob name, if not defined, get all pytorchjobs in the namespace. @@ -90,17 +171,17 @@ def get(self, name=None, namespace=None, watch=False, if name: if watch: pytorchjob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + name=name, namespace=namespace, timeout_seconds=timeout_seconds + ) else: thread = self.custom_api.get_namespaced_custom_object( - constants.PYTORCHJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.PYTORCHJOB_VERSION, namespace, constants.PYTORCHJOB_PLURAL, name, - async_req=True) + async_req=True, + ) pytorchjob = None try: @@ -110,24 +191,28 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to get PyTorchJob {0} in namespace {1}. Exception: \ - {2} ".format(name, namespace, e)) + {2} ".format( + name, namespace, e + ) + ) return pytorchjob else: if watch: - pytorchjob_watch( - namespace=namespace, - timeout_seconds=timeout_seconds) + pytorchjob_watch(namespace=namespace, timeout_seconds=timeout_seconds) else: thread = self.custom_api.list_namespaced_custom_object( - constants.PYTORCHJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.PYTORCHJOB_VERSION, namespace, constants.PYTORCHJOB_PLURAL, - async_req=True) + async_req=True, + ) pytorchjob = None try: @@ -137,11 +222,16 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->list_namespaced_custom_object: \ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to List PyTorchJob in namespace {0}. \ - Exception: {1} ".format(namespace, e)) + Exception: {1} ".format( + namespace, e + ) + ) return pytorchjob @@ -158,48 +248,56 @@ def patch(self, name, pytorchjob, namespace=None): try: outputs = self.custom_api.patch_namespaced_custom_object( - constants.PYTORCHJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.PYTORCHJOB_VERSION, namespace, constants.PYTORCHJOB_PLURAL, name, - pytorchjob) + pytorchjob, + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->patch_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) return outputs - def delete(self, name, namespace=None): + def delete(self, name, namespace=utils.get_default_target_namespace()): """ - Delete the pytorchjob - :param name: pytorchjob name + Delete the PyTorchJob + :param name: PyTorchJob name :param namespace: defaults to current or default namespace - :return: """ - if namespace is None: - namespace = utils.get_default_target_namespace() try: - return self.custom_api.delete_namespaced_custom_object( - group=constants.PYTORCHJOB_GROUP, + self.custom_api.delete_namespaced_custom_object( + group=constants.KUBEFLOW_GROUP, version=constants.PYTORCHJOB_VERSION, namespace=namespace, plural=constants.PYTORCHJOB_PLURAL, name=name, - body=client.V1DeleteOptions()) + body=client.V1DeleteOptions(), + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->delete_namespaced_custom_object:\ - %s\n" % e) - - def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements - namespace=None, - watch=False, - timeout_seconds=600, - polling_interval=30, - status_callback=None): + %s\n" + % e + ) + + logging.info("PyTorchJob {} has been deleted".format(name)) + + def wait_for_job( + self, + name, # pylint: disable=inconsistent-return-statements + namespace=None, + watch=False, + timeout_seconds=600, + polling_interval=30, + status_callback=None, + ): """Wait for the specified job to finish. :param name: Name of the PyTorchJob. @@ -216,9 +314,8 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements if watch: pytorchjob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + name=name, namespace=namespace, timeout_seconds=timeout_seconds + ) else: return self.wait_for_condition( name, @@ -226,14 +323,18 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements namespace=namespace, timeout_seconds=timeout_seconds, polling_interval=polling_interval, - status_callback=status_callback) - - def wait_for_condition(self, name, - expected_condition, - namespace=None, - timeout_seconds=600, - polling_interval=30, - status_callback=None): + status_callback=status_callback, + ) + + def wait_for_condition( + self, + name, + expected_condition, + namespace=None, + timeout_seconds=600, + polling_interval=30, + status_callback=None, + ): """Waits until any of the specified conditions occur. :param name: Name of the job. @@ -272,7 +373,9 @@ def wait_for_condition(self, name, raise RuntimeError( "Timeout waiting for PyTorchJob {0} in namespace {1} to enter one of the " - "conditions {2}.".format(name, namespace, expected_condition), pytorchjob) + "conditions {2}.".format(name, namespace, expected_condition), + pytorchjob, + ) def get_job_status(self, name, namespace=None): """Returns PyTorchJob status, such as Running, Failed or Succeeded. @@ -308,8 +411,14 @@ def is_job_succeeded(self, name, namespace=None): pytorchjob_status = self.get_job_status(name, namespace=namespace) return pytorchjob_status == constants.JOB_STATUS_SUCCEEDED - def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=inconsistent-return-statements - replica_type=None, replica_index=None): + def get_pod_names( + self, + name, + namespace=None, + master=False, # pylint: disable=inconsistent-return-statements + replica_type=None, + replica_index=None, + ): """ Get pod names of PyTorchJob. :param name: PyTorchJob name @@ -324,16 +433,18 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i if namespace is None: namespace = utils.get_default_target_namespace() - labels = utils.get_job_labels(name, master=master, - replica_type=replica_type, - replica_index=replica_index) + labels = utils.get_job_labels( + name, master=master, replica_type=replica_type, replica_index=replica_index + ) try: resp = self.core_api.list_namespaced_pod( - namespace, label_selector=utils.to_selector(labels)) + namespace, label_selector=utils.to_selector(labels) + ) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e + ) pod_names = [] for pod in resp.items: @@ -341,13 +452,22 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i pod_names.append(pod.metadata.name) if not pod_names: - logging.warning("Not found Pods of the PyTorchJob %s with the labels %s.", name, labels) + logging.warning( + "Not found Pods of the PyTorchJob %s with the labels %s.", name, labels + ) else: return set(pod_names) - def get_logs(self, name, namespace=None, master=True, - replica_type=None, replica_index=None, - follow=False, container="pytorch"): + def get_logs( + self, + name, + namespace=None, + master=True, + replica_type=None, + replica_index=None, + follow=False, + container="pytorch", + ): """ Get training logs of the PyTorchJob. By default only get the logs of Pod that has labels 'job-role: master'. @@ -366,20 +486,28 @@ def get_logs(self, name, namespace=None, master=True, if namespace is None: namespace = utils.get_default_target_namespace() - pod_names = self.get_pod_names(name, namespace=namespace, - master=master, - replica_type=replica_type, - replica_index=replica_index) + pod_names = self.get_pod_names( + name, + namespace=namespace, + master=master, + replica_type=replica_type, + replica_index=replica_index, + ) if pod_names: for pod in pod_names: try: pod_logs = self.core_api.read_namespaced_pod_log( - pod, namespace, follow=follow, container=container) + pod, namespace, follow=follow, container=container + ) logging.info("The logs of Pod %s:\n %s", pod, pod_logs) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" + % e + ) else: - raise RuntimeError("Not found Pods of the PyTorchJob {} " - "in namespace {}".format(name, namespace)) + raise RuntimeError( + "Not found Pods of the PyTorchJob {} " + "in namespace {}".format(name, namespace) + ) diff --git a/sdk/python/kubeflow/training/api/py_torch_job_watch.py b/sdk/python/kubeflow/training/api/py_torch_job_watch.py index 0dcda4b9e9..b112c18e2e 100644 --- a/sdk/python/kubeflow/training/api/py_torch_job_watch.py +++ b/sdk/python/kubeflow/training/api/py_torch_job_watch.py @@ -20,8 +20,9 @@ from kubeflow.training.utils import utils tbl = utils.TableLogger( - header="{:<30.30} {:<20.20} {:<30.30}".format('NAME', 'STATE', 'TIME'), - column_format="{:<30.30} {:<20.20} {:<30.30}") + header="{:<30.30} {:<20.20} {:<30.30}".format("NAME", "STATE", "TIME"), + column_format="{:<30.30} {:<20.20} {:<30.30}", +) @retrying.retry(wait_fixed=1000, stop_max_attempt_number=20) @@ -33,26 +34,30 @@ def watch(name=None, namespace=None, timeout_seconds=600): stream = k8s_watch.Watch().stream( client.CustomObjectsApi().list_namespaced_custom_object, - constants.PYTORCHJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.PYTORCHJOB_VERSION, namespace, constants.PYTORCHJOB_PLURAL, - timeout_seconds=timeout_seconds) + timeout_seconds=timeout_seconds, + ) for event in stream: - pytorchjob = event['object'] - pytorchjob_name = pytorchjob['metadata']['name'] + pytorchjob = event["object"] + pytorchjob_name = pytorchjob["metadata"]["name"] if name and name != pytorchjob_name: continue else: - status = '' - update_time = '' - last_condition = pytorchjob.get('status', {}).get('conditions', [])[-1] - status = last_condition.get('type', '') - update_time = last_condition.get('lastTransitionTime', '') + status = "" + update_time = "" + last_condition = pytorchjob.get("status", {}).get("conditions", [])[-1] + status = last_condition.get("type", "") + update_time = last_condition.get("lastTransitionTime", "") tbl(pytorchjob_name, status, update_time) if name == pytorchjob_name: - if status in [constants.JOB_STATUS_SUCCEEDED, constants.JOB_STATUS_FAILED]: + if status in [ + constants.JOB_STATUS_SUCCEEDED, + constants.JOB_STATUS_FAILED, + ]: break diff --git a/sdk/python/kubeflow/training/api/tf_job_client.py b/sdk/python/kubeflow/training/api/tf_job_client.py index 6e984795df..8ab8fa6215 100644 --- a/sdk/python/kubeflow/training/api/tf_job_client.py +++ b/sdk/python/kubeflow/training/api/tf_job_client.py @@ -16,16 +16,16 @@ import logging import threading import queue - +from typing import Callable, List, Dict, Any from kubernetes import client, config from kubernetes import watch as k8s_watch from kubeflow.training.constants import constants from kubeflow.training.utils import utils +from kubeflow.training import models +from kubeflow.training.api.tf_job_watch import watch as tfjob_watch -from .tf_job_watch import watch as tfjob_watch - -logging.basicConfig(format='%(message)s') +logging.basicConfig(format="%(message)s") logging.getLogger().setLevel(logging.INFO) @@ -39,7 +39,8 @@ def wrap_log_stream(q, stream): return except Exception as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e + ) def get_log_queue_pool(streams): @@ -52,13 +53,18 @@ def get_log_queue_pool(streams): class TFJobClient(object): - def __init__(self, config_file=None, context=None, # pylint: disable=too-many-arguments - client_configuration=None, persist_config=True): + def __init__( + self, + config_file=None, + context=None, # pylint: disable=too-many-arguments + client_configuration=None, + persist_config=True, + ): """ TFJob client constructor :param config_file: kubeconfig file, defaults to ~/.kube/config - :param context: kubernetes context - :param client_configuration: kubernetes configuration object + :param context: Kubernetes context + :param client_configuration: configuration for Kubernetes client :param persist_config: """ if config_file or not utils.is_running_in_k8s(): @@ -66,40 +72,129 @@ def __init__(self, config_file=None, context=None, # pylint: disable=too-many-a config_file=config_file, context=context, client_configuration=client_configuration, - persist_config=persist_config) + persist_config=persist_config, + ) else: config.load_incluster_config() self.custom_api = client.CustomObjectsApi() self.core_api = client.CoreV1Api() - def create(self, tfjob, namespace=None): + def create(self, tfjob, namespace=utils.get_default_target_namespace()): """ Create the TFJob - :param tfjob: tfjob object + :param tfjob: TFJob object :param namespace: defaults to current or default namespace - :return: created tfjob """ - if namespace is None: - namespace = utils.set_tfjob_namespace(tfjob) - try: - outputs = self.custom_api.create_namespaced_custom_object( - constants.TFJOB_GROUP, + self.custom_api.create_namespaced_custom_object( + constants.KUBEFLOW_GROUP, constants.TFJOB_VERSION, namespace, constants.TFJOB_PLURAL, - tfjob) + tfjob, + ) + except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->create_namespaced_custom_object:\ - %s\n" % e) - - return outputs + %s\n" + % e + ) + + logging.info("TFJob {} has been created".format(tfjob.metadata.name)) + + def create_tfjob_from_func( + self, + name: str, + func: Callable, + parameters: Dict[str, Any] = None, + base_image: str = constants.TFJOB_BASE_IMAGE, + namespace: str = utils.get_default_target_namespace(), + num_chief_replicas: int = None, + num_ps_replicas: int = None, + num_worker_replicas: int = None, + packages_to_install: List[str] = None, + pip_index_url: str = "https://pypi.org/simple", + ): + """Create TFJob from the function. + + Args: + name: Name for the TFJob. + func: Function that TFJob uses to train the model. This function + must be Callable. Optionally, this function might have one dict + argument to define input parameters for the function. + parameters: Dict of input parameters that training function might receive. + base_image: Image to use when executing the training function. + namespace: Namespace for the TFJob. + num_chief_replicas: Number of Chief replicas for the TFJob. Number + of Chief replicas can't be more than 1. + num_ps_replicas: Number of Parameter Server replicas for the TFJob. + num_worker_replicas: Number of Worker replicas for the TFJob. + packages_to_install: List of Python packages to install in addition + to the base image packages. These packages are installed before + executing the objective function. + pip_index_url: The PyPI url from which to install Python packages. + """ - def get(self, name=None, namespace=None, watch=False, - timeout_seconds=600): # pylint: disable=inconsistent-return-statements + # Check if at least one replica is set. + if ( + num_chief_replicas is None + and num_ps_replicas is None + and num_worker_replicas is None + ): + raise ValueError("At least one replica for TFJob must be set") + + # Check if function is callable. + if not callable(func): + raise ValueError( + f"Training function must be callable, got function type: {type(func)}" + ) + + # Get TFJob Pod template spec. + pod_template_spec = utils.get_pod_template_spec( + func=func, + parameters=parameters, + base_image=base_image, + container_name="tensorflow", + packages_to_install=packages_to_install, + pip_index_url=pip_index_url, + ) + + # Create TFJob template. + tfjob = models.KubeflowOrgV1TFJob( + api_version=f"{constants.KUBEFLOW_GROUP}/{constants.TFJOB_VERSION}", + kind=constants.TFJOB_KIND, + metadata=client.V1ObjectMeta(name=name, namespace=namespace), + spec=models.KubeflowOrgV1TFJobSpec( + run_policy=models.V1RunPolicy(clean_pod_policy=None), + tf_replica_specs={}, + ), + ) + + # Add Chief, PS, and Worker replicas to the TFJob. + if num_chief_replicas is not None: + tfjob.spec.tf_replica_specs["Chief"] = models.V1ReplicaSpec( + replicas=num_chief_replicas, template=pod_template_spec, + ) + + if num_ps_replicas is not None: + tfjob.spec.tf_replica_specs["PS"] = models.V1ReplicaSpec( + replicas=num_ps_replicas, template=pod_template_spec, + ) + + if num_worker_replicas is not None: + tfjob.spec.tf_replica_specs["Worker"] = models.V1ReplicaSpec( + replicas=num_worker_replicas, template=pod_template_spec, + ) + + # Create TFJob. + self.create(tfjob=tfjob, namespace=namespace) + + def get( + self, name=None, namespace=None, watch=False, timeout_seconds=600 + ): # pylint: disable=inconsistent-return-statements """ Get the tfjob :param name: existing tfjob name, if not defined, the get all tfjobs in the namespace. @@ -114,17 +209,17 @@ def get(self, name=None, namespace=None, watch=False, if name: if watch: tfjob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + name=name, namespace=namespace, timeout_seconds=timeout_seconds + ) else: thread = self.custom_api.get_namespaced_custom_object( - constants.TFJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.TFJOB_VERSION, namespace, constants.TFJOB_PLURAL, name, - async_req=True) + async_req=True, + ) tfjob = None try: @@ -134,24 +229,28 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to get TFJob {0} in namespace {1}. Exception: \ - {2} ".format(name, namespace, e)) + {2} ".format( + name, namespace, e + ) + ) return tfjob else: if watch: - tfjob_watch( - namespace=namespace, - timeout_seconds=timeout_seconds) + tfjob_watch(namespace=namespace, timeout_seconds=timeout_seconds) else: thread = self.custom_api.list_namespaced_custom_object( - constants.TFJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.TFJOB_VERSION, namespace, constants.TFJOB_PLURAL, - async_req=True) + async_req=True, + ) tfjobs = None try: @@ -161,11 +260,16 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to list TFJobs in namespace {0}. \ - Exception: {1} ".format(namespace, e)) + Exception: {1} ".format( + namespace, e + ) + ) return tfjobs def patch(self, name, tfjob, namespace=None): @@ -181,48 +285,56 @@ def patch(self, name, tfjob, namespace=None): try: outputs = self.custom_api.patch_namespaced_custom_object( - constants.TFJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.TFJOB_VERSION, namespace, constants.TFJOB_PLURAL, name, - tfjob) + tfjob, + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->patch_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) return outputs - def delete(self, name, namespace=None): + def delete(self, name, namespace=utils.get_default_target_namespace()): """ - Delete the tfjob - :param name: tfjob name + Delete the TFJob + :param name: TFJob name :param namespace: defaults to current or default namespace - :return: """ - if namespace is None: - namespace = utils.get_default_target_namespace() try: - return self.custom_api.delete_namespaced_custom_object( - group=constants.TFJOB_GROUP, + self.custom_api.delete_namespaced_custom_object( + group=constants.KUBEFLOW_GROUP, version=constants.TFJOB_VERSION, namespace=namespace, plural=constants.TFJOB_PLURAL, name=name, - body=client.V1DeleteOptions()) + body=client.V1DeleteOptions(), + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->delete_namespaced_custom_object:\ - %s\n" % e) - - def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements - namespace=None, - timeout_seconds=600, - polling_interval=30, - watch=False, - status_callback=None): + %s\n" + % e + ) + + logging.info("TFJob {} has been deleted".format(name)) + + def wait_for_job( + self, + name, # pylint: disable=inconsistent-return-statements + namespace=None, + timeout_seconds=600, + polling_interval=30, + watch=False, + status_callback=None, + ): """Wait for the specified job to finish. :param name: Name of the TfJob. @@ -239,10 +351,7 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements namespace = utils.get_default_target_namespace() if watch: - tfjob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + tfjob_watch(name=name, namespace=namespace, timeout_seconds=timeout_seconds) else: return self.wait_for_condition( name, @@ -250,14 +359,18 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements namespace=namespace, timeout_seconds=timeout_seconds, polling_interval=polling_interval, - status_callback=status_callback) - - def wait_for_condition(self, name, - expected_condition, - namespace=None, - timeout_seconds=600, - polling_interval=30, - status_callback=None): + status_callback=status_callback, + ) + + def wait_for_condition( + self, + name, + expected_condition, + namespace=None, + timeout_seconds=600, + polling_interval=30, + status_callback=None, + ): """Waits until any of the specified conditions occur. :param name: Name of the job. @@ -296,7 +409,9 @@ def wait_for_condition(self, name, raise RuntimeError( "Timeout waiting for TFJob {0} in namespace {1} to enter one of the " - "conditions {2}.".format(name, namespace, expected_condition), tfjob) + "conditions {2}.".format(name, namespace, expected_condition), + tfjob, + ) def get_job_status(self, name, namespace=None): """Returns TFJob status, such as Running, Failed or Succeeded. @@ -332,8 +447,14 @@ def is_job_succeeded(self, name, namespace=None): tfjob_status = self.get_job_status(name, namespace=namespace) return tfjob_status.lower() == "succeeded" - def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=inconsistent-return-statements - replica_type=None, replica_index=None): + def get_pod_names( + self, + name, + namespace=None, + master=False, # pylint: disable=inconsistent-return-statements + replica_type=None, + replica_index=None, + ): """ Get pod names of TFJob. :param name: tfjob name @@ -348,16 +469,18 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i if namespace is None: namespace = utils.get_default_target_namespace() - labels = utils.get_job_labels(name, master=master, - replica_type=replica_type, - replica_index=replica_index) + labels = utils.get_job_labels( + name, master=master, replica_type=replica_type, replica_index=replica_index + ) try: resp = self.core_api.list_namespaced_pod( - namespace, label_selector=utils.to_selector(labels)) + namespace, label_selector=utils.to_selector(labels) + ) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e + ) pod_names = [] for pod in resp.items: @@ -365,13 +488,22 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i pod_names.append(pod.metadata.name) if not pod_names: - logging.warning("Not found Pods of the TFJob %s with the labels %s.", name, labels) + logging.warning( + "Not found Pods of the TFJob %s with the labels %s.", name, labels + ) else: return set(pod_names) - def get_logs(self, name, namespace=None, master=True, - replica_type=None, replica_index=None, - follow=False, container="tensorflow"): + def get_logs( + self, + name, + namespace=None, + master=True, + replica_type=None, + replica_index=None, + follow=False, + container="tensorflow", + ): """ Get training logs of the TFJob. By default only get the logs of Pod that has labels 'job-role: master'. @@ -390,16 +522,27 @@ def get_logs(self, name, namespace=None, master=True, if namespace is None: namespace = utils.get_default_target_namespace() - pod_names = list(self.get_pod_names(name, namespace=namespace, - master=master, - replica_type=replica_type, - replica_index=replica_index)) + pod_names = list( + self.get_pod_names( + name, + namespace=namespace, + master=master, + replica_type=replica_type, + replica_index=replica_index, + ) + ) if pod_names: if follow: log_streams = [] for pod in pod_names: - log_streams.append(k8s_watch.Watch().stream(self.core_api.read_namespaced_pod_log, - name=pod, namespace=namespace, container=container)) + log_streams.append( + k8s_watch.Watch().stream( + self.core_api.read_namespaced_pod_log, + name=pod, + namespace=namespace, + container=container, + ) + ) finished = [False for _ in log_streams] # create thread and queue per stream, for non-blocking iteration @@ -425,11 +568,17 @@ def get_logs(self, name, namespace=None, master=True, else: for pod in pod_names: try: - pod_logs = self.core_api.read_namespaced_pod_log(pod, namespace, container=container) + pod_logs = self.core_api.read_namespaced_pod_log( + pod, namespace, container=container + ) logging.info("The logs of Pod %s:\n %s", pod, pod_logs) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" + % e + ) else: - raise RuntimeError("Not found Pods of the TFJob {} " - "in namespace {}".format(name, namespace)) + raise RuntimeError( + "Not found Pods of the TFJob {} " + "in namespace {}".format(name, namespace) + ) diff --git a/sdk/python/kubeflow/training/api/tf_job_watch.py b/sdk/python/kubeflow/training/api/tf_job_watch.py index ef890e35ed..55e8ff88c2 100644 --- a/sdk/python/kubeflow/training/api/tf_job_watch.py +++ b/sdk/python/kubeflow/training/api/tf_job_watch.py @@ -20,8 +20,9 @@ from kubeflow.training.utils import utils tbl = utils.TableLogger( - header="{:<30.30} {:<20.20} {:<30.30}".format('NAME', 'STATE', 'TIME'), - column_format="{:<30.30} {:<20.20} {:<30.30}") + header="{:<30.30} {:<20.20} {:<30.30}".format("NAME", "STATE", "TIME"), + column_format="{:<30.30} {:<20.20} {:<30.30}", +) @retrying.retry(wait_fixed=1000, stop_max_attempt_number=20) @@ -33,26 +34,30 @@ def watch(name=None, namespace=None, timeout_seconds=600): stream = k8s_watch.Watch().stream( client.CustomObjectsApi().list_namespaced_custom_object, - constants.TFJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.TFJOB_VERSION, namespace, constants.TFJOB_PLURAL, - timeout_seconds=timeout_seconds) + timeout_seconds=timeout_seconds, + ) for event in stream: - tfjob = event['object'] - tfjob_name = tfjob['metadata']['name'] + tfjob = event["object"] + tfjob_name = tfjob["metadata"]["name"] if name and name != tfjob_name: continue else: - status = '' - update_time = '' - last_condition = tfjob.get('status', {}).get('conditions', [{}])[-1] - status = last_condition.get('type', '') - update_time = last_condition.get('lastTransitionTime', '') + status = "" + update_time = "" + last_condition = tfjob.get("status", {}).get("conditions", [{}])[-1] + status = last_condition.get("type", "") + update_time = last_condition.get("lastTransitionTime", "") tbl(tfjob_name, status, update_time) if name == tfjob_name: - if status in [constants.JOB_STATUS_SUCCEEDED, constants.JOB_STATUS_FAILED]: + if status in [ + constants.JOB_STATUS_SUCCEEDED, + constants.JOB_STATUS_FAILED, + ]: break diff --git a/sdk/python/kubeflow/training/api/xgboost_job_client.py b/sdk/python/kubeflow/training/api/xgboost_job_client.py index 01caa4ace6..1e08e4327f 100644 --- a/sdk/python/kubeflow/training/api/xgboost_job_client.py +++ b/sdk/python/kubeflow/training/api/xgboost_job_client.py @@ -25,7 +25,7 @@ from .xgboost_job_watch import watch as xgboostjob_watch -logging.basicConfig(format='%(message)s') +logging.basicConfig(format="%(message)s") logging.getLogger().setLevel(logging.INFO) @@ -39,7 +39,8 @@ def wrap_log_stream(q, stream): return except Exception as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e + ) def get_log_queue_pool(streams): @@ -52,8 +53,13 @@ def get_log_queue_pool(streams): class XGBoostJobClient(object): - def __init__(self, config_file=None, context=None, # pylint: disable=too-many-arguments - client_configuration=None, persist_config=True): + def __init__( + self, + config_file=None, + context=None, # pylint: disable=too-many-arguments + client_configuration=None, + persist_config=True, + ): """ XGBoostJob client constructor :param config_file: kubeconfig file, defaults to ~/.kube/config @@ -66,7 +72,8 @@ def __init__(self, config_file=None, context=None, # pylint: disable=too-many-a config_file=config_file, context=context, client_configuration=client_configuration, - persist_config=persist_config) + persist_config=persist_config, + ) else: config.load_incluster_config() @@ -86,20 +93,24 @@ def create(self, xgboostjob, namespace=None): try: outputs = self.custom_api.create_namespaced_custom_object( - constants.XGBOOSTJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.XGBOOSTJOB_VERSION, namespace, constants.XGBOOSTJOB_PLURAL, - xgboostjob) + xgboostjob, + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->create_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) return outputs - def get(self, name=None, namespace=None, watch=False, - timeout_seconds=600): # pylint: disable=inconsistent-return-statements + def get( + self, name=None, namespace=None, watch=False, timeout_seconds=600 + ): # pylint: disable=inconsistent-return-statements """ Get the xgboostjob :param name: existing xgboostjob name, if not defined, the get all xgboostjobs in the namespace. @@ -114,17 +125,17 @@ def get(self, name=None, namespace=None, watch=False, if name: if watch: xgboostjob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + name=name, namespace=namespace, timeout_seconds=timeout_seconds + ) else: thread = self.custom_api.get_namespaced_custom_object( - constants.XGBOOSTJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.XGBOOSTJOB_VERSION, namespace, constants.XGBOOSTJOB_PLURAL, name, - async_req=True) + async_req=True, + ) xgboostjob = None try: @@ -134,24 +145,28 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to get XGBoostJob {0} in namespace {1}. Exception: \ - {2} ".format(name, namespace, e)) + {2} ".format( + name, namespace, e + ) + ) return xgboostjob else: if watch: - xgboostjob_watch( - namespace=namespace, - timeout_seconds=timeout_seconds) + xgboostjob_watch(namespace=namespace, timeout_seconds=timeout_seconds) else: thread = self.custom_api.list_namespaced_custom_object( - constants.XGBOOSTJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.XGBOOSTJOB_VERSION, namespace, constants.XGBOOSTJOB_PLURAL, - async_req=True) + async_req=True, + ) xgboostjobs = None try: @@ -161,11 +176,16 @@ def get(self, name=None, namespace=None, watch=False, except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) except Exception as e: raise RuntimeError( "There was a problem to list XGBoostJobs in namespace {0}. \ - Exception: {1} ".format(namespace, e)) + Exception: {1} ".format( + namespace, e + ) + ) return xgboostjobs def patch(self, name, xgboostjob, namespace=None): @@ -181,16 +201,19 @@ def patch(self, name, xgboostjob, namespace=None): try: outputs = self.custom_api.patch_namespaced_custom_object( - constants.XGBOOSTJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.XGBOOSTJOB_VERSION, namespace, constants.XGBOOSTJOB_PLURAL, name, - xgboostjob) + xgboostjob, + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->patch_namespaced_custom_object:\ - %s\n" % e) + %s\n" + % e + ) return outputs @@ -206,23 +229,29 @@ def delete(self, name, namespace=None): try: return self.custom_api.delete_namespaced_custom_object( - group=constants.XGBOOSTJOB_GROUP, + group=constants.KUBEFLOW_GROUP, version=constants.XGBOOSTJOB_VERSION, namespace=namespace, plural=constants.XGBOOSTJOB_PLURAL, name=name, - body=client.V1DeleteOptions()) + body=client.V1DeleteOptions(), + ) except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->delete_namespaced_custom_object:\ - %s\n" % e) - - def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements - namespace=None, - timeout_seconds=600, - polling_interval=30, - watch=False, - status_callback=None): + %s\n" + % e + ) + + def wait_for_job( + self, + name, # pylint: disable=inconsistent-return-statements + namespace=None, + timeout_seconds=600, + polling_interval=30, + watch=False, + status_callback=None, + ): """Wait for the specified job to finish. :param name: Name of the TfJob. @@ -240,9 +269,8 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements if watch: xgboostjob_watch( - name=name, - namespace=namespace, - timeout_seconds=timeout_seconds) + name=name, namespace=namespace, timeout_seconds=timeout_seconds + ) else: return self.wait_for_condition( name, @@ -250,14 +278,18 @@ def wait_for_job(self, name, # pylint: disable=inconsistent-return-statements namespace=namespace, timeout_seconds=timeout_seconds, polling_interval=polling_interval, - status_callback=status_callback) - - def wait_for_condition(self, name, - expected_condition, - namespace=None, - timeout_seconds=600, - polling_interval=30, - status_callback=None): + status_callback=status_callback, + ) + + def wait_for_condition( + self, + name, + expected_condition, + namespace=None, + timeout_seconds=600, + polling_interval=30, + status_callback=None, + ): """Waits until any of the specified conditions occur. :param name: Name of the job. @@ -296,7 +328,9 @@ def wait_for_condition(self, name, raise RuntimeError( "Timeout waiting for XGBoostJob {0} in namespace {1} to enter one of the " - "conditions {2}.".format(name, namespace, expected_condition), xgboostjob) + "conditions {2}.".format(name, namespace, expected_condition), + xgboostjob, + ) def get_job_status(self, name, namespace=None): """Returns XGBoostJob status, such as Running, Failed or Succeeded. @@ -332,8 +366,14 @@ def is_job_succeeded(self, name, namespace=None): xgboostjob_status = self.get_job_status(name, namespace=namespace) return xgboostjob_status.lower() == "succeeded" - def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=inconsistent-return-statements - replica_type=None, replica_index=None): + def get_pod_names( + self, + name, + namespace=None, + master=False, # pylint: disable=inconsistent-return-statements + replica_type=None, + replica_index=None, + ): """ Get pod names of XGBoostJob. :param name: xgboostjob name @@ -348,16 +388,18 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i if namespace is None: namespace = utils.get_default_target_namespace() - labels = utils.get_job_labels(name, master=master, - replica_type=replica_type, - replica_index=replica_index) + labels = utils.get_job_labels( + name, master=master, replica_type=replica_type, replica_index=replica_index + ) try: resp = self.core_api.list_namespaced_pod( - namespace, label_selector=utils.to_selector(labels)) + namespace, label_selector=utils.to_selector(labels) + ) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e + ) pod_names = [] for pod in resp.items: @@ -365,13 +407,22 @@ def get_pod_names(self, name, namespace=None, master=False, # pylint: disable=i pod_names.append(pod.metadata.name) if not pod_names: - logging.warning("Not found Pods of the XGBoostJob %s with the labels %s.", name, labels) + logging.warning( + "Not found Pods of the XGBoostJob %s with the labels %s.", name, labels + ) else: return set(pod_names) - def get_logs(self, name, namespace=None, master=True, - replica_type=None, replica_index=None, - follow=False, container="xgboost"): + def get_logs( + self, + name, + namespace=None, + master=True, + replica_type=None, + replica_index=None, + follow=False, + container="xgboost", + ): """ Get training logs of the XGBoostJob. By default only get the logs of Pod that has labels 'job-role: master'. @@ -390,16 +441,27 @@ def get_logs(self, name, namespace=None, master=True, if namespace is None: namespace = utils.get_default_target_namespace() - pod_names = list(self.get_pod_names(name, namespace=namespace, - master=master, - replica_type=replica_type, - replica_index=replica_index)) + pod_names = list( + self.get_pod_names( + name, + namespace=namespace, + master=master, + replica_type=replica_type, + replica_index=replica_index, + ) + ) if pod_names: if follow: log_streams = [] for pod in pod_names: - log_streams.append(k8s_watch.Watch().stream(self.core_api.read_namespaced_pod_log, - name=pod, namespace=namespace, container=container)) + log_streams.append( + k8s_watch.Watch().stream( + self.core_api.read_namespaced_pod_log, + name=pod, + namespace=namespace, + container=container, + ) + ) finished = [False for _ in log_streams] # create thread and queue per stream, for non-blocking iteration @@ -425,11 +487,18 @@ def get_logs(self, name, namespace=None, master=True, else: for pod in pod_names: try: - pod_logs = self.core_api.read_namespaced_pod_log(pod, namespace, container=container) + pod_logs = self.core_api.read_namespaced_pod_log( + pod, namespace, container=container + ) logging.info("The logs of Pod %s:\n %s", pod, pod_logs) except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" + % e + ) else: - raise RuntimeError("Not found Pods of the XGBoostJob {} " - "in namespace {}".format(name, namespace)) + raise RuntimeError( + "Not found Pods of the XGBoostJob {} " + "in namespace {}".format(name, namespace) + ) + diff --git a/sdk/python/kubeflow/training/api/xgboost_job_watch.py b/sdk/python/kubeflow/training/api/xgboost_job_watch.py index a2b8104fa3..64d7ebca39 100644 --- a/sdk/python/kubeflow/training/api/xgboost_job_watch.py +++ b/sdk/python/kubeflow/training/api/xgboost_job_watch.py @@ -20,8 +20,9 @@ from kubeflow.training.utils import utils tbl = utils.TableLogger( - header="{:<30.30} {:<20.20} {:<30.30}".format('NAME', 'STATE', 'TIME'), - column_format="{:<30.30} {:<20.20} {:<30.30}") + header="{:<30.30} {:<20.20} {:<30.30}".format("NAME", "STATE", "TIME"), + column_format="{:<30.30} {:<20.20} {:<30.30}", +) @retrying.retry(wait_fixed=1000, stop_max_attempt_number=20) @@ -33,26 +34,30 @@ def watch(name=None, namespace=None, timeout_seconds=600): stream = k8s_watch.Watch().stream( client.CustomObjectsApi().list_namespaced_custom_object, - constants.XGBOOSTJOB_GROUP, + constants.KUBEFLOW_GROUP, constants.XGBOOSTJOB_VERSION, namespace, constants.XGBOOSTJOB_PLURAL, - timeout_seconds=timeout_seconds) + timeout_seconds=timeout_seconds, + ) for event in stream: - xgboostjob = event['object'] - xgboostjob_name = xgboostjob['metadata']['name'] + xgboostjob = event["object"] + xgboostjob_name = xgboostjob["metadata"]["name"] if name and name != xgboostjob_name: continue else: - status = '' - update_time = '' - last_condition = xgboostjob.get('status', {}).get('conditions', [{}])[-1] - status = last_condition.get('type', '') - update_time = last_condition.get('lastTransitionTime', '') + status = "" + update_time = "" + last_condition = xgboostjob.get("status", {}).get("conditions", [{}])[-1] + status = last_condition.get("type", "") + update_time = last_condition.get("lastTransitionTime", "") tbl(xgboostjob_name, status, update_time) if name == xgboostjob_name: - if status in [constants.JOB_STATUS_SUCCEEDED, constants.JOB_STATUS_FAILED]: + if status in [ + constants.JOB_STATUS_SUCCEEDED, + constants.JOB_STATUS_FAILED, + ]: break diff --git a/sdk/python/kubeflow/training/constants/constants.py b/sdk/python/kubeflow/training/constants/constants.py index 96ebaebc16..5288bce850 100644 --- a/sdk/python/kubeflow/training/constants/constants.py +++ b/sdk/python/kubeflow/training/constants/constants.py @@ -13,58 +13,63 @@ # limitations under the License. import os +from typing import Final # General constants # How long to wait in seconds for requests to the ApiServer -APISERVER_TIMEOUT = 120 +APISERVER_TIMEOUT: Final[int] = 120 +KUBEFLOW_GROUP: Final[str] = "kubeflow.org" # TFJob K8S constants -TFJOB_GROUP = 'kubeflow.org' -TFJOB_KIND = 'TFJob' -TFJOB_PLURAL = 'tfjobs' -TFJOB_VERSION = os.environ.get('TFJOB_VERSION', 'v1') +TFJOB_KIND: Final[str] = "TFJob" +TFJOB_PLURAL: Final[str] = "tfjobs" +TFJOB_VERSION: Final[str] = os.environ.get("TFJOB_VERSION", "v1") -TFJOB_LOGLEVEL = os.environ.get('TFJOB_LOGLEVEL', 'INFO').upper() +TFJOB_LOGLEVEL: Final[str] = os.environ.get("TFJOB_LOGLEVEL", "INFO").upper() + +TFJOB_BASE_IMAGE: Final[str] = "docker.io/tensorflow/tensorflow:2.9.1" +TFJOB_BASE_IMAGE_GPU: Final[str] = "docker.io/tensorflow/tensorflow:2.9.1-gpu" # Job Label Names -JOB_GROUP_LABEL = 'group-name' -JOB_NAME_LABEL = 'training.kubeflow.org/job-name' -JOB_TYPE_LABEL = 'training.kubeflow.org/replica-type' -JOB_INDEX_LABEL = 'training.kubeflow.org/replica-index' -JOB_ROLE_LABEL = 'training.kubeflow.org/job-role' +JOB_GROUP_LABEL: Final[str] = "group-name" +JOB_NAME_LABEL: Final[str] = "training.kubeflow.org/job-name" +JOB_TYPE_LABEL: Final[str] = "training.kubeflow.org/replica-type" +JOB_INDEX_LABEL: Final[str] = "training.kubeflow.org/replica-index" +JOB_ROLE_LABEL: Final[str] = "training.kubeflow.org/job-role" +JOB_ROLE_MASTER: Final[str] = "master" -JOB_STATUS_SUCCEEDED = 'Succeeded' -JOB_STATUS_FAILED = 'Failed' -JOB_STATUS_RUNNING = 'Running' +JOB_STATUS_SUCCEEDED: Final[str] = "Succeeded" +JOB_STATUS_FAILED: Final[str] = "Failed" +JOB_STATUS_RUNNING: Final[str] = "Running" # PyTorchJob K8S constants -PYTORCHJOB_GROUP = 'kubeflow.org' -PYTORCHJOB_KIND = 'PyTorchJob' -PYTORCHJOB_PLURAL = 'pytorchjobs' -PYTORCHJOB_VERSION = os.environ.get('PYTORCHJOB_VERSION', 'v1') +PYTORCHJOB_KIND: Final[str] = "PyTorchJob" +PYTORCHJOB_PLURAL: Final[str] = "pytorchjobs" +PYTORCHJOB_VERSION: Final[str] = os.environ.get("PYTORCHJOB_VERSION", "v1") + +PYTORCH_LOGLEVEL: Final[str] = os.environ.get("PYTORCHJOB_LOGLEVEL", "INFO").upper() -PYTORCH_LOGLEVEL = os.environ.get('PYTORCHJOB_LOGLEVEL', 'INFO').upper() +PYTORCHJOB_BASE_IMAGE: Final[ + str +] = "docker.io/pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime" # XGBoostJob K8S constants -XGBOOSTJOB_GROUP = 'kubeflow.org' -XGBOOSTJOB_KIND = 'XGBoostJob' -XGBOOSTJOB_PLURAL = 'xgboostjobs' -XGBOOSTJOB_VERSION = os.environ.get('XGBOOSTJOB_VERSION', 'v1') +XGBOOSTJOB_KIND: Final[str] = "XGBoostJob" +XGBOOSTJOB_PLURAL: Final[str] = "xgboostjobs" +XGBOOSTJOB_VERSION: Final[str] = os.environ.get("XGBOOSTJOB_VERSION", "v1") -XGBOOST_LOGLEVEL = os.environ.get('XGBOOSTJOB_LOGLEVEL', 'INFO').upper() +XGBOOST_LOGLEVEL: Final[str] = os.environ.get("XGBOOSTJOB_LOGLEVEL", "INFO").upper() # MPIJob K8S constants -MPIJOB_GROUP = 'kubeflow.org' -MPIJOB_KIND = 'MPIJob' -MPIJOB_PLURAL = 'mpijobs' -MPIJOB_VERSION = os.environ.get('MPIJOB_VERSION', 'v1') +MPIJOB_KIND: Final[str] = "MPIJob" +MPIJOB_PLURAL: Final[str] = "mpijobs" +MPIJOB_VERSION: Final[str] = os.environ.get("MPIJOB_VERSION", "v1") -MPI_LOGLEVEL = os.environ.get('MPIJOB_LOGLEVEL', 'INFO').upper() +MPI_LOGLEVEL: Final[str] = os.environ.get("MPIJOB_LOGLEVEL", "INFO").upper() # MXNETJob K8S constants -MXJOB_GROUP = 'kubeflow.org' -MXJOB_KIND = 'MXJob' -MXJOB_PLURAL = 'mxjobs' -MXJOB_VERSION = os.environ.get('MXJOB_VERSION', 'v1') +MXJOB_KIND: Final[str] = "MXJob" +MXJOB_PLURAL: Final[str] = "mxjobs" +MXJOB_VERSION: Final[str] = os.environ.get("MXJOB_VERSION", "v1") -MX_LOGLEVEL = os.environ.get('MXJOB_LOGLEVEL', 'INFO').upper() +MX_LOGLEVEL: Final[str] = os.environ.get("MXJOB_LOGLEVEL", "INFO").upper() diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index 2fd41f4ddb..06c2d49e5f 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -13,22 +13,26 @@ # limitations under the License. import os +import textwrap +import inspect +from typing import Callable, List, Dict, Any +from kubernetes import client from kubeflow.training.constants import constants def is_running_in_k8s(): - return os.path.isdir('/var/run/secrets/kubernetes.io/') + return os.path.isdir("/var/run/secrets/kubernetes.io/") def get_current_k8s_namespace(): - with open('/var/run/secrets/kubernetes.io/serviceaccount/namespace', 'r') as f: + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: return f.readline() def get_default_target_namespace(): if not is_running_in_k8s(): - return 'default' + return "default" return get_current_k8s_namespace() @@ -43,21 +47,25 @@ def set_pytorchjob_namespace(pytorchjob): namespace = pytorchjob_namespace or get_default_target_namespace() return namespace + def set_xgboostjob_namespace(xgboostjob): xgboostjob_namespace = xgboostjob.metadata.namespace namespace = xgboostjob_namespace or get_default_target_namespace() return namespace + def set_mpijob_namespace(mpijob): mpijob_namespace = mpijob.metadata.namespace namespace = mpijob_namespace or get_default_target_namespace() return namespace + def set_mxjob_namespace(mxjob): mxjob_namespace = mxjob.metadata.namespace namespace = mxjob_namespace or get_default_target_namespace() return namespace + def get_job_labels(name, master=False, replica_type=None, replica_index=None): """ Get labels according to specified flags. @@ -68,11 +76,11 @@ def get_job_labels(name, master=False, replica_type=None, replica_index=None): :return: Dict: Labels """ labels = { - constants.JOB_GROUP_LABEL: 'kubeflow.org', + constants.JOB_GROUP_LABEL: constants.KUBEFLOW_GROUP, constants.JOB_NAME_LABEL: name, } if master: - labels[constants.JOB_ROLE_LABEL] = 'master' + labels[constants.JOB_ROLE_LABEL] = constants.JOB_ROLE_MASTER if replica_type: labels[constants.JOB_TYPE_LABEL] = str.lower(replica_type) @@ -94,6 +102,96 @@ def to_selector(labels): return ",".join(parts) +def get_script_for_python_packages(packages_to_install, pip_index_url): + packages_str = " ".join([str(package) for package in packages_to_install]) + + script_for_python_packages = textwrap.dedent( + f""" + if ! [ -x "$(command -v pip)" ]; then + python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip + fi + + PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet \ + --no-warn-script-location --index-url {pip_index_url} {packages_str} + """ + ) + + return script_for_python_packages + + +def get_pod_template_spec( + func: Callable, + parameters: Dict[str, Any], + base_image: str, + container_name: str, + packages_to_install: List[str], + pip_index_url: str, +): + """ + Get Pod template spec from the given function and input parameters. + """ + + # Check if function is callable. + if not callable(func): + raise ValueError( + f"Training function must be callable, got function type: {type(func)}" + ) + + # Extract function implementation. + func_code = inspect.getsource(func) + + # Function might be defined in some indented scope (e.g. in another function). + # We need to dedent the function code. + func_code = textwrap.dedent(func_code) + + # Wrap function code to execute it from the file. For example: + # def train(parameters): + # print('Start Training...') + # train({'lr': 0.01}) + if parameters is None: + func_code = f"{func_code}\n{func.__name__}()\n" + else: + func_code = f"{func_code}\n{func.__name__}({parameters})\n" + + # Prepare execute script template. + exec_script = textwrap.dedent( + """ + program_path=$(mktemp -d) + read -r -d '' SCRIPT << EOM\n + {func_code} + EOM + printf "%s" "$SCRIPT" > $program_path/ephemeral_script.py + python3 -u $program_path/ephemeral_script.py""" + ) + + # Add function code to the execute script. + exec_script = exec_script.format(func_code=func_code) + + # Install Python packages if that is required. + if packages_to_install is not None: + exec_script = ( + get_script_for_python_packages(packages_to_install, pip_index_url) + + exec_script + ) + + # Create Pod template spec. + pod_template_spec = client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta(annotations={"sidecar.istio.io/inject": "false"}), + spec=client.V1PodSpec( + containers=[ + client.V1Container( + name=container_name, + image=base_image, + command=["bash", "-c"], + args=[exec_script], + ) + ] + ), + ) + + return pod_template_spec + + class TableLogger: def __init__(self, header, column_format): self.header = header diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt deleted file mode 100644 index b293781fd6..0000000000 --- a/sdk/python/requirements.txt +++ /dev/null @@ -1,7 +0,0 @@ -certifi>=14.05.14 -six>=1.10 -python_dateutil>=2.5.3 -setuptools>=21.0.0 -urllib3>=1.15.1 -kubernetes>=23.6.0 -retrying>=1.3.3 diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 3a0eb7b682..c837d447db 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -14,45 +14,49 @@ import setuptools -TESTS_REQUIRES = [ - 'pytest', - 'pytest-tornasync', - 'mypy' -] +TESTS_REQUIRES = ["pytest", "pytest-tornasync", "mypy"] -with open('requirements.txt') as f: - REQUIRES = f.readlines() +REQUIRES = [ + "certifi>=14.05.14", + "six>=1.10", + "setuptools>=21.0.0", + "urllib3>=1.15.1", + "kubernetes>=23.6.0", + "retrying>=1.3.3", +] setuptools.setup( - name='kubeflow-training', - version='1.5.0', - author="Kubeflow Authors", - author_email='hejinchi@cn.ibm.com', - license="Apache License Version 2.0", - url="https://github.com/kubeflow/training-operator/sdk/python", - description="Training Operator Python SDK", - long_description="Training Operator Python SDK", - packages=setuptools.find_packages( - include=("kubeflow*")), - package_data={}, - include_package_data=False, - zip_safe=False, - classifiers=[ - 'Intended Audience :: Developers', - 'Intended Audience :: Education', - 'Intended Audience :: Science/Research', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - "License :: OSI Approved :: Apache Software License", - "Operating System :: OS Independent", - 'Topic :: Scientific/Engineering', - 'Topic :: Scientific/Engineering :: Artificial Intelligence', - 'Topic :: Software Development', - 'Topic :: Software Development :: Libraries', - 'Topic :: Software Development :: Libraries :: Python Modules', - ], - install_requires=REQUIRES, - tests_require=TESTS_REQUIRES, - extras_require={'test': TESTS_REQUIRES} + name="kubeflow-training", + version="1.5.0", + author="Kubeflow Authors", + author_email="hejinchi@cn.ibm.com", + license="Apache License Version 2.0", + url="https://github.com/kubeflow/training-operator/sdk/python", + description="Training Operator Python SDK", + long_description="Training Operator Python SDK", + packages=setuptools.find_packages(include=("kubeflow*")), + package_data={}, + include_package_data=False, + zip_safe=False, + classifiers=[ + "Intended Audience :: Developers", + "Intended Audience :: Education", + "Intended Audience :: Science/Research", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], + install_requires=REQUIRES, + tests_require=TESTS_REQUIRES, + extras_require={"test": TESTS_REQUIRES}, )