diff --git a/manifests/base/crd.yaml b/manifests/base/crd.yaml
index 865f04358..491892ca1 100644
--- a/manifests/base/crd.yaml
+++ b/manifests/base/crd.yaml
@@ -106,6 +106,9 @@ spec:
                 description: "Defines which Pods must be deleted after the Job completes"
               sshAuthMountPath:
                 type: string
+              mpiImplementation:
+                type: string
+                enum: ["OpenMPI", "Intel"]
               mpiReplicaSpecs:
                 type: object
                 properties:
diff --git a/v2/pkg/apis/kubeflow/v2beta1/default.go b/v2/pkg/apis/kubeflow/v2beta1/default.go
index 3ad1e035f..724f9cb34 100644
--- a/v2/pkg/apis/kubeflow/v2beta1/default.go
+++ b/v2/pkg/apis/kubeflow/v2beta1/default.go
@@ -59,6 +59,9 @@ func SetDefaults_MPIJob(mpiJob *MPIJob) {
     if mpiJob.Spec.SSHAuthMountPath == "" {
         mpiJob.Spec.SSHAuthMountPath = "/root/.ssh"
     }
+    if mpiJob.Spec.MPIImplementation == "" {
+        mpiJob.Spec.MPIImplementation = MPIImplementationOpenMPI
+    }
 
     // set default to Launcher
     setDefaultsTypeLauncher(mpiJob.Spec.MPIReplicaSpecs[MPIReplicaTypeLauncher])
diff --git a/v2/pkg/apis/kubeflow/v2beta1/default_test.go b/v2/pkg/apis/kubeflow/v2beta1/default_test.go
index ac746c51f..f6fb0e097 100644
--- a/v2/pkg/apis/kubeflow/v2beta1/default_test.go
+++ b/v2/pkg/apis/kubeflow/v2beta1/default_test.go
@@ -29,25 +29,28 @@ func TestSetDefaults_MPIJob(t *testing.T) {
         "base defaults": {
             want: MPIJob{
                 Spec: MPIJobSpec{
-                    SlotsPerWorker:   newInt32(1),
-                    CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyNone),
-                    SSHAuthMountPath: "/root/.ssh",
+                    SlotsPerWorker:    newInt32(1),
+                    CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyNone),
+                    SSHAuthMountPath:  "/root/.ssh",
+                    MPIImplementation: MPIImplementationOpenMPI,
                 },
             },
         },
         "base defaults overridden": {
             job: MPIJob{
                 Spec: MPIJobSpec{
-                    SlotsPerWorker:   newInt32(10),
-                    CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyRunning),
-                    SSHAuthMountPath: "/home/mpiuser/.ssh",
+                    SlotsPerWorker:    newInt32(10),
+                    CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyRunning),
+                    SSHAuthMountPath:  "/home/mpiuser/.ssh",
+                    MPIImplementation: MPIImplementationIntel,
                 },
             },
             want: MPIJob{
                 Spec: MPIJobSpec{
-                    SlotsPerWorker:   newInt32(10),
-                    CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyRunning),
-                    SSHAuthMountPath: "/home/mpiuser/.ssh",
+                    SlotsPerWorker:    newInt32(10),
+                    CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyRunning),
+                    SSHAuthMountPath:  "/home/mpiuser/.ssh",
+                    MPIImplementation: MPIImplementationIntel,
                 },
             },
         },
@@ -61,9 +64,10 @@ func TestSetDefaults_MPIJob(t *testing.T) {
             },
             want: MPIJob{
                 Spec: MPIJobSpec{
-                    SlotsPerWorker:   newInt32(1),
-                    CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyNone),
-                    SSHAuthMountPath: "/root/.ssh",
+                    SlotsPerWorker:    newInt32(1),
+                    CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyNone),
+                    SSHAuthMountPath:  "/root/.ssh",
+                    MPIImplementation: MPIImplementationOpenMPI,
                     MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{
                         MPIReplicaTypeLauncher: {
                             Replicas: newInt32(1),
@@ -83,9 +87,10 @@ func TestSetDefaults_MPIJob(t *testing.T) {
             },
             want: MPIJob{
                 Spec: MPIJobSpec{
-                    SlotsPerWorker:   newInt32(1),
-                    CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyNone),
-                    SSHAuthMountPath: "/root/.ssh",
+                    SlotsPerWorker:    newInt32(1),
+                    CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyNone),
+                    SSHAuthMountPath:  "/root/.ssh",
+                    MPIImplementation: MPIImplementationOpenMPI,
                     MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{
                         MPIReplicaTypeWorker: {
                             Replicas: newInt32(0),
diff --git a/v2/pkg/apis/kubeflow/v2beta1/types.go b/v2/pkg/apis/kubeflow/v2beta1/types.go
index 27f100e53..ee258e4e9 100644
--- a/v2/pkg/apis/kubeflow/v2beta1/types.go
+++ b/v2/pkg/apis/kubeflow/v2beta1/types.go
@@ -55,6 +55,10 @@ type MPIJobSpec struct {
     // SSHAuthMountPath is the directory where SSH keys are mounted.
     // Defaults to "/root/.ssh".
     SSHAuthMountPath string `json:"sshAuthMountPath,omitempty"`
+
+    // MPIImplementation is the MPI implementation.
+    // Options are "OpenMPI" (default) and "Intel".
+    MPIImplementation MPIImplementation `json:"mpiImplementation,omitempty"`
 }
 
 // MPIReplicaType is the type for MPIReplica.
@@ -67,3 +71,10 @@ const (
     // MPIReplicaTypeWorker is the type for worker replicas.
     MPIReplicaTypeWorker MPIReplicaType = "Worker"
 )
+
+type MPIImplementation string
+
+const (
+    MPIImplementationOpenMPI MPIImplementation = "OpenMPI"
+    MPIImplementationIntel   MPIImplementation = "Intel"
+)
diff --git a/v2/pkg/apis/kubeflow/validation/validation.go b/v2/pkg/apis/kubeflow/validation/validation.go
index abc7140a8..92a3c0335 100644
--- a/v2/pkg/apis/kubeflow/validation/validation.go
+++ b/v2/pkg/apis/kubeflow/validation/validation.go
@@ -27,10 +27,16 @@ import (
     kubeflow "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2beta1"
 )
 
-var validCleanPolicies = sets.NewString(
-    string(common.CleanPodPolicyNone),
-    string(common.CleanPodPolicyRunning),
-    string(common.CleanPodPolicyAll))
+var (
+    validCleanPolicies = sets.NewString(
+        string(common.CleanPodPolicyNone),
+        string(common.CleanPodPolicyRunning),
+        string(common.CleanPodPolicyAll))
+
+    validMPIImplementations = sets.NewString(
+        string(kubeflow.MPIImplementationOpenMPI),
+        string(kubeflow.MPIImplementationIntel))
+)
 
 func ValidateMPIJob(job *kubeflow.MPIJob) field.ErrorList {
     errs := validateMPIJobName(job)
@@ -68,6 +74,9 @@ func validateMPIJobSpec(spec *kubeflow.MPIJobSpec, path *field.Path) field.Error
     if spec.SSHAuthMountPath == "" {
         errs = append(errs, field.Required(path.Child("sshAuthMountPath"), "must have a mount path for SSH credentials"))
     }
+    if !validMPIImplementations.Has(string(spec.MPIImplementation)) {
+        errs = append(errs, field.NotSupported(path.Child("mpiImplementation"), spec.MPIImplementation, validMPIImplementations.List()))
+    }
     return errs
 }
 
diff --git a/v2/pkg/apis/kubeflow/validation/validation_test.go b/v2/pkg/apis/kubeflow/validation/validation_test.go
index d4999965f..8c8ef2618 100644
--- a/v2/pkg/apis/kubeflow/validation/validation_test.go
+++ b/v2/pkg/apis/kubeflow/validation/validation_test.go
@@ -37,9 +37,10 @@ func TestValidateMPIJob(t *testing.T) {
                 Name: "foo",
             },
             Spec: v2beta1.MPIJobSpec{
-                SlotsPerWorker:   newInt32(2),
-                CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyRunning),
-                SSHAuthMountPath: "/home/mpiuser/.ssh",
+                SlotsPerWorker:    newInt32(2),
+                CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyRunning),
+                SSHAuthMountPath:  "/home/mpiuser/.ssh",
+                MPIImplementation: v2beta1.MPIImplementationIntel,
                 MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
                     v2beta1.MPIReplicaTypeLauncher: {
                         Replicas: newInt32(1),
@@ -59,9 +60,10 @@ func TestValidateMPIJob(t *testing.T) {
                 Name: "foo",
             },
             Spec: v2beta1.MPIJobSpec{
-                SlotsPerWorker:   newInt32(2),
-                CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyRunning),
-                SSHAuthMountPath: "/home/mpiuser/.ssh",
+                SlotsPerWorker:    newInt32(2),
+                CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyRunning),
+                SSHAuthMountPath:  "/home/mpiuser/.ssh",
+                MPIImplementation: v2beta1.MPIImplementationIntel,
                 MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
                     v2beta1.MPIReplicaTypeLauncher: {
                         Replicas: newInt32(1),
@@ -105,6 +107,10 @@ func TestValidateMPIJob(t *testing.T) {
                 Type:  field.ErrorTypeRequired,
                 Field: "spec.sshAuthMountPath",
             },
+            &field.Error{
+                Type:  field.ErrorTypeNotSupported,
+                Field: "spec.mpiImplementation",
+            },
         },
     },
     "invalid fields": {
@@ -113,9 +119,10 @@ func TestValidateMPIJob(t *testing.T) {
                 Name: "this-name-is-waaaaaaaay-too-long-for-a-worker-hostname",
             },
             Spec: v2beta1.MPIJobSpec{
-                SlotsPerWorker:   newInt32(2),
-                CleanPodPolicy:   newCleanPodPolicy("unknown"),
-                SSHAuthMountPath: "/root/.ssh",
+                SlotsPerWorker:    newInt32(2),
+                CleanPodPolicy:    newCleanPodPolicy("unknown"),
+                SSHAuthMountPath:  "/root/.ssh",
+                MPIImplementation: v2beta1.MPIImplementation("Unknown"),
                 MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
                     v2beta1.MPIReplicaTypeLauncher: {
                         Replicas: newInt32(1),
@@ -145,6 +152,10 @@ func TestValidateMPIJob(t *testing.T) {
                 Type:  field.ErrorTypeNotSupported,
                 Field: "spec.cleanPodPolicy",
             },
+            {
+                Type:  field.ErrorTypeNotSupported,
+                Field: "spec.mpiImplementation",
+            },
         },
     },
     "empty replica specs": {
@@ -153,10 +164,11 @@ func TestValidateMPIJob(t *testing.T) {
                 Name: "foo",
             },
             Spec: v2beta1.MPIJobSpec{
-                SlotsPerWorker:   newInt32(2),
-                CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyRunning),
-                SSHAuthMountPath: "/root/.ssh",
-                MPIReplicaSpecs:  map[v2beta1.MPIReplicaType]*common.ReplicaSpec{},
+                SlotsPerWorker:    newInt32(2),
+                CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyRunning),
+                SSHAuthMountPath:  "/root/.ssh",
+                MPIImplementation: v2beta1.MPIImplementationOpenMPI,
+                MPIReplicaSpecs:   map[v2beta1.MPIReplicaType]*common.ReplicaSpec{},
             },
         },
         wantErrs: field.ErrorList{
@@ -172,9 +184,10 @@ func TestValidateMPIJob(t *testing.T) {
                 Name: "foo",
             },
             Spec: v2beta1.MPIJobSpec{
-                SlotsPerWorker:   newInt32(2),
-                CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyRunning),
-                SSHAuthMountPath: "/root/.ssh",
+                SlotsPerWorker:    newInt32(2),
+                CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyRunning),
+                SSHAuthMountPath:  "/root/.ssh",
+                MPIImplementation: v2beta1.MPIImplementationOpenMPI,
                 MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
                     v2beta1.MPIReplicaTypeLauncher: {},
                     v2beta1.MPIReplicaTypeWorker:   {},
@@ -206,9 +219,10 @@ func TestValidateMPIJob(t *testing.T) {
                 Name: "foo",
             },
             Spec: v2beta1.MPIJobSpec{
-                SlotsPerWorker:   newInt32(2),
-                CleanPodPolicy:   newCleanPodPolicy(common.CleanPodPolicyRunning),
-                SSHAuthMountPath: "/root/.ssh",
+                SlotsPerWorker:    newInt32(2),
+                CleanPodPolicy:    newCleanPodPolicy(common.CleanPodPolicyRunning),
+                SSHAuthMountPath:  "/root/.ssh",
+                MPIImplementation: v2beta1.MPIImplementationOpenMPI,
                 MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
                     v2beta1.MPIReplicaTypeLauncher: {
                         Replicas: newInt32(2),
diff --git a/v2/pkg/controller/mpi_job_controller.go b/v2/pkg/controller/mpi_job_controller.go
index 237c2902b..48ef5f68f 100644
--- a/v2/pkg/controller/mpi_job_controller.go
+++ b/v2/pkg/controller/mpi_job_controller.go
@@ -111,6 +111,9 @@ const (
     // eventMessageLimit is the maximum size of an Event's message.
     // From: k8s.io/kubernetes/pkg/apis/core/validation/events.go
     eventMessageLimit = 1024
+
+    openMPISlotsEnv  = "OMPI_MCA_orte_set_default_slots"
+    intelMPISlotsEnv = "I_MPI_PERHOST"
 )
 
 var (
@@ -158,6 +161,18 @@ var (
         },
     }
 
+    launcherEnvVars = []corev1.EnvVar{
+        {
+            Name:  "K_MPI_JOB_ROLE",
+            Value: launcher,
+        },
+    }
+    workerEnvVars = []corev1.EnvVar{
+        {
+            Name:  "K_MPI_JOB_ROLE",
+            Value: worker,
+        },
+    }
     ompiEnvVars = []corev1.EnvVar{
         // Allows driver to reach workers through the Service.
         {
@@ -168,8 +183,21 @@ var (
             Name:  "OMPI_MCA_orte_default_hostfile",
             Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName),
         },
+        {
+            Name:  "OMPI_MCA_plm_rsh_args",
+            Value: "-o ConnectionAttempts=10",
+        },
+    }
+    intelEnvVars = []corev1.EnvVar{
+        {
+            Name:  "I_MPI_HYDRA_HOST_FILE",
+            Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName),
+        },
+        {
+            Name:  "I_MPI_HYDRA_BOOTSTRAP_EXEC_EXTRA_ARGS",
+            Value: "-o ConnectionAttempts=10",
+        },
     }
-
     nvidiaDisableEnvVars = []corev1.EnvVar{
         {Name: "NVIDIA_VISIBLE_DEVICES"},
         {Name: "NVIDIA_DRIVER_CAPABILITIES"},
@@ -526,7 +554,7 @@ func (c *MPIJobController) syncHandler(key string) error {
     if !done {
         isGPULauncher := isGPULauncher(mpiJob)
 
-        _, err := c.getOrCreateWorkersService(mpiJob)
+        _, err := c.getOrCreateService(mpiJob, newWorkersService(mpiJob))
         if err != nil {
             return fmt.Errorf("getting or creating Service to front workers: %w", err)
         }
@@ -551,6 +579,15 @@ func (c *MPIJobController) syncHandler(key string) error {
         if err != nil {
             return err
         }
+        if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel {
+            // The Intel implementation requires workers to communicate with the
+            // launcher through its hostname. For that, we create a Service which
+            // has the same name as the launcher's hostname.
+            _, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob))
+            if err != nil {
+                return fmt.Errorf("getting or creating Service to front launcher: %w", err)
+            }
+        }
         if launcher == nil {
             launcher, err = c.kubeClient.CoreV1().Pods(namespace).Create(context.TODO(), c.newLauncher(mpiJob, isGPULauncher), metav1.CreateOptions{})
             if err != nil {
@@ -709,27 +746,20 @@ func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, isGPULa
     return cm, nil
 }
 
-// getOrCreateWorkerService gets the workers' Service controlled by this MPIJob,
-// or creates one if it doesn't exist.
-func (c *MPIJobController) getOrCreateWorkersService(mpiJob *kubeflow.MPIJob) (*corev1.Service, error) {
-    svc, err := c.serviceLister.Services(mpiJob.Namespace).Get(mpiJob.Name + workerSuffix)
+func (c *MPIJobController) getOrCreateService(job *kubeflow.MPIJob, newSvc *corev1.Service) (*corev1.Service, error) {
+    svc, err := c.serviceLister.Services(job.Namespace).Get(newSvc.Name)
     if errors.IsNotFound(err) {
-        return c.kubeClient.CoreV1().Services(mpiJob.Namespace).Create(context.TODO(), newWorkersService(mpiJob), metav1.CreateOptions{})
+        return c.kubeClient.CoreV1().Services(job.Namespace).Create(context.TODO(), newSvc, metav1.CreateOptions{})
     }
-    // If an error occurs during Get/Create, we'll requeue the item so we
-    // can attempt processing again later. This could have been caused by a
-    // temporary network failure, or any other transient reason.
     if err != nil {
         return nil, err
     }
-    // If the worker Service is not controlled by this MPIJob resource, we
-    // should log a warning to the event recorder and return.
-    if !metav1.IsControlledBy(svc, mpiJob) {
+    if !metav1.IsControlledBy(svc, job) {
         msg := fmt.Sprintf(MessageResourceExists, svc.Name, svc.Kind)
-        c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceExists, msg)
+        c.recorder.Event(job, corev1.EventTypeWarning, ErrResourceExists, msg)
         return nil, fmt.Errorf(msg)
     }
-    newSvc := newWorkersService(mpiJob)
+
     // If the Service selector is changed, update it.
     if !equality.Semantic.DeepEqual(svc.Spec.Selector, newSvc.Spec.Selector) {
         svc = svc.DeepCopy()
@@ -1056,18 +1086,13 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
 // resource. It also sets the appropriate OwnerReferences on the resource so
 // handleObject can discover the MPIJob resource that 'owns' it.
 func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32, isGPULauncher bool) *corev1.ConfigMap {
-    // If no processing unit is specified, default to 1 slot.
-    slots := 1
-    if mpiJob.Spec.SlotsPerWorker != nil {
-        slots = int(*mpiJob.Spec.SlotsPerWorker)
-    }
     var buffer bytes.Buffer
     workersService := mpiJob.Name + workerSuffix
     if isGPULauncher {
-        buffer.WriteString(fmt.Sprintf("%s%s.%s slots=%d\n", mpiJob.Name, launcherSuffix, workersService, slots))
+        buffer.WriteString(fmt.Sprintf("%s%s.%s\n", mpiJob.Name, launcherSuffix, workersService))
     }
     for i := 0; i < int(workerReplicas); i++ {
-        buffer.WriteString(fmt.Sprintf("%s%s-%d.%s slots=%d\n", mpiJob.Name, workerSuffix, i, workersService, slots))
+        buffer.WriteString(fmt.Sprintf("%s%s-%d.%s\n", mpiJob.Name, workerSuffix, i, workersService))
     }
 
     return &corev1.ConfigMap{
@@ -1112,26 +1137,35 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo
     configMap.Data[discoverHostsScriptName] = buffer.String()
 }
 
-// newWorkersService creates a new workers' Service for an MPIJob
-// resource.
-func newWorkersService(mpiJob *kubeflow.MPIJob) *corev1.Service {
+// newWorkersService creates a new workers' Service for an MPIJob resource.
+func newWorkersService(job *kubeflow.MPIJob) *corev1.Service {
+    return newService(job, job.Name+workerSuffix, map[string]string{
+        // Selector doesn't include the role because the launcher could host ranks.
+        labelGroupName:  "kubeflow.org",
+        labelMPIJobName: job.Name,
+    })
+}
+
+// newLauncherService creates a new launcher's Service for an MPIJob resource.
+func newLauncherService(job *kubeflow.MPIJob) *corev1.Service {
+    return newService(job, job.Name+launcherSuffix, defaultLabels(job.Name, launcher))
+}
+
+func newService(job *kubeflow.MPIJob, name string, selector map[string]string) *corev1.Service {
     return &corev1.Service{
         ObjectMeta: metav1.ObjectMeta{
-            Name:      mpiJob.Name + workerSuffix,
-            Namespace: mpiJob.Namespace,
+            Name:      name,
+            Namespace: job.Namespace,
             Labels: map[string]string{
-                "app": mpiJob.Name,
+                "app": job.Name,
             },
             OwnerReferences: []metav1.OwnerReference{
-                *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind),
+                *metav1.NewControllerRef(job, kubeflow.SchemeGroupVersionKind),
             },
         },
         Spec: corev1.ServiceSpec{
             ClusterIP: corev1.ClusterIPNone,
-            Selector: map[string]string{
-                labelGroupName:  "kubeflow.org",
-                labelMPIJobName: mpiJob.Name,
-            },
+            Selector:  selector,
         },
     }
 }
@@ -1218,7 +1252,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
     if len(podTemplate.Labels) == 0 {
         podTemplate.Labels = make(map[string]string)
     }
-    for key, value := range defaultWorkerLabels(mpiJob.Name) {
+    for key, value := range defaultLabels(mpiJob.Name, worker) {
         podTemplate.Labels[key] = value
     }
     podTemplate.Labels[common.ReplicaIndexLabel] = strconv.Itoa(index)
@@ -1230,6 +1264,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
     if len(container.Command) == 0 && len(container.Args) == 0 {
         container.Command = []string{"/usr/sbin/sshd", "-De"}
     }
+    container.Env = append(container.Env, workerEnvVars...)
     c.setupSSHOnPod(&podTemplate.Spec, mpiJob)
 
     // add SchedulerName to podSpec
@@ -1265,18 +1300,13 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
 // the MPIJob resource that 'owns' it.
 func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, isGPULauncher bool) *corev1.Pod {
     launcherName := mpiJob.Name + launcherSuffix
-    defaultLabels := map[string]string{
-        labelGroupName:   "kubeflow.org",
-        labelMPIJobName:  mpiJob.Name,
-        labelMPIRoleType: launcher,
-    }
     podTemplate := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.DeepCopy()
     // copy the labels and annotations to pod from PodTemplate
     if len(podTemplate.Labels) == 0 {
         podTemplate.Labels = make(map[string]string)
     }
-    for key, value := range defaultLabels {
+    for key, value := range defaultLabels(mpiJob.Name, launcher) {
         podTemplate.Labels[key] = value
     }
 
     // add SchedulerName to podSpec
@@ -1295,7 +1325,22 @@ func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, isGPULauncher bo
     podTemplate.Spec.Hostname = launcherName
     podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name.
     container := &podTemplate.Spec.Containers[0]
-    container.Env = append(container.Env, ompiEnvVars...)
+    container.Env = append(container.Env, launcherEnvVars...)
+    slotsStr := strconv.Itoa(int(*mpiJob.Spec.SlotsPerWorker))
+    switch mpiJob.Spec.MPIImplementation {
+    case kubeflow.MPIImplementationOpenMPI:
+        container.Env = append(container.Env, ompiEnvVars...)
+        container.Env = append(container.Env, corev1.EnvVar{
+            Name:  openMPISlotsEnv,
+            Value: slotsStr,
+        })
+    case kubeflow.MPIImplementationIntel:
+        container.Env = append(container.Env, intelEnvVars...)
+        container.Env = append(container.Env, corev1.EnvVar{
+            Name:  intelMPISlotsEnv,
+            Value: slotsStr,
+        })
+    }
     if !isGPULauncher {
         container.Env = append(container.Env,
@@ -1396,16 +1441,16 @@ func isGPULauncher(mpiJob *kubeflow.MPIJob) bool {
     return false
 }
 
-func defaultWorkerLabels(mpiJobName string) map[string]string {
+func defaultLabels(jobName, role string) map[string]string {
     return map[string]string{
         labelGroupName:   "kubeflow.org",
-        labelMPIJobName:  mpiJobName,
-        labelMPIRoleType: worker,
+        labelMPIJobName:  jobName,
+        labelMPIRoleType: role,
     }
 }
 
 func workerSelector(mpiJobName string) (labels.Selector, error) {
-    set := defaultWorkerLabels(mpiJobName)
+    set := defaultLabels(mpiJobName, worker)
     return labels.ValidatedSelectorFromSet(set)
 }
diff --git a/v2/pkg/controller/mpi_job_controller_test.go b/v2/pkg/controller/mpi_job_controller_test.go
index 20c9cccfd..c3e24e486 100644
--- a/v2/pkg/controller/mpi_job_controller_test.go
+++ b/v2/pkg/controller/mpi_job_controller_test.go
@@ -391,7 +391,7 @@ func filterInformerActions(actions []core.Action) []core.Action {
     return ret
 }
 
-func (f *fixture) expectCreateJobAction(d *corev1.Pod) {
+func (f *fixture) expectCreatePodAction(d *corev1.Pod) {
     f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "pods"}, d.Namespace, d))
 }
 
@@ -612,16 +612,53 @@ func TestConfigMapNotControlledByUs(t *testing.T) {
     f.runExpectError(getKey(mpiJob, t))
 }
 
-func TestServiceNotControlledByUs(t *testing.T) {
+func TestWorkerServiceNotControlledByUs(t *testing.T) {
     f := newFixture(t)
     startTime := metav1.Now()
     completionTime := metav1.Now()
 
-    var replicas int32 = 64
+    var replicas int32 = 2
     mpiJob := newMPIJob("test", &replicas, 1, gpuResourceName, &startTime, &completionTime)
     f.setUpMPIJob(mpiJob)
 
-    service := newWorkersService(mpiJob)
+    mpiJobCopy := mpiJob.DeepCopy()
+    scheme.Scheme.Default(mpiJobCopy)
+    service := newWorkersService(mpiJobCopy)
+    service.OwnerReferences = nil
+    f.setUpService(service)
+
+    f.runExpectError(getKey(mpiJob, t))
+}
+
+func TestLauncherServiceNotControlledByUs(t *testing.T) {
+    f := newFixture(t)
+    startTime := metav1.Now()
+    completionTime := metav1.Now()
+
+    var replicas int32 = 2
+    mpiJob := newMPIJob("test", &replicas, 1, gpuResourceName, &startTime, &completionTime)
+    mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationIntel
+    f.setUpMPIJob(mpiJob)
+
+    mpiJobCopy := mpiJob.DeepCopy()
+    scheme.Scheme.Default(mpiJobCopy)
+    service := newWorkersService(mpiJobCopy)
+    f.setUpService(service)
+    configMap := newConfigMap(mpiJobCopy, replicas, isGPULauncher(mpiJob))
+    secret, err := newSSHAuthSecret(mpiJobCopy)
+    if err != nil {
+        t.Fatalf("Creating SSH auth Secret: %v", err)
+    }
+    f.setUpSecret(secret)
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, isGPULauncher(mpiJob))
+    f.setUpConfigMap(configMap)
+    fmjc := f.newFakeMPIJobController()
+    for i := 0; i < int(replicas); i++ {
+        worker := fmjc.newWorker(mpiJobCopy, i)
+        f.setUpWorker(worker)
+    }
+
+    service = newLauncherService(mpiJobCopy)
     service.OwnerReferences = nil
     f.setUpService(service)
 
@@ -637,12 +674,14 @@ func TestSecretNotControlledByUs(t *testing.T) {
     mpiJob := newMPIJob("test", &replicas, 1, gpuResourceName, &startTime, &completionTime)
     f.setUpMPIJob(mpiJob)
 
-    configMap := newConfigMap(mpiJob, replicas, isGPULauncher(mpiJob))
-    updateDiscoverHostsInConfigMap(configMap, mpiJob, nil, isGPULauncher(mpiJob))
+    mpiJobCopy := mpiJob.DeepCopy()
+    scheme.Scheme.Default(mpiJobCopy)
+    configMap := newConfigMap(mpiJobCopy, replicas, isGPULauncher(mpiJob))
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, isGPULauncher(mpiJob))
     f.setUpConfigMap(configMap)
-    f.setUpService(newWorkersService(mpiJob))
-    secret, err := newSSHAuthSecret(mpiJob)
+    f.setUpService(newWorkersService(mpiJobCopy))
+    secret, err := newSSHAuthSecret(mpiJobCopy)
     if err != nil {
         t.Fatalf("Creating SSH auth Secret: %v", err)
     }
@@ -707,19 +746,19 @@ func TestWorkerNotControlledByUs(t *testing.T) {
     mpiJob := newMPIJob("test", &replicas, 1, gpuResourceName, &startTime, &completionTime)
     f.setUpMPIJob(mpiJob)
 
-    configMap := newConfigMap(mpiJob, replicas, isGPULauncher(mpiJob))
-    updateDiscoverHostsInConfigMap(configMap, mpiJob, nil, isGPULauncher(mpiJob))
+    mpiJobCopy := mpiJob.DeepCopy()
+    scheme.Scheme.Default(mpiJobCopy)
+    configMap := newConfigMap(mpiJobCopy, replicas, isGPULauncher(mpiJob))
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, isGPULauncher(mpiJob))
     f.setUpConfigMap(configMap)
-    f.setUpService(newWorkersService(mpiJob))
-    secret, err := newSSHAuthSecret(mpiJob)
+    f.setUpService(newWorkersService(mpiJobCopy))
+    secret, err := newSSHAuthSecret(mpiJobCopy)
     if err != nil {
         t.Fatalf("Creating SSH auth secret: %v", err)
     }
     f.setUpSecret(secret)
 
     fmjc := f.newFakeMPIJobController()
-    mpiJobCopy := mpiJob.DeepCopy()
-    scheme.Scheme.Default(mpiJobCopy)
     for i := 0; i < int(replicas); i++ {
         worker := fmjc.newWorker(mpiJobCopy, i)
         worker.OwnerReferences = nil
@@ -738,19 +777,19 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) {
     mpiJob := newMPIJob("test", &replicas, 1, gpuResourceName, &startTime, &completionTime)
     f.setUpMPIJob(mpiJob)
 
-    configMap := newConfigMap(mpiJob, replicas, isGPULauncher(mpiJob))
-    updateDiscoverHostsInConfigMap(configMap, mpiJob, nil, isGPULauncher(mpiJob))
+    mpiJobCopy := mpiJob.DeepCopy()
+    scheme.Scheme.Default(mpiJobCopy)
+    configMap := newConfigMap(mpiJobCopy, replicas, isGPULauncher(mpiJob))
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, isGPULauncher(mpiJob))
     f.setUpConfigMap(configMap)
-    f.setUpService(newWorkersService(mpiJob))
-    secret, err := newSSHAuthSecret(mpiJob)
+    f.setUpService(newWorkersService(mpiJobCopy))
+    secret, err := newSSHAuthSecret(mpiJobCopy)
     if err != nil {
         t.Fatalf("Creating SSH auth secret: %v", err)
     }
     f.setUpSecret(secret)
 
     fmjc := f.newFakeMPIJobController()
-    mpiJobCopy := mpiJob.DeepCopy()
-    scheme.Scheme.Default(mpiJobCopy)
     launcher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJobCopy))
     launcher.Status.Phase = corev1.PodRunning
     f.setUpLauncher(launcher)
@@ -788,16 +827,17 @@ func TestLauncherActiveWorkerReady(t *testing.T) {
     var replicas int32 = 8
     mpiJob := newMPIJob("test", &replicas, 1, gpuResourceName, &startTime, &completionTime)
     f.setUpMPIJob(mpiJob)
-    f.setUpService(newWorkersService(mpiJob))
-    secret, err := newSSHAuthSecret(mpiJob)
+
+    mpiJobCopy := mpiJob.DeepCopy()
+    scheme.Scheme.Default(mpiJobCopy)
+    f.setUpService(newWorkersService(mpiJobCopy))
+    secret, err := newSSHAuthSecret(mpiJobCopy)
     if err != nil {
         t.Fatalf("Creating SSH auth secret: %v", err)
     }
     f.setUpSecret(secret)
 
     fmjc := f.newFakeMPIJobController()
-    mpiJobCopy := mpiJob.DeepCopy()
-    scheme.Scheme.Default(mpiJobCopy)
     launcher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJobCopy))
     launcher.Status.Phase = corev1.PodRunning
     f.setUpLauncher(launcher)
@@ -810,8 +850,8 @@ func TestLauncherActiveWorkerReady(t *testing.T) {
         f.setUpWorker(worker)
     }
 
-    configMap := newConfigMap(mpiJob, replicas, isGPULauncher(mpiJob))
-    updateDiscoverHostsInConfigMap(configMap, mpiJob, runningPodList, isGPULauncher(mpiJob))
+    configMap := newConfigMap(mpiJobCopy, replicas, isGPULauncher(mpiJobCopy))
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList, isGPULauncher(mpiJobCopy))
     f.setUpConfigMap(configMap)
 
     mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{
@@ -847,16 +887,17 @@ func TestWorkerReady(t *testing.T) {
     var replicas int32 = 16
     mpiJob := newMPIJob("test", &replicas, 1, gpuResourceName, &startTime, &completionTime)
     f.setUpMPIJob(mpiJob)
-    f.setUpService(newWorkersService(mpiJob))
-    secret, err := newSSHAuthSecret(mpiJob)
+
+    mpiJobCopy := mpiJob.DeepCopy()
+    scheme.Scheme.Default(mpiJobCopy)
+    f.setUpService(newWorkersService(mpiJobCopy))
+    secret, err := newSSHAuthSecret(mpiJobCopy)
     if err != nil {
         t.Fatalf("Creating SSH auth secret: %v", err)
     }
     f.setUpSecret(secret)
 
     fmjc := f.newFakeMPIJobController()
-    mpiJobCopy := mpiJob.DeepCopy()
-    scheme.Scheme.Default(mpiJobCopy)
 
     var runningPodList []*corev1.Pod
     for i := 0; i < 16; i++ {
@@ -866,12 +907,12 @@ func TestWorkerReady(t *testing.T) {
         f.setUpWorker(worker)
     }
 
-    configMap := newConfigMap(mpiJob, replicas, isGPULauncher(mpiJob))
-    updateDiscoverHostsInConfigMap(configMap, mpiJob, runningPodList, isGPULauncher(mpiJob))
+    configMap := newConfigMap(mpiJobCopy, replicas, isGPULauncher(mpiJobCopy))
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList, isGPULauncher(mpiJobCopy))
     f.setUpConfigMap(configMap)
 
-    expLauncher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJob))
-    f.expectCreateJobAction(expLauncher)
+    expLauncher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJobCopy))
+    f.expectCreatePodAction(expLauncher)
 
     mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{
         common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {
@@ -941,7 +982,11 @@ func TestNewLauncherAndWorker(t *testing.T) {
                     RestartPolicy: corev1.RestartPolicyNever,
                     Containers: []corev1.Container{
                         {
-                            Env: appendEnvVars(nil, ompiEnvVars, nvidiaDisableEnvVars),
+                            Env: joinEnvVars(
+                                launcherEnvVars,
+                                ompiEnvVars,
+                                corev1.EnvVar{Name: openMPISlotsEnv, Value: "1"},
+                                nvidiaDisableEnvVars),
                             VolumeMounts: []corev1.VolumeMount{
                                 {Name: "ssh-home", MountPath: "/root/.ssh"},
                                 {Name: "mpi-job-config", MountPath: "/etc/mpi"},
@@ -1014,6 +1059,7 @@ func TestNewLauncherAndWorker(t *testing.T) {
                             VolumeMounts: []corev1.VolumeMount{
                                 {Name: "ssh-home", MountPath: "/root/.ssh"},
                             },
+                            Env: workerEnvVars,
                         },
                     },
                     InitContainers: []corev1.Container{
@@ -1058,7 +1104,9 @@ func TestNewLauncherAndWorker(t *testing.T) {
                 Namespace: "foo",
             },
             Spec: kubeflow.MPIJobSpec{
-                SSHAuthMountPath: "/home/mpiuser/.ssh",
+                SSHAuthMountPath:  "/home/mpiuser/.ssh",
+                SlotsPerWorker:    newInt32(5),
+                MPIImplementation: kubeflow.MPIImplementationIntel,
                 MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{
                     kubeflow.MPIReplicaTypeLauncher: {
                         RestartPolicy: common.RestartPolicyOnFailure,
@@ -1093,6 +1141,9 @@ func TestNewLauncherAndWorker(t *testing.T) {
                             Containers: []corev1.Container{
                                 {
                                     Command: []string{"/entrypoint.sh"},
+                                    Env: []corev1.EnvVar{
+                                        {Name: "FOO", Value: "bar"},
+                                    },
                                 },
                             },
                         },
@@ -1122,7 +1173,12 @@ func TestNewLauncherAndWorker(t *testing.T) {
                             SecurityContext: &corev1.SecurityContext{
                                 RunAsUser: newInt64(1000),
                             },
-                            Env: appendEnvVars([]corev1.EnvVar{{Name: "FOO", Value: "bar"}}, ompiEnvVars, nvidiaDisableEnvVars),
+                            Env: joinEnvVars(
+                                corev1.EnvVar{Name: "FOO", Value: "bar"},
+                                launcherEnvVars,
+                                intelEnvVars,
+                                corev1.EnvVar{Name: "I_MPI_PERHOST", Value: "5"},
+                                nvidiaDisableEnvVars),
                             VolumeMounts: []corev1.VolumeMount{
                                 {Name: "fool-vol", MountPath: "/mnt/foo"},
                                 {Name: "ssh-home", MountPath: "/home/mpiuser/.ssh"},
@@ -1198,6 +1254,7 @@ func TestNewLauncherAndWorker(t *testing.T) {
                             VolumeMounts: []corev1.VolumeMount{
                                 {Name: "ssh-home", MountPath: "/home/mpiuser/.ssh"},
                             },
+                            Env: joinEnvVars(corev1.EnvVar{Name: "FOO", Value: "bar"}, workerEnvVars),
                         },
                     },
                     InitContainers: []corev1.Container{
@@ -1266,11 +1323,19 @@ func newInt64(v int64) *int64 {
     return &v
 }
 
-func appendEnvVars(v []corev1.EnvVar, others ...[]corev1.EnvVar) []corev1.EnvVar {
-    for _, other := range others {
-        v = append(v, other...)
+func joinEnvVars(evs ...interface{}) []corev1.EnvVar {
+    var result []corev1.EnvVar
+    for _, ev := range evs {
+        switch v := ev.(type) {
+        case corev1.EnvVar:
+            result = append(result, v)
+        case []corev1.EnvVar:
+            result = append(result, v...)
+        default:
+            panic("must be of type EnvVar or []EnvVar")
+        }
     }
-    return v
+    return result
 }
 
 func (f *fixture) newFakeMPIJobController() *MPIJobController {
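Usage sketch (not part of the diff): the manifest below is one way the new field could be exercised once this change lands. The apiVersion/kind, mpiImplementation, sshAuthMountPath, and the Launcher/Worker keys follow the v2beta1 types and CRD touched above; the metadata name, container images, command, replica counts, and the slotsPerWorker JSON name are illustrative assumptions rather than anything introduced by this PR. Leaving mpiImplementation unset keeps the "OpenMPI" default; setting it to Intel additionally makes the controller create a launcher Service and inject the I_MPI_* hostfile and bootstrap variables added in newLauncher.

apiVersion: kubeflow.org/v2beta1
kind: MPIJob
metadata:
  name: pi-intel                     # placeholder name
spec:
  slotsPerWorker: 2
  mpiImplementation: Intel           # new field; allowed values are "OpenMPI" (default) and "Intel"
  sshAuthMountPath: /home/mpiuser/.ssh
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: launcher
            image: example.com/mpi-pi:intel                      # placeholder image
            command: ["mpirun", "-n", "4", "/home/mpiuser/pi"]   # placeholder command
    Worker:
      replicas: 2
      template:
        spec:
          containers:
          - name: worker
            image: example.com/mpi-pi:intel                      # placeholder image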