From 4859559ff594396e34835d55e8c7dbc395266f68 Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Thu, 10 Jan 2019 11:35:51 -0500 Subject: [PATCH 1/7] Support processing resource types other than GPU --- cmd/mpi-operator/main.go | 19 +-- pkg/controllers/mpi_job_controller.go | 128 +++++++++++++-------- pkg/controllers/mpi_job_controller_test.go | 15 +-- 3 files changed, 97 insertions(+), 65 deletions(-) diff --git a/cmd/mpi-operator/main.go b/cmd/mpi-operator/main.go index d1ae7161..82f598ea 100644 --- a/cmd/mpi-operator/main.go +++ b/cmd/mpi-operator/main.go @@ -29,11 +29,12 @@ import ( ) var ( - masterURL string - kubeConfig string - gpusPerNode int - kubectlDeliveryImage string - namespace string + masterURL string + kubeConfig string + processingUnitsPerNode int + processingResourceType string + kubectlDeliveryImage string + namespace string ) func main() { @@ -77,7 +78,8 @@ func main() { kubeInformerFactory.Apps().V1().StatefulSets(), kubeInformerFactory.Batch().V1().Jobs(), kubeflowInformerFactory.Kubeflow().V1alpha1().MPIJobs(), - gpusPerNode, + processingUnitsPerNode, + processingResourceType, kubectlDeliveryImage) go kubeInformerFactory.Start(stopCh) @@ -92,10 +94,11 @@ func init() { flag.StringVar(&kubeConfig, "kubeConfig", "", "Path to a kubeConfig. Only required if out-of-cluster.") flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeConfig. Only required if out-of-cluster.") flag.IntVar( - &gpusPerNode, - "gpus-per-node", + &processingUnitsPerNode, + "processing-units-per-node", 1, "The maximum number of GPUs available per node. Note that this will be ignored if the GPU resources are explicitly specified in the MPIJob pod spec.") flag.StringVar(&kubectlDeliveryImage, "kubectl-delivery-image", "", "The container image used to deliver the kubectl binary.") flag.StringVar(&namespace, "namespace", "", "The namespace used to obtain the listers.") + flag.StringVar(&processingResourceType, "processing-resource-type", "nvidia.com/gpu", "The compute resource name, e.g. 'nvidia.com/gpu' or 'cpu'.") } diff --git a/pkg/controllers/mpi_job_controller.go b/pkg/controllers/mpi_job_controller.go index 3458f445..c0bce164 100644 --- a/pkg/controllers/mpi_job_controller.go +++ b/pkg/controllers/mpi_job_controller.go @@ -67,11 +67,16 @@ const ( launcherSuffix = "-launcher" workerSuffix = "-worker" gpuResourceName = "nvidia.com/gpu" + cpuResourceName = "cpu" labelGroupName = "group_name" labelMPIJobName = "mpi_job_name" labelMPIRoleType = "mpi_role_type" ) +var ( + processingResourceType string +) + const ( // SuccessSynced is used as part of the Event 'reason' when an MPIJob is // synced. @@ -124,7 +129,9 @@ type MPIJobController struct { // Kubernetes API. recorder record.EventRecorder // The maximum number of GPUs per node. - gpusPerNode int + processingUnitsPerNode int + // The processing resource name, e.g. "nvidia.com/gpu" or "cpu" + processingResourceType string // The container image used to deliver the kubectl binary. kubectlDeliveryImage string } @@ -140,7 +147,8 @@ func NewMPIJobController( statefulSetInformer appsinformers.StatefulSetInformer, jobInformer batchinformers.JobInformer, mpiJobInformer informers.MPIJobInformer, - gpusPerNode int, + processingUnitsPerNode int, + processingResourceType string, kubectlDeliveryImage string) *MPIJobController { // Create event broadcaster. 
@@ -154,26 +162,26 @@ func NewMPIJobController( recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) controller := &MPIJobController{ - kubeClient: kubeClient, - kubeflowClient: kubeflowClient, - configMapLister: configMapInformer.Lister(), - configMapSynced: configMapInformer.Informer().HasSynced, - serviceAccountLister: serviceAccountInformer.Lister(), - serviceAccountSynced: serviceAccountInformer.Informer().HasSynced, - roleLister: roleInformer.Lister(), - roleSynced: roleInformer.Informer().HasSynced, - roleBindingLister: roleBindingInformer.Lister(), - roleBindingSynced: roleBindingInformer.Informer().HasSynced, - statefulSetLister: statefulSetInformer.Lister(), - statefulSetSynced: statefulSetInformer.Informer().HasSynced, - jobLister: jobInformer.Lister(), - jobSynced: jobInformer.Informer().HasSynced, - mpiJobLister: mpiJobInformer.Lister(), - mpiJobSynced: mpiJobInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"), - recorder: recorder, - gpusPerNode: gpusPerNode, - kubectlDeliveryImage: kubectlDeliveryImage, + kubeClient: kubeClient, + kubeflowClient: kubeflowClient, + configMapLister: configMapInformer.Lister(), + configMapSynced: configMapInformer.Informer().HasSynced, + serviceAccountLister: serviceAccountInformer.Lister(), + serviceAccountSynced: serviceAccountInformer.Informer().HasSynced, + roleLister: roleInformer.Lister(), + roleSynced: roleInformer.Informer().HasSynced, + roleBindingLister: roleBindingInformer.Lister(), + roleBindingSynced: roleBindingInformer.Informer().HasSynced, + statefulSetLister: statefulSetInformer.Lister(), + statefulSetSynced: statefulSetInformer.Informer().HasSynced, + jobLister: jobInformer.Lister(), + jobSynced: jobInformer.Informer().HasSynced, + mpiJobLister: mpiJobInformer.Lister(), + mpiJobSynced: mpiJobInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"), + recorder: recorder, + processingUnitsPerNode: processingUnitsPerNode, + kubectlDeliveryImage: kubectlDeliveryImage, } glog.Info("Setting up event handlers") @@ -406,7 +414,7 @@ func (c *MPIJobController) syncHandler(key string) error { // We're done if the launcher either succeeded or failed. done := launcher != nil && (launcher.Status.Succeeded == 1 || launcher.Status.Failed == 1) - workerReplicas, gpusPerWorker, err := allocateGPUs(mpiJob, c.gpusPerNode, done) + workerReplicas, processingUnitsPerWorker, err := allocateProcessingUnits(mpiJob, c.processingUnitsPerNode, c.processingResourceType, done) if err != nil { runtime.HandleError(err) return nil @@ -414,7 +422,7 @@ func (c *MPIJobController) syncHandler(key string) error { if !done { // Get the ConfigMap for this MPIJob. 
- if config, err := c.getOrCreateConfigMap(mpiJob, workerReplicas, gpusPerWorker); config == nil || err != nil { + if config, err := c.getOrCreateConfigMap(mpiJob, workerReplicas, processingUnitsPerWorker); config == nil || err != nil { return err } @@ -434,7 +442,7 @@ func (c *MPIJobController) syncHandler(key string) error { } } - worker, err := c.getOrCreateWorkerStatefulSet(mpiJob, workerReplicas, gpusPerWorker) + worker, err := c.getOrCreateWorkerStatefulSet(mpiJob, workerReplicas, processingUnitsPerWorker) if err != nil { return err } @@ -483,45 +491,51 @@ func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job return launcher, nil } -// allocateGPUs allocates the worker replicas and GPUs per worker. -func allocateGPUs(mpiJob *kubeflow.MPIJob, gpusPerNode int, done bool) (workerReplicas int, gpusPerWorker int, err error) { +// allocateProcessingUnits allocates the worker replicas and processing units per worker. +func allocateProcessingUnits( + mpiJob *kubeflow.MPIJob, + processingUnitsPerNode int, + processingResourceType string, + done bool) (workerReplicas int, processingUnitsPerWorker int, err error) { workerReplicas = 0 - gpusPerWorker = 0 + processingUnitsPerWorker = 0 err = nil + // TODO: Update MPIJob spec if mpiJob.Spec.GPUs != nil { - totalGPUs := int(*mpiJob.Spec.GPUs) - if totalGPUs < gpusPerNode { + totalProcessingUnits := int(*mpiJob.Spec.GPUs) + if totalProcessingUnits < processingUnitsPerNode { workerReplicas = 1 - gpusPerWorker = totalGPUs - } else if totalGPUs%gpusPerNode == 0 { - workerReplicas = totalGPUs / gpusPerNode - gpusPerWorker = gpusPerNode + processingUnitsPerWorker = totalProcessingUnits + } else if totalProcessingUnits%processingUnitsPerNode == 0 { + workerReplicas = totalProcessingUnits / processingUnitsPerNode + processingUnitsPerWorker = processingUnitsPerNode } else { - err = fmt.Errorf("specified #GPUs is not a multiple of GPUs per node (%d)", gpusPerNode) + // TODO: Update error message + err = fmt.Errorf("specified #GPUs is not a multiple of GPUs per node (%d)", processingUnitsPerNode) } } else if mpiJob.Spec.Replicas != nil { workerReplicas = int(*mpiJob.Spec.Replicas) container := mpiJob.Spec.Template.Spec.Containers[0] if container.Resources.Limits != nil { - if val, ok := container.Resources.Limits[gpuResourceName]; ok { - gpus, _ := val.AsInt64() - gpusPerWorker = int(gpus) + if val, ok := container.Resources.Limits[convertProcessingResourceType(processingResourceType)]; ok { + processingUnits, _ := val.AsInt64() + processingUnitsPerWorker = int(processingUnits) } } } if done { workerReplicas = 0 } - return workerReplicas, gpusPerWorker, err + return workerReplicas, processingUnitsPerWorker, err } // getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates // one if it doesn't exist. -func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, gpusPerWorker int) (*corev1.ConfigMap, error) { +func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int) (*corev1.ConfigMap, error) { cm, err := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix) // If the ConfigMap doesn't exist, we'll create it. 
if errors.IsNotFound(err) { - cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Create(newConfigMap(mpiJob, workerReplicas, gpusPerWorker)) + cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Create(newConfigMap(mpiJob, workerReplicas, processingUnitsPerWorker)) } // If an error occurs during Get/Create, we'll requeue the item so we // can attempt processing again later. This could have been caused by a @@ -616,11 +630,11 @@ func (c *MPIJobController) getLauncherRoleBinding(mpiJob *kubeflow.MPIJob) (*rba // getOrCreateWorkerStatefulSet gets the worker StatefulSet controlled by this // MPIJob, or creates one if it doesn't exist. -func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, workerReplicas int, gpusPerWorker int) (*appsv1.StatefulSet, error) { +func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int) (*appsv1.StatefulSet, error) { worker, err := c.statefulSetLister.StatefulSets(mpiJob.Namespace).Get(mpiJob.Name + workerSuffix) // If the StatefulSet doesn't exist, we'll create it. if errors.IsNotFound(err) && workerReplicas > 0 { - worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Create(newWorker(mpiJob, int32(workerReplicas), gpusPerWorker)) + worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Create(newWorker(mpiJob, int32(workerReplicas), processingUnitsPerWorker, processingResourceType)) } // If an error occurs during Get/Create, we'll requeue the item so we // can attempt processing again later. This could have been caused by a @@ -639,7 +653,7 @@ func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, // If the worker is out of date, update the worker. if worker != nil && int(*worker.Spec.Replicas) != workerReplicas { - worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Update(newWorker(mpiJob, int32(workerReplicas), gpusPerWorker)) + worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Update(newWorker(mpiJob, int32(workerReplicas), processingUnitsPerWorker, processingResourceType)) // If an error occurs during Update, we'll requeue the item so we can // attempt processing again later. This could have been caused by a // temporary network failure, or any other transient reason. @@ -732,7 +746,7 @@ func (c *MPIJobController) handleObject(obj interface{}) { // newConfigMap creates a new ConfigMap containing configurations for an MPIJob // resource. It also sets the appropriate OwnerReferences on the resource so // handleObject can discover the MPIJob resource that 'owns' it. -func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, gpusPerWorker int) *corev1.ConfigMap { +func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int) *corev1.ConfigMap { kubexec := fmt.Sprintf(`#!/bin/sh set -x POD_NAME=$1 @@ -740,10 +754,10 @@ shift %s/kubectl exec ${POD_NAME} -- /bin/sh -c "$*" `, kubectlMountPath) - // If no GPU is specified, default to 1 slot. + // If no processing unit is specified, default to 1 slot. 
slots := 1 - if gpusPerWorker > 0 { - slots = gpusPerWorker + if processingUnitsPerWorker > 0 { + slots = processingUnitsPerWorker } var buffer bytes.Buffer for i := 0; i < workerReplicas; i++ { @@ -850,10 +864,23 @@ func newLauncherRoleBinding(mpiJob *kubeflow.MPIJob) *rbacv1.RoleBinding { } } +func convertProcessingResourceType(processingResourceType string) corev1.ResourceName { + if processingResourceType == gpuResourceName { + return corev1.ResourceNvidiaGPU + } else if processingResourceType == cpuResourceName { + return corev1.ResourceCPU + } + fmt.Printf( + "Unsupported processing resource type specified: %q. \nSwitching to use NVIDIA GPU by default.", + processingResourceType) + + return corev1.ResourceNvidiaGPU +} + // newWorker creates a new worker StatefulSet for an MPIJob resource. It also // sets the appropriate OwnerReferences on the resource so handleObject can // discover the MPIJob resource that 'owns' it. -func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, gpus int) *appsv1.StatefulSet { +func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, processingUnits int, processingResourceType string) *appsv1.StatefulSet { labels := map[string]string{ labelGroupName: "kubeflow.org", labelMPIJobName: mpiJob.Name, @@ -878,7 +905,8 @@ func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, gpus int) *appsv1 if container.Resources.Limits == nil { container.Resources.Limits = make(corev1.ResourceList) } - container.Resources.Limits[gpuResourceName] = *resource.NewQuantity(int64(gpus), resource.DecimalExponent) + // TODO: Validate processingResourceType + container.Resources.Limits[convertProcessingResourceType(processingResourceType)] = *resource.NewQuantity(int64(processingUnits), resource.DecimalExponent) // We need the kubexec.sh script here because Open MPI checks for the path // in every rank. 
diff --git a/pkg/controllers/mpi_job_controller_test.go b/pkg/controllers/mpi_job_controller_test.go index eb6ce631..d1431941 100644 --- a/pkg/controllers/mpi_job_controller_test.go +++ b/pkg/controllers/mpi_job_controller_test.go @@ -145,6 +145,7 @@ func (f *fixture) newController() (*MPIJobController, informers.SharedInformerFa k8sI.Batch().V1().Jobs(), i.Kubeflow().V1alpha1().MPIJobs(), 8, + "nvidia.com/gpu", "kubectl-delivery") c.configMapSynced = alwaysReady @@ -503,7 +504,7 @@ func TestLauncherDoesNotExist(t *testing.T) { expRoleBinding := newLauncherRoleBinding(mpiJob) f.expectCreateRoleBindingAction(expRoleBinding) - expWorker := newWorker(mpiJob, 8, 8) + expWorker := newWorker(mpiJob, 8, 8, gpuResourceName) f.expectCreateStatefulSetAction(expWorker) mpiJobCopy := mpiJob.DeepCopy() @@ -531,7 +532,7 @@ func TestLauncherDoesNotExistWithCustomResources(t *testing.T) { expRoleBinding := newLauncherRoleBinding(mpiJob) f.expectCreateRoleBindingAction(expRoleBinding) - expWorker := newWorker(mpiJob, 4, 4) + expWorker := newWorker(mpiJob, 4, 4, gpuResourceName) f.expectCreateStatefulSetAction(expWorker) mpiJobCopy := mpiJob.DeepCopy() @@ -612,10 +613,10 @@ func TestShutdownWorker(t *testing.T) { launcher.Status.Succeeded = 1 f.setUpLauncher(launcher) - worker := newWorker(mpiJob, 8, 8) + worker := newWorker(mpiJob, 8, 8, gpuResourceName) f.setUpWorker(worker) - expWorker := newWorker(mpiJob, 0, 8) + expWorker := newWorker(mpiJob, 0, 8, gpuResourceName) f.expectUpdateStatefulSetAction(expWorker) mpiJobCopy := mpiJob.DeepCopy() @@ -635,7 +636,7 @@ func TestWorkerNotControlledByUs(t *testing.T) { f.setUpConfigMap(newConfigMap(mpiJob, 8, 8)) f.setUpRbac(mpiJob, 8) - worker := newWorker(mpiJob, 8, 8) + worker := newWorker(mpiJob, 8, 8, gpuResourceName) worker.OwnerReferences = nil f.setUpWorker(worker) @@ -655,7 +656,7 @@ func TestLauncherActive(t *testing.T) { launcher.Status.Active = 1 f.setUpLauncher(launcher) - worker := newWorker(mpiJob, 1, 8) + worker := newWorker(mpiJob, 1, 8, gpuResourceName) f.setUpWorker(worker) mpiJobCopy := mpiJob.DeepCopy() @@ -674,7 +675,7 @@ func TestWorkerReady(t *testing.T) { f.setUpConfigMap(newConfigMap(mpiJob, 2, 8)) f.setUpRbac(mpiJob, 2) - worker := newWorker(mpiJob, 2, 8) + worker := newWorker(mpiJob, 2, 8, gpuResourceName) worker.Status.ReadyReplicas = 2 f.setUpWorker(worker) From 41128436bcea4412d13d67b0df25484d7ad24ec8 Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Thu, 10 Jan 2019 11:56:22 -0500 Subject: [PATCH 2/7] Removed unneccessary var --- pkg/controllers/mpi_job_controller.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/controllers/mpi_job_controller.go b/pkg/controllers/mpi_job_controller.go index c0bce164..7ef5ba1c 100644 --- a/pkg/controllers/mpi_job_controller.go +++ b/pkg/controllers/mpi_job_controller.go @@ -73,10 +73,6 @@ const ( labelMPIRoleType = "mpi_role_type" ) -var ( - processingResourceType string -) - const ( // SuccessSynced is used as part of the Event 'reason' when an MPIJob is // synced. 
@@ -442,7 +438,7 @@ func (c *MPIJobController) syncHandler(key string) error { } } - worker, err := c.getOrCreateWorkerStatefulSet(mpiJob, workerReplicas, processingUnitsPerWorker) + worker, err := c.getOrCreateWorkerStatefulSet(mpiJob, workerReplicas, processingUnitsPerWorker, c.processingResourceType) if err != nil { return err } @@ -630,7 +626,7 @@ func (c *MPIJobController) getLauncherRoleBinding(mpiJob *kubeflow.MPIJob) (*rba // getOrCreateWorkerStatefulSet gets the worker StatefulSet controlled by this // MPIJob, or creates one if it doesn't exist. -func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int) (*appsv1.StatefulSet, error) { +func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int, processingResourceType string) (*appsv1.StatefulSet, error) { worker, err := c.statefulSetLister.StatefulSets(mpiJob.Namespace).Get(mpiJob.Name + workerSuffix) // If the StatefulSet doesn't exist, we'll create it. if errors.IsNotFound(err) && workerReplicas > 0 { @@ -905,7 +901,6 @@ func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, processingUnits i if container.Resources.Limits == nil { container.Resources.Limits = make(corev1.ResourceList) } - // TODO: Validate processingResourceType container.Resources.Limits[convertProcessingResourceType(processingResourceType)] = *resource.NewQuantity(int64(processingUnits), resource.DecimalExponent) // We need the kubexec.sh script here because Open MPI checks for the path From df3f8705659ee0f0ef36188a2e9664ef13f4d38c Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Mon, 14 Jan 2019 14:34:32 -0500 Subject: [PATCH 3/7] Logic to handle old and new params --- cmd/mpi-operator/main.go | 11 +++- pkg/apis/kubeflow/v1alpha1/types.go | 6 +++ .../v1alpha1/zz_generated.deepcopy.go | 5 ++ pkg/controllers/mpi_job_controller.go | 51 +++++++++++++------ pkg/controllers/mpi_job_controller_test.go | 3 +- 5 files changed, 58 insertions(+), 18 deletions(-) diff --git a/cmd/mpi-operator/main.go b/cmd/mpi-operator/main.go index 82f598ea..07aaed40 100644 --- a/cmd/mpi-operator/main.go +++ b/cmd/mpi-operator/main.go @@ -31,6 +31,7 @@ import ( var ( masterURL string kubeConfig string + gpusPerNode int processingUnitsPerNode int processingResourceType string kubectlDeliveryImage string @@ -78,6 +79,7 @@ func main() { kubeInformerFactory.Apps().V1().StatefulSets(), kubeInformerFactory.Batch().V1().Jobs(), kubeflowInformerFactory.Kubeflow().V1alpha1().MPIJobs(), + gpusPerNode, processingUnitsPerNode, processingResourceType, kubectlDeliveryImage) @@ -94,11 +96,16 @@ func init() { flag.StringVar(&kubeConfig, "kubeConfig", "", "Path to a kubeConfig. Only required if out-of-cluster.") flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeConfig. Only required if out-of-cluster.") flag.IntVar( - &processingUnitsPerNode, - "processing-units-per-node", + &gpusPerNode, + "gpus-per-node", 1, "The maximum number of GPUs available per node. 
Note that this will be ignored if the GPU resources are explicitly specified in the MPIJob pod spec.") flag.StringVar(&kubectlDeliveryImage, "kubectl-delivery-image", "", "The container image used to deliver the kubectl binary.") flag.StringVar(&namespace, "namespace", "", "The namespace used to obtain the listers.") + flag.IntVar( + &processingUnitsPerNode, + "processing-units-per-node", + 1, + "The maximum number of processing units available per node. Note that this will be ignored if the processing resources are explicitly specified in the MPIJob pod spec.") flag.StringVar(&processingResourceType, "processing-resource-type", "nvidia.com/gpu", "The compute resource name, e.g. 'nvidia.com/gpu' or 'cpu'.") } diff --git a/pkg/apis/kubeflow/v1alpha1/types.go b/pkg/apis/kubeflow/v1alpha1/types.go index b62938bc..6faedd4b 100644 --- a/pkg/apis/kubeflow/v1alpha1/types.go +++ b/pkg/apis/kubeflow/v1alpha1/types.go @@ -40,9 +40,15 @@ type MPIJobList struct { type MPIJobSpec struct { // Specifies the desired number of GPUs the MPIJob should run on. // Mutually exclusive with the `Replicas` field. + // Note that this is deprecated in favor of `ProcessingUnits` field. // +optional GPUs *int32 `json:"gpus,omitempty"` + // Specifies the desired number of processing units the MPIJob should run on. + // Mutually exclusive with the `Replicas` field. + // +optional + ProcessingUnits *int32 `json:"processingUnits,omitempty"` + // Run the launcher on the master. // Optional: Default to false // +optional diff --git a/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go index a04fd704..778ba689 100644 --- a/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go @@ -91,6 +91,11 @@ func (in *MPIJobSpec) DeepCopyInto(out *MPIJobSpec) { *out = new(int32) **out = **in } + if in.ProcessingUnits != nil { + in, out := &in.ProcessingUnits, &out.ProcessingUnits + *out = new(int32) + **out = **in + } if in.BackoffLimit != nil { in, out := &in.BackoffLimit, &out.BackoffLimit *out = new(int32) diff --git a/pkg/controllers/mpi_job_controller.go b/pkg/controllers/mpi_job_controller.go index 7ef5ba1c..091c1103 100644 --- a/pkg/controllers/mpi_job_controller.go +++ b/pkg/controllers/mpi_job_controller.go @@ -125,6 +125,8 @@ type MPIJobController struct { // Kubernetes API. recorder record.EventRecorder // The maximum number of GPUs per node. + gpusPerNode int + // The maximum number of processing units per node. processingUnitsPerNode int // The processing resource name, e.g. "nvidia.com/gpu" or "cpu" processingResourceType string @@ -143,6 +145,7 @@ func NewMPIJobController( statefulSetInformer appsinformers.StatefulSetInformer, jobInformer batchinformers.JobInformer, mpiJobInformer informers.MPIJobInformer, + gpusPerNode int, processingUnitsPerNode int, processingResourceType string, kubectlDeliveryImage string) *MPIJobController { @@ -176,7 +179,9 @@ func NewMPIJobController( mpiJobSynced: mpiJobInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"), recorder: recorder, + gpusPerNode: gpusPerNode, processingUnitsPerNode: processingUnitsPerNode, + processingResourceType: processingResourceType, kubectlDeliveryImage: kubectlDeliveryImage, } @@ -410,7 +415,7 @@ func (c *MPIJobController) syncHandler(key string) error { // We're done if the launcher either succeeded or failed. 
done := launcher != nil && (launcher.Status.Succeeded == 1 || launcher.Status.Failed == 1) - workerReplicas, processingUnitsPerWorker, err := allocateProcessingUnits(mpiJob, c.processingUnitsPerNode, c.processingResourceType, done) + workerReplicas, processingUnitsPerWorker, err := allocateProcessingUnits(mpiJob, c.gpusPerNode, c.processingUnitsPerNode, c.processingResourceType, done) if err != nil { runtime.HandleError(err) return nil @@ -490,24 +495,40 @@ func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job // allocateProcessingUnits allocates the worker replicas and processing units per worker. func allocateProcessingUnits( mpiJob *kubeflow.MPIJob, + gpusPerNode int, processingUnitsPerNode int, processingResourceType string, done bool) (workerReplicas int, processingUnitsPerWorker int, err error) { workerReplicas = 0 processingUnitsPerWorker = 0 err = nil - // TODO: Update MPIJob spec - if mpiJob.Spec.GPUs != nil { - totalProcessingUnits := int(*mpiJob.Spec.GPUs) - if totalProcessingUnits < processingUnitsPerNode { - workerReplicas = 1 - processingUnitsPerWorker = totalProcessingUnits - } else if totalProcessingUnits%processingUnitsPerNode == 0 { - workerReplicas = totalProcessingUnits / processingUnitsPerNode - processingUnitsPerWorker = processingUnitsPerNode + if mpiJob.Spec.GPUs != nil || mpiJob.Spec.ProcessingUnits != nil { + if mpiJob.Spec.ProcessingUnits != nil && mpiJob.Spec.GPUs != nil { + err = fmt.Errorf("Cannot specify both GPUs and ProcessingUnits at the same time") } else { - // TODO: Update error message - err = fmt.Errorf("specified #GPUs is not a multiple of GPUs per node (%d)", processingUnitsPerNode) + totalProcessingUnits := 0 + usedSpecField := "" + pusPerNode := 0 + if mpiJob.Spec.GPUs != nil { + fmt.Println("GPUs field is deprecated. Please switch to use ProcessingUnits.") + totalProcessingUnits = int(*mpiJob.Spec.GPUs) + pusPerNode = gpusPerNode + usedSpecField = "GPUs" + } else if mpiJob.Spec.ProcessingUnits != nil { + totalProcessingUnits = int(*mpiJob.Spec.ProcessingUnits) + pusPerNode = processingUnitsPerNode + usedSpecField = "ProcessingUnits" + } + if totalProcessingUnits < pusPerNode { + workerReplicas = 1 + processingUnitsPerWorker = totalProcessingUnits + } else if totalProcessingUnits%pusPerNode == 0 { + workerReplicas = totalProcessingUnits / pusPerNode + processingUnitsPerWorker = pusPerNode + } else { + err = fmt.Errorf( + "specified #%s is not a multiple of %s per node (%d)", usedSpecField, usedSpecField, processingUnitsPerNode) + } } } else if mpiJob.Spec.Replicas != nil { workerReplicas = int(*mpiJob.Spec.Replicas) @@ -862,15 +883,15 @@ func newLauncherRoleBinding(mpiJob *kubeflow.MPIJob) *rbacv1.RoleBinding { func convertProcessingResourceType(processingResourceType string) corev1.ResourceName { if processingResourceType == gpuResourceName { - return corev1.ResourceNvidiaGPU + return gpuResourceName } else if processingResourceType == cpuResourceName { - return corev1.ResourceCPU + return cpuResourceName } fmt.Printf( "Unsupported processing resource type specified: %q. \nSwitching to use NVIDIA GPU by default.", processingResourceType) - return corev1.ResourceNvidiaGPU + return gpuResourceName } // newWorker creates a new worker StatefulSet for an MPIJob resource. 
It also diff --git a/pkg/controllers/mpi_job_controller_test.go b/pkg/controllers/mpi_job_controller_test.go index d1431941..883641d9 100644 --- a/pkg/controllers/mpi_job_controller_test.go +++ b/pkg/controllers/mpi_job_controller_test.go @@ -145,7 +145,8 @@ func (f *fixture) newController() (*MPIJobController, informers.SharedInformerFa k8sI.Batch().V1().Jobs(), i.Kubeflow().V1alpha1().MPIJobs(), 8, - "nvidia.com/gpu", + 8, + gpuResourceName, "kubectl-delivery") c.configMapSynced = alwaysReady From 75685ca021e1a8261d36602853bb6d06ce261c34 Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Mon, 14 Jan 2019 15:46:27 -0500 Subject: [PATCH 4/7] Expose SlotsPerWorker in MPIJobSpec --- pkg/apis/kubeflow/v1alpha1/types.go | 4 ++++ pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go | 5 +++++ pkg/controllers/mpi_job_controller.go | 4 +++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/apis/kubeflow/v1alpha1/types.go b/pkg/apis/kubeflow/v1alpha1/types.go index 6faedd4b..00496506 100644 --- a/pkg/apis/kubeflow/v1alpha1/types.go +++ b/pkg/apis/kubeflow/v1alpha1/types.go @@ -49,6 +49,10 @@ type MPIJobSpec struct { // +optional ProcessingUnits *int32 `json:"processingUnits,omitempty"` + // Specifies the number of slots per worker used in hostfile. + // +optional + SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` + // Run the launcher on the master. // Optional: Default to false // +optional diff --git a/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go index 778ba689..d37ad64d 100644 --- a/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go @@ -96,6 +96,11 @@ func (in *MPIJobSpec) DeepCopyInto(out *MPIJobSpec) { *out = new(int32) **out = **in } + if in.SlotsPerWorker != nil { + in, out := &in.SlotsPerWorker, &out.SlotsPerWorker + *out = new(int32) + **out = **in + } if in.BackoffLimit != nil { in, out := &in.BackoffLimit, &out.BackoffLimit *out = new(int32) diff --git a/pkg/controllers/mpi_job_controller.go b/pkg/controllers/mpi_job_controller.go index 091c1103..c3807a91 100644 --- a/pkg/controllers/mpi_job_controller.go +++ b/pkg/controllers/mpi_job_controller.go @@ -773,8 +773,10 @@ shift // If no processing unit is specified, default to 1 slot. slots := 1 - if processingUnitsPerWorker > 0 { + if mpiJob.Spec.SlotsPerWorker == nil && processingUnitsPerWorker > 0 { slots = processingUnitsPerWorker + } else { + slots = int(*mpiJob.Spec.SlotsPerWorker) } var buffer bytes.Buffer for i := 0; i < workerReplicas; i++ { From a8855527758aeac86473aaa6663f06108ea8b3a9 Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Mon, 14 Jan 2019 16:01:33 -0500 Subject: [PATCH 5/7] Fixed hostfile slots logic --- pkg/apis/kubeflow/v1alpha1/types.go | 5 +++-- pkg/controllers/mpi_job_controller.go | 6 ++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/apis/kubeflow/v1alpha1/types.go b/pkg/apis/kubeflow/v1alpha1/types.go index 00496506..21f2b0db 100644 --- a/pkg/apis/kubeflow/v1alpha1/types.go +++ b/pkg/apis/kubeflow/v1alpha1/types.go @@ -50,16 +50,17 @@ type MPIJobSpec struct { ProcessingUnits *int32 `json:"processingUnits,omitempty"` // Specifies the number of slots per worker used in hostfile. + // Defaults to the number of processing units per worker. // +optional SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` // Run the launcher on the master. - // Optional: Default to false + // Defaults to false. 
// +optional LauncherOnMaster bool `json:"launcherOnMaster,omitempty"` // Specifies the number of retries before marking this job failed. - // Defaults to 6 + // Defaults to 6. // +optional BackoffLimit *int32 `json:"backoffLimit,omitempty"` diff --git a/pkg/controllers/mpi_job_controller.go b/pkg/controllers/mpi_job_controller.go index c3807a91..7b06688e 100644 --- a/pkg/controllers/mpi_job_controller.go +++ b/pkg/controllers/mpi_job_controller.go @@ -773,8 +773,10 @@ shift // If no processing unit is specified, default to 1 slot. slots := 1 - if mpiJob.Spec.SlotsPerWorker == nil && processingUnitsPerWorker > 0 { - slots = processingUnitsPerWorker + if mpiJob.Spec.SlotsPerWorker == nil { + if processingUnitsPerWorker > 0 { + slots = processingUnitsPerWorker + } } else { slots = int(*mpiJob.Spec.SlotsPerWorker) } From e97a3d1fe729d27cb22ffb3d1a236b2d72ec8caf Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Mon, 14 Jan 2019 20:02:25 -0500 Subject: [PATCH 6/7] Added tests for cpu resources --- pkg/controllers/mpi_job_controller_test.go | 131 ++++++++++++++------- 1 file changed, 90 insertions(+), 41 deletions(-) diff --git a/pkg/controllers/mpi_job_controller_test.go b/pkg/controllers/mpi_job_controller_test.go index 883641d9..a410fb2e 100644 --- a/pkg/controllers/mpi_job_controller_test.go +++ b/pkg/controllers/mpi_job_controller_test.go @@ -99,7 +99,30 @@ func newMPIJob(name string, gpus *int32) *kubeflow.MPIJob { } } -func newMPIJobWithCustomResources(name string, replicas *int32, gpusPerReplica int64) *kubeflow.MPIJob { +func newMPIJobWithCPUs(name string, cpus *int32) *kubeflow.MPIJob { + return &kubeflow.MPIJob{ + TypeMeta: metav1.TypeMeta{APIVersion: kubeflow.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + }, + Spec: kubeflow.MPIJobSpec{ + ProcessingUnits: cpus, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "foo", + Image: "bar", + }, + }, + }, + }, + }, + } +} + +func newMPIJobWithCustomResources(name string, replicas *int32, pusPerReplica int64, processingResourceType string) *kubeflow.MPIJob { return &kubeflow.MPIJob{ TypeMeta: metav1.TypeMeta{APIVersion: kubeflow.SchemeGroupVersion.String()}, ObjectMeta: metav1.ObjectMeta{ @@ -116,7 +139,7 @@ func newMPIJobWithCustomResources(name string, replicas *int32, gpusPerReplica i Image: "bar", Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ - "nvidia.com/gpu": *resource.NewQuantity(gpusPerReplica, resource.DecimalExponent), + convertProcessingResourceType(processingResourceType): *resource.NewQuantity(pusPerReplica, resource.DecimalExponent), }, }, }, @@ -127,7 +150,7 @@ func newMPIJobWithCustomResources(name string, replicas *int32, gpusPerReplica i } } -func (f *fixture) newController() (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { +func (f *fixture) newController(processingResourceType string) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { f.client = fake.NewSimpleClientset(f.objects...) f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...) 
@@ -146,7 +169,7 @@ func (f *fixture) newController() (*MPIJobController, informers.SharedInformerFa i.Kubeflow().V1alpha1().MPIJobs(), 8, 8, - gpuResourceName, + processingResourceType, "kubectl-delivery") c.configMapSynced = alwaysReady @@ -189,16 +212,16 @@ func (f *fixture) newController() (*MPIJobController, informers.SharedInformerFa return c, i, k8sI } -func (f *fixture) run(mpiJobName string) { - f.runController(mpiJobName, true, false) +func (f *fixture) run(mpiJobName string, processingResourceType string) { + f.runController(mpiJobName, true, false, processingResourceType) } -func (f *fixture) runExpectError(mpiJobName string) { - f.runController(mpiJobName, true, true) +func (f *fixture) runExpectError(mpiJobName string, processingResourceType string) { + f.runController(mpiJobName, true, true, processingResourceType) } -func (f *fixture) runController(mpiJobName string, startInformers bool, expectError bool) { - c, i, k8sI := f.newController() +func (f *fixture) runController(mpiJobName string, startInformers bool, expectError bool, processingResourceType string) { + c, i, k8sI := f.newController(processingResourceType) if startInformers { stopCh := make(chan struct{}) defer close(stopCh) @@ -431,13 +454,13 @@ func getKey(mpiJob *kubeflow.MPIJob, t *testing.T) string { func TestDoNothingWithInvalidKey(t *testing.T) { f := newFixture(t) - f.run("foo/bar/baz") + f.run("foo/bar/baz", gpuResourceName) } func TestDoNothingWithNonexistentMPIJob(t *testing.T) { f := newFixture(t) mpiJob := newMPIJob("test", int32Ptr(64)) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestLauncherNotControlledByUs(t *testing.T) { @@ -450,7 +473,7 @@ func TestLauncherNotControlledByUs(t *testing.T) { launcher.OwnerReferences = nil f.setUpLauncher(launcher) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestLauncherSucceeded(t *testing.T) { @@ -467,7 +490,7 @@ func TestLauncherSucceeded(t *testing.T) { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherSucceeded f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestLauncherFailed(t *testing.T) { @@ -484,7 +507,7 @@ func TestLauncherFailed(t *testing.T) { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherFailed f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestLauncherDoesNotExist(t *testing.T) { @@ -512,35 +535,38 @@ func TestLauncherDoesNotExist(t *testing.T) { mpiJobCopy.Status.WorkerReplicas = 0 f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestLauncherDoesNotExistWithCustomResources(t *testing.T) { - f := newFixture(t) + resourceNames := []string{cpuResourceName, gpuResourceName} + for _, resourceName := range resourceNames { + f := newFixture(t) - mpiJob := newMPIJobWithCustomResources("test", int32Ptr(4), 4) - f.setUpMPIJob(mpiJob) + mpiJob := newMPIJobWithCustomResources("test", int32Ptr(4), 4, resourceName) + f.setUpMPIJob(mpiJob) - expConfigMap := newConfigMap(mpiJob, 4, 4) - f.expectCreateConfigMapAction(expConfigMap) + expConfigMap := newConfigMap(mpiJob, 4, 4) + f.expectCreateConfigMapAction(expConfigMap) - expServiceAccount := newLauncherServiceAccount(mpiJob) - f.expectCreateServiceAccountAction(expServiceAccount) + expServiceAccount := newLauncherServiceAccount(mpiJob) + f.expectCreateServiceAccountAction(expServiceAccount) - 
expRole := newLauncherRole(mpiJob, 4) - f.expectCreateRoleAction(expRole) + expRole := newLauncherRole(mpiJob, 4) + f.expectCreateRoleAction(expRole) - expRoleBinding := newLauncherRoleBinding(mpiJob) - f.expectCreateRoleBindingAction(expRoleBinding) + expRoleBinding := newLauncherRoleBinding(mpiJob) + f.expectCreateRoleBindingAction(expRoleBinding) - expWorker := newWorker(mpiJob, 4, 4, gpuResourceName) - f.expectCreateStatefulSetAction(expWorker) + expWorker := newWorker(mpiJob, 4, 4, resourceName) + f.expectCreateStatefulSetAction(expWorker) - mpiJobCopy := mpiJob.DeepCopy() - mpiJobCopy.Status.WorkerReplicas = 0 - f.expectUpdateMPIJobStatusAction(mpiJobCopy) + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.WorkerReplicas = 0 + f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), resourceName) + } } func TestConfigMapNotControlledByUs(t *testing.T) { @@ -553,7 +579,7 @@ func TestConfigMapNotControlledByUs(t *testing.T) { configMap.OwnerReferences = nil f.setUpConfigMap(configMap) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestServiceAccountNotControlledByUs(t *testing.T) { @@ -568,7 +594,7 @@ func TestServiceAccountNotControlledByUs(t *testing.T) { serviceAccount.OwnerReferences = nil f.setUpServiceAccount(serviceAccount) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestRoleNotControlledByUs(t *testing.T) { @@ -584,7 +610,7 @@ func TestRoleNotControlledByUs(t *testing.T) { role.OwnerReferences = nil f.setUpRole(role) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestRoleBindingNotControlledByUs(t *testing.T) { @@ -601,7 +627,7 @@ func TestRoleBindingNotControlledByUs(t *testing.T) { roleBinding.OwnerReferences = nil f.setUpRoleBinding(roleBinding) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestShutdownWorker(t *testing.T) { @@ -625,7 +651,7 @@ func TestShutdownWorker(t *testing.T) { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherSucceeded f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestWorkerNotControlledByUs(t *testing.T) { @@ -641,7 +667,7 @@ func TestWorkerNotControlledByUs(t *testing.T) { worker.OwnerReferences = nil f.setUpWorker(worker) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestLauncherActive(t *testing.T) { @@ -664,7 +690,7 @@ func TestLauncherActive(t *testing.T) { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherActive f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestWorkerReady(t *testing.T) { @@ -687,7 +713,30 @@ func TestWorkerReady(t *testing.T) { mpiJobCopy.Status.WorkerReplicas = 2 f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) +} + +func TestWorkerReadyWithCPUs(t *testing.T) { + f := newFixture(t) + + mpiJob := newMPIJobWithCPUs("test", int32Ptr(16)) + f.setUpMPIJob(mpiJob) + + f.setUpConfigMap(newConfigMap(mpiJob, 2, 8)) + f.setUpRbac(mpiJob, 2) + + worker := newWorker(mpiJob, 2, 8, cpuResourceName) + worker.Status.ReadyReplicas = 2 + f.setUpWorker(worker) + + expLauncher := newLauncher(mpiJob, "kubectl-delivery") + f.expectCreateJobAction(expLauncher) + + mpiJobCopy := mpiJob.DeepCopy() + 
mpiJobCopy.Status.WorkerReplicas = 2 + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t), cpuResourceName) } func int32Ptr(i int32) *int32 { return &i } From badf580a9968d526f4fa141a240348cb790cde98 Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Tue, 15 Jan 2019 16:10:59 -0500 Subject: [PATCH 7/7] Added validation for new fields --- deploy/0-crd.yaml | 32 ++++++++++++++++++++++++++++- pkg/apis/kubeflow/v1alpha1/types.go | 2 +- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/deploy/0-crd.yaml b/deploy/0-crd.yaml index 29ce6301..b0b79943 100644 --- a/deploy/0-crd.yaml +++ b/deploy/0-crd.yaml @@ -18,7 +18,7 @@ spec: properties: spec: title: The MPIJob spec - description: Either `gpus` or `replicas` should be specified, but not both + description: Only one of `gpus`, `processingUnits`, or `replicas` should be specified oneOf: - properties: gpus: @@ -33,13 +33,43 @@ spec: - type: integer multipleOf: 8 minimum: 8 + slotsPerWorker: + title: The number of slots per worker used in hostfile + description: Defaults to the number of processing units per worker + type: integer + minimum: 1 required: - gpus + - properties: + processingUnits: + title: Total number of processing units + description: Valid values are 1, 2, 4, or any multiple of 8 + oneOf: + - type: integer + enum: + - 1 + - 2 + - 4 + - type: integer + multipleOf: 8 + minimum: 8 + slotsPerWorker: + title: The number of slots per worker used in hostfile + description: Defaults to the number of processing units per worker + type: integer + minimum: 1 + required: + - processingUnits - properties: replicas: title: Total number of replicas description: The GPU resource limit should be specified for each replica type: integer minimum: 1 + slotsPerWorker: + title: The number of slots per worker used in hostfile + description: Defaults to the number of processing units per worker + type: integer + minimum: 1 required: - replicas diff --git a/pkg/apis/kubeflow/v1alpha1/types.go b/pkg/apis/kubeflow/v1alpha1/types.go index 21f2b0db..e90f14e4 100644 --- a/pkg/apis/kubeflow/v1alpha1/types.go +++ b/pkg/apis/kubeflow/v1alpha1/types.go @@ -72,7 +72,7 @@ type MPIJobSpec struct { // Specifies the desired number of replicas the MPIJob should run on. // The `PodSpec` should specify the number of GPUs. - // Mutually exclusive with the `GPUs` field. + // Mutually exclusive with the `GPUs` or `ProcessingUnits` fields. // +optional Replicas *int32 `json:"replicas,omitempty"`
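
For reference, a minimal MPIJob manifest that exercises the fields introduced in this series might look like the sketch below; the container name and image are placeholders for illustration only and are not part of these patches:

  apiVersion: kubeflow.org/v1alpha1
  kind: MPIJob
  metadata:
    name: example-cpu-job
  spec:
    # Replaces the deprecated `gpus` field; per the CRD validation above,
    # valid values are 1, 2, 4, or any multiple of 8.
    processingUnits: 16
    # Overrides the per-worker slot count written to the hostfile.
    # When omitted, it defaults to the number of processing units per worker.
    slotsPerWorker: 2
    template:
      spec:
        containers:
        - name: mpi-example              # placeholder
          image: example/mpi-app:latest  # placeholder

Assuming the operator is started with --processing-resource-type=cpu and --processing-units-per-node=8, allocateProcessingUnits would yield 2 worker replicas with a `cpu` limit of 8 each, and the generated hostfile would list 2 slots per worker.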