diff --git a/cmd/mpi-operator/main.go b/cmd/mpi-operator/main.go index d1ae7161..07aaed40 100644 --- a/cmd/mpi-operator/main.go +++ b/cmd/mpi-operator/main.go @@ -29,11 +29,13 @@ import ( ) var ( - masterURL string - kubeConfig string - gpusPerNode int - kubectlDeliveryImage string - namespace string + masterURL string + kubeConfig string + gpusPerNode int + processingUnitsPerNode int + processingResourceType string + kubectlDeliveryImage string + namespace string ) func main() { @@ -78,6 +80,8 @@ func main() { kubeInformerFactory.Batch().V1().Jobs(), kubeflowInformerFactory.Kubeflow().V1alpha1().MPIJobs(), gpusPerNode, + processingUnitsPerNode, + processingResourceType, kubectlDeliveryImage) go kubeInformerFactory.Start(stopCh) @@ -98,4 +102,10 @@ func init() { "The maximum number of GPUs available per node. Note that this will be ignored if the GPU resources are explicitly specified in the MPIJob pod spec.") flag.StringVar(&kubectlDeliveryImage, "kubectl-delivery-image", "", "The container image used to deliver the kubectl binary.") flag.StringVar(&namespace, "namespace", "", "The namespace used to obtain the listers.") + flag.IntVar( + &processingUnitsPerNode, + "processing-units-per-node", + 1, + "The maximum number of processing units available per node. Note that this will be ignored if the processing resources are explicitly specified in the MPIJob pod spec.") + flag.StringVar(&processingResourceType, "processing-resource-type", "nvidia.com/gpu", "The compute resource name, e.g. 'nvidia.com/gpu' or 'cpu'.") } diff --git a/deploy/0-crd.yaml b/deploy/0-crd.yaml index 29ce6301..b0b79943 100644 --- a/deploy/0-crd.yaml +++ b/deploy/0-crd.yaml @@ -18,7 +18,7 @@ spec: properties: spec: title: The MPIJob spec - description: Either `gpus` or `replicas` should be specified, but not both + description: Only one of `gpus`, `processingUnits`, or `replicas` should be specified oneOf: - properties: gpus: @@ -33,13 +33,43 @@ spec: - type: integer multipleOf: 8 minimum: 8 + slotsPerWorker: + title: The number of slots per worker used in hostfile + description: Defaults to the number of processing units per worker + type: integer + minimum: 1 required: - gpus + - properties: + processingUnits: + title: Total number of processing units + description: Valid values are 1, 2, 4, or any multiple of 8 + oneOf: + - type: integer + enum: + - 1 + - 2 + - 4 + - type: integer + multipleOf: 8 + minimum: 8 + slotsPerWorker: + title: The number of slots per worker used in hostfile + description: Defaults to the number of processing units per worker + type: integer + minimum: 1 + required: + - processingUnits - properties: replicas: title: Total number of replicas description: The GPU resource limit should be specified for each replica type: integer minimum: 1 + slotsPerWorker: + title: The number of slots per worker used in hostfile + description: Defaults to the number of processing units per worker + type: integer + minimum: 1 required: - replicas diff --git a/pkg/apis/kubeflow/v1alpha1/types.go b/pkg/apis/kubeflow/v1alpha1/types.go index b62938bc..e90f14e4 100644 --- a/pkg/apis/kubeflow/v1alpha1/types.go +++ b/pkg/apis/kubeflow/v1alpha1/types.go @@ -40,16 +40,27 @@ type MPIJobList struct { type MPIJobSpec struct { // Specifies the desired number of GPUs the MPIJob should run on. // Mutually exclusive with the `Replicas` field. + // Note that this is deprecated in favor of `ProcessingUnits` field. 
// +optional GPUs *int32 `json:"gpus,omitempty"` + // Specifies the desired number of processing units the MPIJob should run on. + // Mutually exclusive with the `Replicas` field. + // +optional + ProcessingUnits *int32 `json:"processingUnits,omitempty"` + + // Specifies the number of slots per worker used in hostfile. + // Defaults to the number of processing units per worker. + // +optional + SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` + // Run the launcher on the master. - // Optional: Default to false + // Defaults to false. // +optional LauncherOnMaster bool `json:"launcherOnMaster,omitempty"` // Specifies the number of retries before marking this job failed. - // Defaults to 6 + // Defaults to 6. // +optional BackoffLimit *int32 `json:"backoffLimit,omitempty"` @@ -61,7 +72,7 @@ type MPIJobSpec struct { // Specifies the desired number of replicas the MPIJob should run on. // The `PodSpec` should specify the number of GPUs. - // Mutually exclusive with the `GPUs` field. + // Mutually exclusive with the `GPUs` or `ProcessingUnits` fields. // +optional Replicas *int32 `json:"replicas,omitempty"` diff --git a/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go index a04fd704..d37ad64d 100644 --- a/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go @@ -91,6 +91,16 @@ func (in *MPIJobSpec) DeepCopyInto(out *MPIJobSpec) { *out = new(int32) **out = **in } + if in.ProcessingUnits != nil { + in, out := &in.ProcessingUnits, &out.ProcessingUnits + *out = new(int32) + **out = **in + } + if in.SlotsPerWorker != nil { + in, out := &in.SlotsPerWorker, &out.SlotsPerWorker + *out = new(int32) + **out = **in + } if in.BackoffLimit != nil { in, out := &in.BackoffLimit, &out.BackoffLimit *out = new(int32) diff --git a/pkg/controllers/mpi_job_controller.go b/pkg/controllers/mpi_job_controller.go index 3458f445..7b06688e 100644 --- a/pkg/controllers/mpi_job_controller.go +++ b/pkg/controllers/mpi_job_controller.go @@ -67,6 +67,7 @@ const ( launcherSuffix = "-launcher" workerSuffix = "-worker" gpuResourceName = "nvidia.com/gpu" + cpuResourceName = "cpu" labelGroupName = "group_name" labelMPIJobName = "mpi_job_name" labelMPIRoleType = "mpi_role_type" @@ -125,6 +126,10 @@ type MPIJobController struct { recorder record.EventRecorder // The maximum number of GPUs per node. gpusPerNode int + // The maximum number of processing units per node. + processingUnitsPerNode int + // The processing resource name, e.g. "nvidia.com/gpu" or "cpu" + processingResourceType string // The container image used to deliver the kubectl binary. kubectlDeliveryImage string } @@ -141,6 +146,8 @@ func NewMPIJobController( jobInformer batchinformers.JobInformer, mpiJobInformer informers.MPIJobInformer, gpusPerNode int, + processingUnitsPerNode int, + processingResourceType string, kubectlDeliveryImage string) *MPIJobController { // Create event broadcaster. 
@@ -154,26 +161,28 @@ func NewMPIJobController( recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) controller := &MPIJobController{ - kubeClient: kubeClient, - kubeflowClient: kubeflowClient, - configMapLister: configMapInformer.Lister(), - configMapSynced: configMapInformer.Informer().HasSynced, - serviceAccountLister: serviceAccountInformer.Lister(), - serviceAccountSynced: serviceAccountInformer.Informer().HasSynced, - roleLister: roleInformer.Lister(), - roleSynced: roleInformer.Informer().HasSynced, - roleBindingLister: roleBindingInformer.Lister(), - roleBindingSynced: roleBindingInformer.Informer().HasSynced, - statefulSetLister: statefulSetInformer.Lister(), - statefulSetSynced: statefulSetInformer.Informer().HasSynced, - jobLister: jobInformer.Lister(), - jobSynced: jobInformer.Informer().HasSynced, - mpiJobLister: mpiJobInformer.Lister(), - mpiJobSynced: mpiJobInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"), - recorder: recorder, - gpusPerNode: gpusPerNode, - kubectlDeliveryImage: kubectlDeliveryImage, + kubeClient: kubeClient, + kubeflowClient: kubeflowClient, + configMapLister: configMapInformer.Lister(), + configMapSynced: configMapInformer.Informer().HasSynced, + serviceAccountLister: serviceAccountInformer.Lister(), + serviceAccountSynced: serviceAccountInformer.Informer().HasSynced, + roleLister: roleInformer.Lister(), + roleSynced: roleInformer.Informer().HasSynced, + roleBindingLister: roleBindingInformer.Lister(), + roleBindingSynced: roleBindingInformer.Informer().HasSynced, + statefulSetLister: statefulSetInformer.Lister(), + statefulSetSynced: statefulSetInformer.Informer().HasSynced, + jobLister: jobInformer.Lister(), + jobSynced: jobInformer.Informer().HasSynced, + mpiJobLister: mpiJobInformer.Lister(), + mpiJobSynced: mpiJobInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"), + recorder: recorder, + gpusPerNode: gpusPerNode, + processingUnitsPerNode: processingUnitsPerNode, + processingResourceType: processingResourceType, + kubectlDeliveryImage: kubectlDeliveryImage, } glog.Info("Setting up event handlers") @@ -406,7 +415,7 @@ func (c *MPIJobController) syncHandler(key string) error { // We're done if the launcher either succeeded or failed. done := launcher != nil && (launcher.Status.Succeeded == 1 || launcher.Status.Failed == 1) - workerReplicas, gpusPerWorker, err := allocateGPUs(mpiJob, c.gpusPerNode, done) + workerReplicas, processingUnitsPerWorker, err := allocateProcessingUnits(mpiJob, c.gpusPerNode, c.processingUnitsPerNode, c.processingResourceType, done) if err != nil { runtime.HandleError(err) return nil @@ -414,7 +423,7 @@ func (c *MPIJobController) syncHandler(key string) error { if !done { // Get the ConfigMap for this MPIJob. 
- if config, err := c.getOrCreateConfigMap(mpiJob, workerReplicas, gpusPerWorker); config == nil || err != nil { + if config, err := c.getOrCreateConfigMap(mpiJob, workerReplicas, processingUnitsPerWorker); config == nil || err != nil { return err } @@ -434,7 +443,7 @@ func (c *MPIJobController) syncHandler(key string) error { } } - worker, err := c.getOrCreateWorkerStatefulSet(mpiJob, workerReplicas, gpusPerWorker) + worker, err := c.getOrCreateWorkerStatefulSet(mpiJob, workerReplicas, processingUnitsPerWorker, c.processingResourceType) if err != nil { return err } @@ -483,45 +492,67 @@ func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job return launcher, nil } -// allocateGPUs allocates the worker replicas and GPUs per worker. -func allocateGPUs(mpiJob *kubeflow.MPIJob, gpusPerNode int, done bool) (workerReplicas int, gpusPerWorker int, err error) { +// allocateProcessingUnits allocates the worker replicas and processing units per worker. +func allocateProcessingUnits( + mpiJob *kubeflow.MPIJob, + gpusPerNode int, + processingUnitsPerNode int, + processingResourceType string, + done bool) (workerReplicas int, processingUnitsPerWorker int, err error) { workerReplicas = 0 - gpusPerWorker = 0 + processingUnitsPerWorker = 0 err = nil - if mpiJob.Spec.GPUs != nil { - totalGPUs := int(*mpiJob.Spec.GPUs) - if totalGPUs < gpusPerNode { - workerReplicas = 1 - gpusPerWorker = totalGPUs - } else if totalGPUs%gpusPerNode == 0 { - workerReplicas = totalGPUs / gpusPerNode - gpusPerWorker = gpusPerNode + if mpiJob.Spec.GPUs != nil || mpiJob.Spec.ProcessingUnits != nil { + if mpiJob.Spec.ProcessingUnits != nil && mpiJob.Spec.GPUs != nil { + err = fmt.Errorf("cannot specify both GPUs and ProcessingUnits at the same time") } else { - err = fmt.Errorf("specified #GPUs is not a multiple of GPUs per node (%d)", gpusPerNode) + totalProcessingUnits := 0 + usedSpecField := "" + pusPerNode := 0 + if mpiJob.Spec.GPUs != nil { + fmt.Println("GPUs field is deprecated. Please switch to use ProcessingUnits.") + totalProcessingUnits = int(*mpiJob.Spec.GPUs) + pusPerNode = gpusPerNode + usedSpecField = "GPUs" + } else if mpiJob.Spec.ProcessingUnits != nil { + totalProcessingUnits = int(*mpiJob.Spec.ProcessingUnits) + pusPerNode = processingUnitsPerNode + usedSpecField = "ProcessingUnits" + } + if totalProcessingUnits < pusPerNode { + workerReplicas = 1 + processingUnitsPerWorker = totalProcessingUnits + } else if totalProcessingUnits%pusPerNode == 0 { + workerReplicas = totalProcessingUnits / pusPerNode + processingUnitsPerWorker = pusPerNode + } else { + err = fmt.Errorf( + "specified #%s is not a multiple of %s per node (%d)", usedSpecField, usedSpecField, pusPerNode) + } } } else if mpiJob.Spec.Replicas != nil { workerReplicas = int(*mpiJob.Spec.Replicas) container := mpiJob.Spec.Template.Spec.Containers[0] if container.Resources.Limits != nil { - if val, ok := container.Resources.Limits[gpuResourceName]; ok { - gpus, _ := val.AsInt64() - gpusPerWorker = int(gpus) + if val, ok := container.Resources.Limits[convertProcessingResourceType(processingResourceType)]; ok { + processingUnits, _ := val.AsInt64() + processingUnitsPerWorker = int(processingUnits) } } } if done { workerReplicas = 0 } - return workerReplicas, gpusPerWorker, err + return workerReplicas, processingUnitsPerWorker, err } // getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates // one if it doesn't exist.
-func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, gpusPerWorker int) (*corev1.ConfigMap, error) { +func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int) (*corev1.ConfigMap, error) { cm, err := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix) // If the ConfigMap doesn't exist, we'll create it. if errors.IsNotFound(err) { - cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Create(newConfigMap(mpiJob, workerReplicas, gpusPerWorker)) + cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Create(newConfigMap(mpiJob, workerReplicas, processingUnitsPerWorker)) } // If an error occurs during Get/Create, we'll requeue the item so we // can attempt processing again later. This could have been caused by a @@ -616,11 +647,11 @@ func (c *MPIJobController) getLauncherRoleBinding(mpiJob *kubeflow.MPIJob) (*rba // getOrCreateWorkerStatefulSet gets the worker StatefulSet controlled by this // MPIJob, or creates one if it doesn't exist. -func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, workerReplicas int, gpusPerWorker int) (*appsv1.StatefulSet, error) { +func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int, processingResourceType string) (*appsv1.StatefulSet, error) { worker, err := c.statefulSetLister.StatefulSets(mpiJob.Namespace).Get(mpiJob.Name + workerSuffix) // If the StatefulSet doesn't exist, we'll create it. if errors.IsNotFound(err) && workerReplicas > 0 { - worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Create(newWorker(mpiJob, int32(workerReplicas), gpusPerWorker)) + worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Create(newWorker(mpiJob, int32(workerReplicas), processingUnitsPerWorker, processingResourceType)) } // If an error occurs during Get/Create, we'll requeue the item so we // can attempt processing again later. This could have been caused by a @@ -639,7 +670,7 @@ func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, // If the worker is out of date, update the worker. if worker != nil && int(*worker.Spec.Replicas) != workerReplicas { - worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Update(newWorker(mpiJob, int32(workerReplicas), gpusPerWorker)) + worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Update(newWorker(mpiJob, int32(workerReplicas), processingUnitsPerWorker, processingResourceType)) // If an error occurs during Update, we'll requeue the item so we can // attempt processing again later. This could have been caused by a // temporary network failure, or any other transient reason. @@ -732,7 +763,7 @@ func (c *MPIJobController) handleObject(obj interface{}) { // newConfigMap creates a new ConfigMap containing configurations for an MPIJob // resource. It also sets the appropriate OwnerReferences on the resource so // handleObject can discover the MPIJob resource that 'owns' it. -func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, gpusPerWorker int) *corev1.ConfigMap { +func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int, processingUnitsPerWorker int) *corev1.ConfigMap { kubexec := fmt.Sprintf(`#!/bin/sh set -x POD_NAME=$1 @@ -740,10 +771,14 @@ shift %s/kubectl exec ${POD_NAME} -- /bin/sh -c "$*" `, kubectlMountPath) - // If no GPU is specified, default to 1 slot. 
+ // If no processing unit is specified, default to 1 slot. slots := 1 - if gpusPerWorker > 0 { - slots = gpusPerWorker + if mpiJob.Spec.SlotsPerWorker == nil { + if processingUnitsPerWorker > 0 { + slots = processingUnitsPerWorker + } + } else { + slots = int(*mpiJob.Spec.SlotsPerWorker) } var buffer bytes.Buffer for i := 0; i < workerReplicas; i++ { @@ -850,10 +885,23 @@ func newLauncherRoleBinding(mpiJob *kubeflow.MPIJob) *rbacv1.RoleBinding { } } +func convertProcessingResourceType(processingResourceType string) corev1.ResourceName { + if processingResourceType == gpuResourceName { + return gpuResourceName + } else if processingResourceType == cpuResourceName { + return cpuResourceName + } + fmt.Printf( + "Unsupported processing resource type specified: %q. \nSwitching to use NVIDIA GPU by default.", + processingResourceType) + + return gpuResourceName +} + // newWorker creates a new worker StatefulSet for an MPIJob resource. It also // sets the appropriate OwnerReferences on the resource so handleObject can // discover the MPIJob resource that 'owns' it. -func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, gpus int) *appsv1.StatefulSet { +func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, processingUnits int, processingResourceType string) *appsv1.StatefulSet { labels := map[string]string{ labelGroupName: "kubeflow.org", labelMPIJobName: mpiJob.Name, @@ -878,7 +926,7 @@ func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, gpus int) *appsv1 if container.Resources.Limits == nil { container.Resources.Limits = make(corev1.ResourceList) } - container.Resources.Limits[gpuResourceName] = *resource.NewQuantity(int64(gpus), resource.DecimalExponent) + container.Resources.Limits[convertProcessingResourceType(processingResourceType)] = *resource.NewQuantity(int64(processingUnits), resource.DecimalExponent) // We need the kubexec.sh script here because Open MPI checks for the path // in every rank. 
diff --git a/pkg/controllers/mpi_job_controller_test.go b/pkg/controllers/mpi_job_controller_test.go index eb6ce631..a410fb2e 100644 --- a/pkg/controllers/mpi_job_controller_test.go +++ b/pkg/controllers/mpi_job_controller_test.go @@ -99,7 +99,30 @@ func newMPIJob(name string, gpus *int32) *kubeflow.MPIJob { } } -func newMPIJobWithCustomResources(name string, replicas *int32, gpusPerReplica int64) *kubeflow.MPIJob { +func newMPIJobWithCPUs(name string, cpus *int32) *kubeflow.MPIJob { + return &kubeflow.MPIJob{ + TypeMeta: metav1.TypeMeta{APIVersion: kubeflow.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + }, + Spec: kubeflow.MPIJobSpec{ + ProcessingUnits: cpus, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "foo", + Image: "bar", + }, + }, + }, + }, + }, + } +} + +func newMPIJobWithCustomResources(name string, replicas *int32, pusPerReplica int64, processingResourceType string) *kubeflow.MPIJob { return &kubeflow.MPIJob{ TypeMeta: metav1.TypeMeta{APIVersion: kubeflow.SchemeGroupVersion.String()}, ObjectMeta: metav1.ObjectMeta{ @@ -116,7 +139,7 @@ func newMPIJobWithCustomResources(name string, replicas *int32, gpusPerReplica i Image: "bar", Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ - "nvidia.com/gpu": *resource.NewQuantity(gpusPerReplica, resource.DecimalExponent), + convertProcessingResourceType(processingResourceType): *resource.NewQuantity(pusPerReplica, resource.DecimalExponent), }, }, }, @@ -127,7 +150,7 @@ func newMPIJobWithCustomResources(name string, replicas *int32, gpusPerReplica i } } -func (f *fixture) newController() (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { +func (f *fixture) newController(processingResourceType string) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { f.client = fake.NewSimpleClientset(f.objects...) f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...) 
@@ -145,6 +168,8 @@ func (f *fixture) newController() (*MPIJobController, informers.SharedInformerFa k8sI.Batch().V1().Jobs(), i.Kubeflow().V1alpha1().MPIJobs(), 8, + 8, + processingResourceType, "kubectl-delivery") c.configMapSynced = alwaysReady @@ -187,16 +212,16 @@ func (f *fixture) newController() (*MPIJobController, informers.SharedInformerFa return c, i, k8sI } -func (f *fixture) run(mpiJobName string) { - f.runController(mpiJobName, true, false) +func (f *fixture) run(mpiJobName string, processingResourceType string) { + f.runController(mpiJobName, true, false, processingResourceType) } -func (f *fixture) runExpectError(mpiJobName string) { - f.runController(mpiJobName, true, true) +func (f *fixture) runExpectError(mpiJobName string, processingResourceType string) { + f.runController(mpiJobName, true, true, processingResourceType) } -func (f *fixture) runController(mpiJobName string, startInformers bool, expectError bool) { - c, i, k8sI := f.newController() +func (f *fixture) runController(mpiJobName string, startInformers bool, expectError bool, processingResourceType string) { + c, i, k8sI := f.newController(processingResourceType) if startInformers { stopCh := make(chan struct{}) defer close(stopCh) @@ -429,13 +454,13 @@ func getKey(mpiJob *kubeflow.MPIJob, t *testing.T) string { func TestDoNothingWithInvalidKey(t *testing.T) { f := newFixture(t) - f.run("foo/bar/baz") + f.run("foo/bar/baz", gpuResourceName) } func TestDoNothingWithNonexistentMPIJob(t *testing.T) { f := newFixture(t) mpiJob := newMPIJob("test", int32Ptr(64)) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestLauncherNotControlledByUs(t *testing.T) { @@ -448,7 +473,7 @@ func TestLauncherNotControlledByUs(t *testing.T) { launcher.OwnerReferences = nil f.setUpLauncher(launcher) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestLauncherSucceeded(t *testing.T) { @@ -465,7 +490,7 @@ func TestLauncherSucceeded(t *testing.T) { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherSucceeded f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestLauncherFailed(t *testing.T) { @@ -482,7 +507,7 @@ func TestLauncherFailed(t *testing.T) { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherFailed f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestLauncherDoesNotExist(t *testing.T) { @@ -503,42 +528,45 @@ func TestLauncherDoesNotExist(t *testing.T) { expRoleBinding := newLauncherRoleBinding(mpiJob) f.expectCreateRoleBindingAction(expRoleBinding) - expWorker := newWorker(mpiJob, 8, 8) + expWorker := newWorker(mpiJob, 8, 8, gpuResourceName) f.expectCreateStatefulSetAction(expWorker) mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.WorkerReplicas = 0 f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestLauncherDoesNotExistWithCustomResources(t *testing.T) { - f := newFixture(t) + resourceNames := []string{cpuResourceName, gpuResourceName} + for _, resourceName := range resourceNames { + f := newFixture(t) - mpiJob := newMPIJobWithCustomResources("test", int32Ptr(4), 4) - f.setUpMPIJob(mpiJob) + mpiJob := newMPIJobWithCustomResources("test", int32Ptr(4), 4, resourceName) + f.setUpMPIJob(mpiJob) - expConfigMap := newConfigMap(mpiJob, 4, 4) - f.expectCreateConfigMapAction(expConfigMap) + expConfigMap := newConfigMap(mpiJob, 4, 4) + 
f.expectCreateConfigMapAction(expConfigMap) - expServiceAccount := newLauncherServiceAccount(mpiJob) - f.expectCreateServiceAccountAction(expServiceAccount) + expServiceAccount := newLauncherServiceAccount(mpiJob) + f.expectCreateServiceAccountAction(expServiceAccount) - expRole := newLauncherRole(mpiJob, 4) - f.expectCreateRoleAction(expRole) + expRole := newLauncherRole(mpiJob, 4) + f.expectCreateRoleAction(expRole) - expRoleBinding := newLauncherRoleBinding(mpiJob) - f.expectCreateRoleBindingAction(expRoleBinding) + expRoleBinding := newLauncherRoleBinding(mpiJob) + f.expectCreateRoleBindingAction(expRoleBinding) - expWorker := newWorker(mpiJob, 4, 4) - f.expectCreateStatefulSetAction(expWorker) + expWorker := newWorker(mpiJob, 4, 4, resourceName) + f.expectCreateStatefulSetAction(expWorker) - mpiJobCopy := mpiJob.DeepCopy() - mpiJobCopy.Status.WorkerReplicas = 0 - f.expectUpdateMPIJobStatusAction(mpiJobCopy) + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.WorkerReplicas = 0 + f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), resourceName) + } } func TestConfigMapNotControlledByUs(t *testing.T) { @@ -551,7 +579,7 @@ func TestConfigMapNotControlledByUs(t *testing.T) { configMap.OwnerReferences = nil f.setUpConfigMap(configMap) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestServiceAccountNotControlledByUs(t *testing.T) { @@ -566,7 +594,7 @@ func TestServiceAccountNotControlledByUs(t *testing.T) { serviceAccount.OwnerReferences = nil f.setUpServiceAccount(serviceAccount) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestRoleNotControlledByUs(t *testing.T) { @@ -582,7 +610,7 @@ func TestRoleNotControlledByUs(t *testing.T) { role.OwnerReferences = nil f.setUpRole(role) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestRoleBindingNotControlledByUs(t *testing.T) { @@ -599,7 +627,7 @@ func TestRoleBindingNotControlledByUs(t *testing.T) { roleBinding.OwnerReferences = nil f.setUpRoleBinding(roleBinding) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestShutdownWorker(t *testing.T) { @@ -612,10 +640,10 @@ func TestShutdownWorker(t *testing.T) { launcher.Status.Succeeded = 1 f.setUpLauncher(launcher) - worker := newWorker(mpiJob, 8, 8) + worker := newWorker(mpiJob, 8, 8, gpuResourceName) f.setUpWorker(worker) - expWorker := newWorker(mpiJob, 0, 8) + expWorker := newWorker(mpiJob, 0, 8, gpuResourceName) f.expectUpdateStatefulSetAction(expWorker) mpiJobCopy := mpiJob.DeepCopy() @@ -623,7 +651,7 @@ func TestShutdownWorker(t *testing.T) { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherSucceeded f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestWorkerNotControlledByUs(t *testing.T) { @@ -635,11 +663,11 @@ func TestWorkerNotControlledByUs(t *testing.T) { f.setUpConfigMap(newConfigMap(mpiJob, 8, 8)) f.setUpRbac(mpiJob, 8) - worker := newWorker(mpiJob, 8, 8) + worker := newWorker(mpiJob, 8, 8, gpuResourceName) worker.OwnerReferences = nil f.setUpWorker(worker) - f.runExpectError(getKey(mpiJob, t)) + f.runExpectError(getKey(mpiJob, t), gpuResourceName) } func TestLauncherActive(t *testing.T) { @@ -655,14 +683,14 @@ func TestLauncherActive(t *testing.T) { launcher.Status.Active = 1 f.setUpLauncher(launcher) - worker := newWorker(mpiJob, 1, 8) + worker := 
newWorker(mpiJob, 1, 8, gpuResourceName) f.setUpWorker(worker) mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherActive f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), gpuResourceName) } func TestWorkerReady(t *testing.T) { @@ -674,7 +702,30 @@ func TestWorkerReady(t *testing.T) { f.setUpConfigMap(newConfigMap(mpiJob, 2, 8)) f.setUpRbac(mpiJob, 2) - worker := newWorker(mpiJob, 2, 8) + worker := newWorker(mpiJob, 2, 8, gpuResourceName) + worker.Status.ReadyReplicas = 2 + f.setUpWorker(worker) + + expLauncher := newLauncher(mpiJob, "kubectl-delivery") + f.expectCreateJobAction(expLauncher) + + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.WorkerReplicas = 2 + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t), gpuResourceName) +} + +func TestWorkerReadyWithCPUs(t *testing.T) { + f := newFixture(t) + + mpiJob := newMPIJobWithCPUs("test", int32Ptr(16)) + f.setUpMPIJob(mpiJob) + + f.setUpConfigMap(newConfigMap(mpiJob, 2, 8)) + f.setUpRbac(mpiJob, 2) + + worker := newWorker(mpiJob, 2, 8, cpuResourceName) worker.Status.ReadyReplicas = 2 f.setUpWorker(worker) @@ -685,7 +736,7 @@ func TestWorkerReady(t *testing.T) { mpiJobCopy.Status.WorkerReplicas = 2 f.expectUpdateMPIJobStatusAction(mpiJobCopy) - f.run(getKey(mpiJob, t)) + f.run(getKey(mpiJob, t), cpuResourceName) } func int32Ptr(i int32) *int32 { return &i }
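For readers skimming the patch, the heart of the change is the replica math in allocateProcessingUnits, combined with the --processing-resource-type flag that decides whether the per-worker limit is written as nvidia.com/gpu or cpu. Below is a minimal, standalone Go sketch of that math only; it is not part of the patch, and the helper name splitUnits is hypothetical.

package main

import "fmt"

// splitUnits divides a total processing-unit request across worker replicas,
// packing at most unitsPerNode units onto each worker, mirroring the behavior
// of allocateProcessingUnits in the diff above.
func splitUnits(totalUnits, unitsPerNode int) (replicas, unitsPerWorker int, err error) {
	switch {
	case totalUnits < unitsPerNode:
		// Small jobs fit on a single worker.
		return 1, totalUnits, nil
	case totalUnits%unitsPerNode == 0:
		// Larger jobs must request an exact multiple of the per-node capacity.
		return totalUnits / unitsPerNode, unitsPerNode, nil
	default:
		return 0, 0, fmt.Errorf("total units %d is not a multiple of units per node %d", totalUnits, unitsPerNode)
	}
}

func main() {
	// With --processing-units-per-node=8 (the value used in the test fixture),
	// a spec asking for 16 processing units becomes 2 workers with 8 units each,
	// which is what TestWorkerReadyWithCPUs asserts via newConfigMap(mpiJob, 2, 8).
	replicas, perWorker, err := splitUnits(16, 8)
	fmt.Println(replicas, perWorker, err) // 2 8 <nil>
}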