diff --git a/examples/pi/Dockerfile b/examples/pi/Dockerfile new file mode 100644 index 000000000..9e0f9ad3e --- /dev/null +++ b/examples/pi/Dockerfile @@ -0,0 +1,22 @@ +FROM debian:buster + +RUN apt update && apt install -y --no-install-recommends \ + build-essential \ + libopenmpi-dev \ + openmpi-bin \ + openssh-server \ + openssh-client \ + && rm -rf /var/lib/apt/lists/* +# Add priviledge separation directoy to run sshd as root. +RUN mkdir -p /var/run/sshd +# Add capability to run sshd as non-root. +RUN setcap CAP_NET_BIND_SERVICE=+eip /usr/sbin/sshd + +RUN useradd -m mpiuser +WORKDIR /home/mpiuser +COPY --chown=mpiuser sshd_config .sshd_config +RUN cat /etc/ssh/ssh_config | grep -Ev 'StrictHostKeyChecking' > /etc/ssh/ssh_config.new && \ + echo " StrictHostKeyChecking no" >> /etc/ssh/ssh_config.new && \ + mv /etc/ssh/ssh_config.new /etc/ssh/ssh_config +COPY pi.cc /src/pi.cc +RUN mpic++ /src/pi.cc -o /home/mpiuser/pi \ No newline at end of file diff --git a/examples/pi/README.md b/examples/pi/README.md new file mode 100644 index 000000000..ad36a656d --- /dev/null +++ b/examples/pi/README.md @@ -0,0 +1,24 @@ +# Pure MPI example + +This example shows to run a pure MPI application. + +The program prints some basic information about the workers. +Then, it calculates an approximate value for pi. + +## How to build Image + +```bash +docker build -t mpi-pi . +``` + +## Create MPIJob + +Modify `pi.yaml` to set up the image name from your own registry. + +Then, run: + +``` +kubectl create -f pi.yaml +``` + +The YAML shows how to run the binaries as a non-root user. \ No newline at end of file diff --git a/examples/pi/pi.cc b/examples/pi/pi.cc new file mode 100644 index 000000000..eab92990f --- /dev/null +++ b/examples/pi/pi.cc @@ -0,0 +1,51 @@ +// Copyright 2021 The Kubeflow Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "mpi.h" +#include +#include + +int main(int argc, char *argv[]) { + int rank, workers, proc_name_size; + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &workers); + if (rank == 0) { + printf("Workers: %d\n", workers); + } + char hostname[MPI_MAX_PROCESSOR_NAME]; + MPI_Get_processor_name(hostname, &proc_name_size); + printf("Rank %d on host %s\n", rank, hostname); + + std::minstd_rand generator(rank); + std::uniform_real_distribution distribution(-1.0, 1.0); + double x, y; + long long worker_count = 0; + int worker_tests = 100000000; + for (int i = 0; i < worker_tests; i++) { + x = distribution(generator); + y = distribution(generator); + if (x * x + y * y <= 1.0) { + worker_count++; + } + } + long long total_count = 0; + MPI_Reduce(&worker_count, &total_count, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + if (rank == 0) { + double pi = 4 * (double)total_count / (double)(worker_tests) / (double)(workers); + printf("pi is approximately %.16lf\n", pi); + } + MPI_Finalize(); + return 0; +} diff --git a/examples/pi/pi.yaml b/examples/pi/pi.yaml new file mode 100644 index 000000000..becc10259 --- /dev/null +++ b/examples/pi/pi.yaml @@ -0,0 +1,50 @@ +apiVersion: kubeflow.org/v2 +kind: MPIJob +metadata: + name: pi +spec: + slotsPerWorker: 1 + cleanPodPolicy: Running + sshAuthMountPath: /home/mpiuser/.ssh + mpiReplicaSpecs: + Launcher: + replicas: 1 + template: + spec: + containers: + - image: docker.io/kubeflow/mpi-pi + name: mpi-launcher + securityContext: + runAsUser: 1000 + command: + - mpirun + args: + - -np + - "2" + - /home/mpiuser/pi + resources: + limits: + cpu: 1 + memory: 2Gi + Worker: + replicas: 2 + template: + spec: + containers: + - image: docker.io/kubeflow/mpi-pi + name: mpi-worker + securityContext: + runAsUser: 1000 + capabilities: + add: + - NET_BIND_SERVICE + command: + - /usr/sbin/sshd + args: + - -De + - -f + - /home/mpiuser/.sshd_config + resources: + limits: + cpu: 2 + memory: 4Gi diff --git a/examples/pi/sshd_config b/examples/pi/sshd_config new file mode 100644 index 000000000..de843f229 --- /dev/null +++ b/examples/pi/sshd_config @@ -0,0 +1,2 @@ +PidFile /home/mpiuser/sshd.pid +HostKey /home/mpiuser/.ssh/id_rsa diff --git a/manifests/base/crd.yaml b/manifests/base/crd.yaml index 3f59b36ab..865f04358 100644 --- a/manifests/base/crd.yaml +++ b/manifests/base/crd.yaml @@ -104,6 +104,8 @@ spec: type: string enum: ["None", "Running", "All"] description: "Defines which Pods must be deleted after the Job completes" + sshAuthMountPath: + type: string mpiReplicaSpecs: type: object properties: diff --git a/v2/pkg/apis/kubeflow/v2beta1/default.go b/v2/pkg/apis/kubeflow/v2beta1/default.go index 659314dd1..3ad1e035f 100644 --- a/v2/pkg/apis/kubeflow/v2beta1/default.go +++ b/v2/pkg/apis/kubeflow/v2beta1/default.go @@ -56,6 +56,9 @@ func SetDefaults_MPIJob(mpiJob *MPIJob) { if mpiJob.Spec.SlotsPerWorker == nil { mpiJob.Spec.SlotsPerWorker = newInt32(1) } + if mpiJob.Spec.SSHAuthMountPath == "" { + mpiJob.Spec.SSHAuthMountPath = "/root/.ssh" + } // set default to Launcher setDefaultsTypeLauncher(mpiJob.Spec.MPIReplicaSpecs[MPIReplicaTypeLauncher]) diff --git a/v2/pkg/apis/kubeflow/v2beta1/default_test.go b/v2/pkg/apis/kubeflow/v2beta1/default_test.go index 0f45287b5..ac746c51f 100644 --- a/v2/pkg/apis/kubeflow/v2beta1/default_test.go +++ b/v2/pkg/apis/kubeflow/v2beta1/default_test.go @@ -29,22 +29,25 @@ func TestSetDefaults_MPIJob(t *testing.T) { "base defaults": { want: MPIJob{ Spec: MPIJobSpec{ - SlotsPerWorker: newInt32(1), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + SlotsPerWorker: newInt32(1), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + SSHAuthMountPath: "/root/.ssh", }, }, }, "base defaults overridden": { job: MPIJob{ Spec: MPIJobSpec{ - SlotsPerWorker: newInt32(10), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SlotsPerWorker: newInt32(10), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SSHAuthMountPath: "/home/mpiuser/.ssh", }, }, want: MPIJob{ Spec: MPIJobSpec{ - SlotsPerWorker: newInt32(10), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SlotsPerWorker: newInt32(10), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SSHAuthMountPath: "/home/mpiuser/.ssh", }, }, }, @@ -58,8 +61,9 @@ func TestSetDefaults_MPIJob(t *testing.T) { }, want: MPIJob{ Spec: MPIJobSpec{ - SlotsPerWorker: newInt32(1), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + SlotsPerWorker: newInt32(1), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + SSHAuthMountPath: "/root/.ssh", MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ MPIReplicaTypeLauncher: { Replicas: newInt32(1), @@ -79,8 +83,9 @@ func TestSetDefaults_MPIJob(t *testing.T) { }, want: MPIJob{ Spec: MPIJobSpec{ - SlotsPerWorker: newInt32(1), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + SlotsPerWorker: newInt32(1), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + SSHAuthMountPath: "/root/.ssh", MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ MPIReplicaTypeWorker: { Replicas: newInt32(0), diff --git a/v2/pkg/apis/kubeflow/v2beta1/types.go b/v2/pkg/apis/kubeflow/v2beta1/types.go index 305e5f66f..27f100e53 100644 --- a/v2/pkg/apis/kubeflow/v2beta1/types.go +++ b/v2/pkg/apis/kubeflow/v2beta1/types.go @@ -48,9 +48,13 @@ type MPIJobSpec struct { // Defaults to None. CleanPodPolicy *common.CleanPodPolicy `json:"cleanPodPolicy,omitempty"` - // `MPIReplicaSpecs` contains maps from `MPIReplicaType` to `ReplicaSpec` that + // MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that // specify the MPI replicas to run. MPIReplicaSpecs map[MPIReplicaType]*common.ReplicaSpec `json:"mpiReplicaSpecs"` + + // SSHAuthMountPath is the directory where SSH keys are mounted. + // Defaults to "/root/.ssh". + SSHAuthMountPath string `json:"sshAuthMountPath,omitempty"` } // MPIReplicaType is the type for MPIReplica. diff --git a/v2/pkg/apis/kubeflow/validation/validation.go b/v2/pkg/apis/kubeflow/validation/validation.go index 6e27247d4..f6f564ac5 100644 --- a/v2/pkg/apis/kubeflow/validation/validation.go +++ b/v2/pkg/apis/kubeflow/validation/validation.go @@ -46,6 +46,9 @@ func validateMPIJobSpec(spec *kubeflow.MPIJobSpec, path *field.Path) field.Error } else if !validCleanPolicies.Has(string(*spec.CleanPodPolicy)) { errs = append(errs, field.NotSupported(path.Child("cleanPodPolicy"), *spec.CleanPodPolicy, validCleanPolicies.List())) } + if spec.SSHAuthMountPath == "" { + errs = append(errs, field.Required(path.Child("sshAuthMountPath"), "must have a mount path for SSH credentials")) + } return errs } diff --git a/v2/pkg/apis/kubeflow/validation/validation_test.go b/v2/pkg/apis/kubeflow/validation/validation_test.go index 5a19ad535..e675a2e05 100644 --- a/v2/pkg/apis/kubeflow/validation/validation_test.go +++ b/v2/pkg/apis/kubeflow/validation/validation_test.go @@ -33,8 +33,9 @@ func TestValidateMPIJob(t *testing.T) { "valid": { job: v2beta1.MPIJob{ Spec: v2beta1.MPIJobSpec{ - SlotsPerWorker: newInt32(2), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SlotsPerWorker: newInt32(2), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SSHAuthMountPath: "/home/mpiuser/.ssh", MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{ v2beta1.MPIReplicaTypeLauncher: { Replicas: newInt32(1), @@ -51,8 +52,9 @@ func TestValidateMPIJob(t *testing.T) { "valid with worker": { job: v2beta1.MPIJob{ Spec: v2beta1.MPIJobSpec{ - SlotsPerWorker: newInt32(2), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SlotsPerWorker: newInt32(2), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SSHAuthMountPath: "/home/mpiuser/.ssh", MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{ v2beta1.MPIReplicaTypeLauncher: { Replicas: newInt32(1), @@ -88,14 +90,19 @@ func TestValidateMPIJob(t *testing.T) { Type: field.ErrorTypeRequired, Field: "spec.cleanPodPolicy", }, + &field.Error{ + Type: field.ErrorTypeRequired, + Field: "spec.sshAuthMountPath", + }, }, }, "empty replica specs": { job: v2beta1.MPIJob{ Spec: v2beta1.MPIJobSpec{ - SlotsPerWorker: newInt32(2), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), - MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{}, + SlotsPerWorker: newInt32(2), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SSHAuthMountPath: "/root/.ssh", + MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{}, }, }, wantErrs: field.ErrorList{ @@ -108,8 +115,9 @@ func TestValidateMPIJob(t *testing.T) { "missing replica spec fields": { job: v2beta1.MPIJob{ Spec: v2beta1.MPIJobSpec{ - SlotsPerWorker: newInt32(2), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SlotsPerWorker: newInt32(2), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SSHAuthMountPath: "/root/.ssh", MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{ v2beta1.MPIReplicaTypeLauncher: {}, v2beta1.MPIReplicaTypeWorker: {}, @@ -138,8 +146,9 @@ func TestValidateMPIJob(t *testing.T) { "invalid replica counts": { job: v2beta1.MPIJob{ Spec: v2beta1.MPIJobSpec{ - SlotsPerWorker: newInt32(2), - CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SlotsPerWorker: newInt32(2), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + SSHAuthMountPath: "/root/.ssh", MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{ v2beta1.MPIReplicaTypeLauncher: { Replicas: newInt32(2), diff --git a/v2/pkg/controller/mpi_job_controller.go b/v2/pkg/controller/mpi_job_controller.go index f49e73267..ef374332f 100644 --- a/v2/pkg/controller/mpi_job_controller.go +++ b/v2/pkg/controller/mpi_job_controller.go @@ -74,21 +74,19 @@ const ( sshAuthVolume = "ssh-auth" sshAuthMountPath = "/mnt/ssh" sshHomeVolume = "ssh-home" - // TODO(alculquicondor): Make home directory configurable through the API. - sshHomeMountPath = "/root/.ssh" - launcher = "launcher" - worker = "worker" - launcherSuffix = "-launcher" - workerSuffix = "-worker" - gpuResourceNameSuffix = ".com/gpu" - gpuResourceNamePattern = "gpu" - labelGroupName = "group-name" - labelMPIJobName = "mpi-job-name" - labelMPIRoleType = "mpi-job-role" - sshPublicKey = "ssh-publickey" - sshPrivateKeyFile = "id_rsa" - sshPublicKeyFile = sshPrivateKeyFile + ".pub" - sshAuthorizedKeysFile = "authorized_keys" + launcher = "launcher" + worker = "worker" + launcherSuffix = "-launcher" + workerSuffix = "-worker" + gpuResourceNameSuffix = ".com/gpu" + gpuResourceNamePattern = "gpu" + labelGroupName = "group-name" + labelMPIJobName = "mpi-job-name" + labelMPIRoleType = "mpi-job-role" + sshPublicKey = "ssh-publickey" + sshPrivateKeyFile = "id_rsa" + sshPublicKeyFile = sshPrivateKeyFile + ".pub" + sshAuthorizedKeysFile = "authorized_keys" ) const ( @@ -105,10 +103,6 @@ const ( // validate an MPIJob. ValidationError = "ValidationError" - // MessageResourceDoesNotExist is used for Events when some - // resource is missing in yaml - MessageResourceDoesNotExist = "Resource %q is missing in yaml" - // podTemplateRestartPolicyReason is the warning reason when the restart // policy is set in pod template. podTemplateRestartPolicyReason = "SetPodTemplateRestartPolicy" @@ -135,6 +129,50 @@ var ( Name: "mpi_operator_job_info", Help: "Information about MPIJob", }, []string{"launcher", "namespace"}) + + sshVolumeItems = []corev1.KeyToPath{ + { + Key: corev1.SSHAuthPrivateKey, + Path: sshPrivateKeyFile, + }, + { + Key: sshPublicKey, + Path: sshPublicKeyFile, + }, + { + Key: sshPublicKey, + Path: sshAuthorizedKeysFile, + }, + } + configVolumeItems = []corev1.KeyToPath{ + { + Key: hostfileName, + Path: hostfileName, + Mode: newInt32(0444), + }, + { + Key: discoverHostsScriptName, + Path: discoverHostsScriptName, + Mode: newInt32(0555), + }, + } + + ompiEnvVars = []corev1.EnvVar{ + // Allows driver to reach workers through the Service. + { + Name: "OMPI_MCA_orte_keep_fqdn_hostnames", + Value: "true", + }, + { + Name: "OMPI_MCA_orte_default_hostfile", + Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName), + }, + } + + nvidiaDisableEnvVars = []corev1.EnvVar{ + {Name: "NVIDIA_VISIBLE_DEVICES"}, + {Name: "NVIDIA_DRIVER_CAPABILITIES"}, + } ) // MPIJobController is the controller implementation for MPIJob resources. @@ -743,11 +781,7 @@ func keysFromData(data map[string][]byte) []string { // getOrCreateWorkerStatefulSet gets the worker StatefulSet controlled by this // MPIJob, or creates one if it doesn't exist. func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1.Pod, error) { - var ( - workerPrefix = mpiJob.Name + workerSuffix - workerPods []*corev1.Pod - i int32 = 0 - ) + var workerPods []*corev1.Pod worker := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] if worker == nil { return workerPods, nil @@ -780,15 +814,12 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1 } } - for ; i < *worker.Replicas; i++ { - name := fmt.Sprintf("%s-%d", workerPrefix, i) - pod, err := c.podLister.Pods(mpiJob.Namespace).Get(name) + for i := 0; i < int(*worker.Replicas); i++ { + pod, err := c.podLister.Pods(mpiJob.Namespace).Get(workerName(mpiJob, i)) // If the worker Pod doesn't exist, we'll create it. if errors.IsNotFound(err) { - worker := newWorker(mpiJob, name, c.gangSchedulerName) - // Insert ReplicaIndexLabel - worker.Labels[common.ReplicaIndexLabel] = strconv.Itoa(int(i)) + worker := newWorker(mpiJob, i, c.gangSchedulerName) pod, err = c.kubeClient.CoreV1().Pods(mpiJob.Namespace).Create(context.TODO(), worker, metav1.CreateOptions{}) } // If an error occurs during Get/Create, we'll requeue the item so we @@ -1167,11 +1198,15 @@ func newPodGroup(mpiJob *kubeflow.MPIJob, minAvailableReplicas int32) *podgroupv } } +func workerName(mpiJob *kubeflow.MPIJob, index int) string { + return fmt.Sprintf("%s%s-%d", mpiJob.Name, workerSuffix, index) +} + // newWorker creates a new worker StatefulSet for an MPIJob resource. It also // sets the appropriate OwnerReferences on the resource so handleObject can // discover the MPIJob resource that 'owns' it. -func newWorker(mpiJob *kubeflow.MPIJob, name, gangSchedulerName string) *corev1.Pod { - defaultLabels := defaultWorkerLabels(mpiJob.Name) +func newWorker(mpiJob *kubeflow.MPIJob, index int, gangSchedulerName string) *corev1.Pod { + name := workerName(mpiJob, index) podTemplate := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.DeepCopy() @@ -1179,10 +1214,10 @@ func newWorker(mpiJob *kubeflow.MPIJob, name, gangSchedulerName string) *corev1. if len(podTemplate.Labels) == 0 { podTemplate.Labels = make(map[string]string) } - - for key, value := range defaultLabels { + for key, value := range defaultWorkerLabels(mpiJob.Name) { podTemplate.Labels[key] = value } + podTemplate.Labels[common.ReplicaIndexLabel] = strconv.Itoa(index) podTemplate.Spec.Hostname = name podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name. setRestartPolicy(podTemplate, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]) @@ -1191,11 +1226,7 @@ func newWorker(mpiJob *kubeflow.MPIJob, name, gangSchedulerName string) *corev1. if len(container.Command) == 0 && len(container.Args) == 0 { container.Command = []string{"/usr/sbin/sshd", "-De"} } - - sshVolume, sshVolumeMount := podSSHAuthVolume(mpiJob.Name) - podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, sshVolume...) - container.VolumeMounts = append(container.VolumeMounts, sshVolumeMount...) - podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers, sshInitContainer(sshVolumeMount)) + setupSSHOnPod(&podTemplate.Spec, mpiJob) // add SchedulerName to podSpec if gangSchedulerName != "" { @@ -1236,108 +1267,89 @@ func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, isGPULauncher bo labelMPIRoleType: launcher, } - podSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.DeepCopy() + podTemplate := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.DeepCopy() // copy the labels and annotations to pod from PodTemplate - if len(podSpec.Labels) == 0 { - podSpec.Labels = make(map[string]string) + if len(podTemplate.Labels) == 0 { + podTemplate.Labels = make(map[string]string) } for key, value := range defaultLabels { - podSpec.Labels[key] = value + podTemplate.Labels[key] = value } // add SchedulerName to podSpec if c.gangSchedulerName != "" { - if podSpec.Spec.SchedulerName != "" && podSpec.Spec.SchedulerName != c.gangSchedulerName { - klog.Warningf("%s scheduler is specified when gang-scheduling is enabled and it will be overwritten", podSpec.Spec.SchedulerName) + if podTemplate.Spec.SchedulerName != "" && podTemplate.Spec.SchedulerName != c.gangSchedulerName { + klog.Warningf("%s scheduler is specified when gang-scheduling is enabled and it will be overwritten", podTemplate.Spec.SchedulerName) } - podSpec.Spec.SchedulerName = c.gangSchedulerName + podTemplate.Spec.SchedulerName = c.gangSchedulerName - if podSpec.Annotations == nil { - podSpec.Annotations = map[string]string{} + if podTemplate.Annotations == nil { + podTemplate.Annotations = map[string]string{} } // we create the podGroup with the same name as the mpijob - podSpec.Annotations[podgroupv1beta1.KubeGroupNameAnnotationKey] = mpiJob.Name + podTemplate.Annotations[podgroupv1beta1.KubeGroupNameAnnotationKey] = mpiJob.Name } - podSpec.Spec.Hostname = launcherName - podSpec.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name. - container := &podSpec.Spec.Containers[0] - container.Env = append(container.Env, - // Allows driver to reach workers through the Service. - corev1.EnvVar{ - Name: "OMPI_MCA_orte_keep_fqdn_hostnames", - Value: "true", - }, - corev1.EnvVar{ - Name: "OMPI_MCA_orte_default_hostfile", - Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName), - }, - ) + podTemplate.Spec.Hostname = launcherName + podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name. + container := &podTemplate.Spec.Containers[0] + container.Env = append(container.Env, ompiEnvVars...) if !isGPULauncher { container.Env = append(container.Env, // We overwrite these environment variables so that users will not // be mistakenly using GPU resources for launcher due to potential // issues with scheduler/container technologies. - corev1.EnvVar{ - Name: "NVIDIA_VISIBLE_DEVICES", - Value: "", - }, - corev1.EnvVar{ - Name: "NVIDIA_DRIVER_CAPABILITIES", - Value: "", - }) + nvidiaDisableEnvVars...) } - sshVolume, sshVolumeMount := podSSHAuthVolume(mpiJob.Name) - - container.VolumeMounts = append(container.VolumeMounts, sshVolumeMount...) - container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{ - Name: configVolumeName, - MountPath: configMountPath, - }) - podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, sshInitContainer(sshVolumeMount)) + setupSSHOnPod(&podTemplate.Spec, mpiJob) // Submit a warning event if the user specifies restart policy for // the pod template. We recommend to set it from the replica level. - if podSpec.Spec.RestartPolicy != "" { + if podTemplate.Spec.RestartPolicy != "" { errMsg := "Restart policy in pod template overridden by restart policy in replica spec" klog.Warning(errMsg) c.recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg) } - setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher]) + setRestartPolicy(podTemplate, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher]) - podSpec.Spec.Volumes = append(podSpec.Spec.Volumes, sshVolume...) - podSpec.Spec.Volumes = append(podSpec.Spec.Volumes, corev1.Volume{ - Name: configVolumeName, - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: mpiJob.Name + configSuffix, - }, - Items: []corev1.KeyToPath{ - { - Key: hostfileName, - Path: hostfileName, - Mode: newInt32(0444), + podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, + corev1.Volume{ + Name: configVolumeName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: mpiJob.Name + configSuffix, }, - { - Key: discoverHostsScriptName, - Path: discoverHostsScriptName, - Mode: newInt32(0555), + Items: []corev1.KeyToPath{ + { + Key: hostfileName, + Path: hostfileName, + Mode: newInt32(0444), + }, + { + Key: discoverHostsScriptName, + Path: discoverHostsScriptName, + Mode: newInt32(0555), + }, }, }, }, - }, + }) + container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{ + Name: configVolumeName, + MountPath: configMountPath, }) + return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: launcherName, Namespace: mpiJob.Namespace, - Labels: podSpec.Labels, - Annotations: podSpec.Annotations, + Labels: podTemplate.Labels, + Annotations: podTemplate.Annotations, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind), }, }, - Spec: podSpec.Spec, + Spec: podTemplate.Spec, } } @@ -1412,61 +1424,58 @@ func workerReplicas(job *kubeflow.MPIJob) int32 { return 0 } -func podSSHAuthVolume(jobName string) ([]corev1.Volume, []corev1.VolumeMount) { - return []corev1.Volume{ - { - Name: sshAuthVolume, - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: jobName + sshAuthSecretSuffix, - DefaultMode: newInt32(0660), - Items: []corev1.KeyToPath{ - { - Key: corev1.SSHAuthPrivateKey, - Path: sshPrivateKeyFile, - }, - { - Key: sshPublicKey, - Path: sshPublicKeyFile, - }, - { - Key: sshPublicKey, - Path: sshAuthorizedKeysFile, - }, - }, - }, +func setupSSHOnPod(podSpec *corev1.PodSpec, job *kubeflow.MPIJob) { + podSpec.Volumes = append(podSpec.Volumes, + corev1.Volume{ + Name: sshAuthVolume, + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: job.Name + sshAuthSecretSuffix, + Items: sshVolumeItems, }, }, - { - Name: sshHomeVolume, - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, + }, + corev1.Volume{ + Name: sshHomeVolume, + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, }, - }, []corev1.VolumeMount{ + }) + + mainContainer := &podSpec.Containers[0] + mainContainer.VolumeMounts = append(mainContainer.VolumeMounts, + corev1.VolumeMount{ + Name: sshHomeVolume, + MountPath: job.Spec.SSHAuthMountPath, + }) + + // The init script sets the permissions of the ssh folder in the user's home + // directory. The ownership is set based on the security context of the + // launcher's first container. + launcherSecurityCtx := job.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].SecurityContext + initScript := "" + + "cp -RL /mnt/ssh/* /mnt/home-ssh && " + + "chmod 700 /mnt/home-ssh && " + + "chmod 600 /mnt/home-ssh/*" + if launcherSecurityCtx != nil && launcherSecurityCtx.RunAsUser != nil { + initScript += fmt.Sprintf(" && chown %d -R /mnt/home-ssh", *launcherSecurityCtx.RunAsUser) + } + podSpec.InitContainers = append(podSpec.InitContainers, corev1.Container{ + Name: "init-ssh", + Image: "alpine:3.14", + VolumeMounts: []corev1.VolumeMount{ { Name: sshAuthVolume, MountPath: sshAuthMountPath, - }, { + }, + { Name: sshHomeVolume, - MountPath: sshHomeMountPath, + MountPath: "/mnt/home-ssh", }, - } -} - -func sshInitContainer(mounts []corev1.VolumeMount) corev1.Container { - return corev1.Container{ - Name: "init-ssh", - Image: "alpine:3.14", - VolumeMounts: mounts, - Command: []string{ - "/bin/sh", - "-c", - "" + - "cp -RL /mnt/ssh/* /root/.ssh &&" + - "chmod 600 -R /root/.ssh", }, - } + Command: []string{"/bin/sh"}, + Args: []string{"-c", initScript}, + }) } func newInt32(v int32) *int32 { diff --git a/v2/pkg/controller/mpi_job_controller_test.go b/v2/pkg/controller/mpi_job_controller_test.go index 0c4c83a9e..54fecd262 100644 --- a/v2/pkg/controller/mpi_job_controller_test.go +++ b/v2/pkg/controller/mpi_job_controller_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -156,7 +158,7 @@ func newMPIJob(name string, replicas *int32, pusPerReplica int64, resourceName s func newMPIJobWithLauncher(name string, replicas *int32, pusPerReplica int64, resourceName string, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { mpiJob := newMPIJob(name, replicas, pusPerReplica, resourceName, startTime, completionTime) - mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Replicas = int32Ptr(1) + mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Replicas = newInt32(1) launcherContainers := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers for i := range launcherContainers { @@ -465,7 +467,7 @@ func TestDoNothingWithNonexistentMPIJob(t *testing.T) { f := newFixture(t) startTime := metav1.Now() completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + mpiJob := newMPIJob("test", newInt32(64), 1, gpuResourceName, &startTime, &completionTime) f.run(getKey(mpiJob, t)) } @@ -474,11 +476,13 @@ func TestLauncherNotControlledByUs(t *testing.T) { startTime := metav1.Now() completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + mpiJob := newMPIJob("test", newInt32(64), 1, gpuResourceName, &startTime, &completionTime) f.setUpMPIJob(mpiJob) fmjc := f.newFakeMPIJobController() - launcher := fmjc.newLauncher(mpiJob, isGPULauncher(mpiJob)) + mpiJobCopy := mpiJob.DeepCopy() + scheme.Scheme.Default(mpiJobCopy) + launcher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJobCopy)) launcher.OwnerReferences = nil f.setUpLauncher(launcher) @@ -509,7 +513,7 @@ func TestIsGPULauncher(t *testing.T) { }, } for testName, testCase := range testCases { - mpiJob := newMPIJobWithLauncher("test", int32Ptr(64), 1, testCase.gpu, &startTime, &completionTime) + mpiJob := newMPIJobWithLauncher("test", newInt32(64), 1, testCase.gpu, &startTime, &completionTime) f.setUpMPIJob(mpiJob) if result := isGPULauncher(mpiJob); result != testCase.expected { t.Errorf("%s expected: %v, actual: %v, gpu=%v", testName, testCase.expected, result, testCase.gpu) @@ -523,16 +527,16 @@ func TestLauncherSucceeded(t *testing.T) { startTime := metav1.Now() completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + mpiJob := newMPIJob("test", newInt32(64), 1, gpuResourceName, &startTime, &completionTime) f.setUpMPIJob(mpiJob) fmjc := f.newFakeMPIJobController() - launcher := fmjc.newLauncher(mpiJob, isGPULauncher(mpiJob)) + mpiJobCopy := mpiJob.DeepCopy() + scheme.Scheme.Default(mpiJobCopy) + launcher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJobCopy)) launcher.Status.Phase = corev1.PodSucceeded f.setUpLauncher(launcher) - mpiJobCopy := mpiJob.DeepCopy() - scheme.Scheme.Default(mpiJobCopy) mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { Active: 0, @@ -558,16 +562,16 @@ func TestLauncherFailed(t *testing.T) { startTime := metav1.Now() completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + mpiJob := newMPIJob("test", newInt32(64), 1, gpuResourceName, &startTime, &completionTime) f.setUpMPIJob(mpiJob) fmjc := f.newFakeMPIJobController() - launcher := fmjc.newLauncher(mpiJob, isGPULauncher(mpiJob)) + mpiJobCopy := mpiJob.DeepCopy() + scheme.Scheme.Default(mpiJobCopy) + launcher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJobCopy)) launcher.Status.Phase = corev1.PodFailed f.setUpLauncher(launcher) - mpiJobCopy := mpiJob.DeepCopy() - scheme.Scheme.Default(mpiJobCopy) mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { Active: 0, @@ -658,13 +662,14 @@ func TestShutdownWorker(t *testing.T) { f.setUpMPIJob(mpiJob) fmjc := f.newFakeMPIJobController() - launcher := fmjc.newLauncher(mpiJob, isGPULauncher(mpiJob)) + mpiJobCopy := mpiJob.DeepCopy() + scheme.Scheme.Default(mpiJobCopy) + launcher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJobCopy)) launcher.Status.Phase = corev1.PodSucceeded f.setUpLauncher(launcher) for i := 0; i < int(replicas); i++ { - name := fmt.Sprintf("%s-%d", mpiJob.Name+workerSuffix, i) - worker := newWorker(mpiJob, name, "") + worker := newWorker(mpiJobCopy, i, "") f.setUpWorker(worker) } @@ -678,8 +683,6 @@ func TestShutdownWorker(t *testing.T) { f.kubeActions = append(f.kubeActions, core.NewDeleteAction(schema.GroupVersionResource{Resource: "pods"}, mpiJob.Namespace, name)) } - mpiJobCopy := mpiJob.DeepCopy() - scheme.Scheme.Default(mpiJobCopy) mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ common.ReplicaType(kubeflow.MPIReplicaTypeWorker): { Active: 0, @@ -712,9 +715,10 @@ func TestWorkerNotControlledByUs(t *testing.T) { } f.setUpSecret(secret) + mpiJobCopy := mpiJob.DeepCopy() + scheme.Scheme.Default(mpiJobCopy) for i := 0; i < int(replicas); i++ { - name := fmt.Sprintf("%s-%d", mpiJob.Name+workerSuffix, i) - worker := newWorker(mpiJob, name, "") + worker := newWorker(mpiJobCopy, i, "") worker.OwnerReferences = nil f.setUpWorker(worker) } @@ -742,18 +746,17 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) { f.setUpSecret(secret) fmjc := f.newFakeMPIJobController() - launcher := fmjc.newLauncher(mpiJob, isGPULauncher(mpiJob)) + mpiJobCopy := mpiJob.DeepCopy() + scheme.Scheme.Default(mpiJobCopy) + launcher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJobCopy)) launcher.Status.Phase = corev1.PodRunning f.setUpLauncher(launcher) for i := 0; i < int(replicas); i++ { - name := fmt.Sprintf("%s-%d", mpiJob.Name+workerSuffix, i) - worker := newWorker(mpiJob, name, "") + worker := newWorker(mpiJobCopy, i, "") worker.Status.Phase = corev1.PodPending f.setUpWorker(worker) } - mpiJobCopy := mpiJob.DeepCopy() - scheme.Scheme.Default(mpiJobCopy) msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) updateMPIJobConditions(mpiJobCopy, common.JobCreated, mpiJobCreatedReason, msg) mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ @@ -790,14 +793,15 @@ func TestLauncherActiveWorkerReady(t *testing.T) { f.setUpSecret(secret) fmjc := f.newFakeMPIJobController() - launcher := fmjc.newLauncher(mpiJob, isGPULauncher(mpiJob)) + mpiJobCopy := mpiJob.DeepCopy() + scheme.Scheme.Default(mpiJobCopy) + launcher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJobCopy)) launcher.Status.Phase = corev1.PodRunning f.setUpLauncher(launcher) var runningPodList []*corev1.Pod for i := 0; i < int(replicas); i++ { - name := fmt.Sprintf("%s-%d", mpiJob.Name+workerSuffix, i) - worker := newWorker(mpiJob, name, "") + worker := newWorker(mpiJobCopy, i, "") worker.Status.Phase = corev1.PodRunning runningPodList = append(runningPodList, worker) f.setUpWorker(worker) @@ -807,8 +811,6 @@ func TestLauncherActiveWorkerReady(t *testing.T) { updateDiscoverHostsInConfigMap(configMap, mpiJob, runningPodList, isGPULauncher(mpiJob)) f.setUpConfigMap(configMap) - mpiJobCopy := mpiJob.DeepCopy() - scheme.Scheme.Default(mpiJobCopy) mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{ common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): { Active: 1, @@ -849,10 +851,12 @@ func TestWorkerReady(t *testing.T) { } f.setUpSecret(secret) + mpiJobCopy := mpiJob.DeepCopy() + scheme.Scheme.Default(mpiJobCopy) + var runningPodList []*corev1.Pod for i := 0; i < 16; i++ { - name := fmt.Sprintf("%s-%d", mpiJob.Name+workerSuffix, i) - worker := newWorker(mpiJob, name, "") + worker := newWorker(mpiJobCopy, i, "") worker.Status.Phase = corev1.PodRunning runningPodList = append(runningPodList, worker) f.setUpWorker(worker) @@ -862,9 +866,6 @@ func TestWorkerReady(t *testing.T) { updateDiscoverHostsInConfigMap(configMap, mpiJob, runningPodList, isGPULauncher(mpiJob)) f.setUpConfigMap(configMap) - mpiJobCopy := mpiJob.DeepCopy() - scheme.Scheme.Default(mpiJobCopy) - fmjc := f.newFakeMPIJobController() expLauncher := fmjc.newLauncher(mpiJobCopy, isGPULauncher(mpiJob)) f.expectCreateJobAction(expLauncher) @@ -889,7 +890,386 @@ func TestWorkerReady(t *testing.T) { f.run(getKey(mpiJob, t)) } -func int32Ptr(i int32) *int32 { return &i } +func TestNewLauncherAndWorker(t *testing.T) { + cases := map[string]struct { + job kubeflow.MPIJob + workerIndex int + wantLauncher corev1.Pod + wantWorker corev1.Pod + }{ + "defaults": { + job: kubeflow.MPIJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + Spec: kubeflow.MPIJobSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + kubeflow.MPIReplicaTypeLauncher: { + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{}}, + }, + }, + }, + kubeflow.MPIReplicaTypeWorker: { + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{}}, + }, + }, + }, + }, + }, + }, + wantLauncher: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-launcher", + Namespace: "bar", + Labels: map[string]string{ + "group-name": "kubeflow.org", + "mpi-job-name": "foo", + "mpi-job-role": "launcher", + }, + }, + Spec: corev1.PodSpec{ + Hostname: "foo-launcher", + Subdomain: "foo-worker", + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Env: func() []corev1.EnvVar { + var v []corev1.EnvVar + v = append(v, ompiEnvVars...) + v = append(v, nvidiaDisableEnvVars...) + return v + }(), + VolumeMounts: []corev1.VolumeMount{ + {Name: "ssh-home", MountPath: "/root/.ssh"}, + {Name: "mpi-job-config", MountPath: "/etc/mpi"}, + }, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "init-ssh", + Image: "alpine:3.14", + Command: []string{"/bin/sh"}, + Args: []string{ + "-c", + "cp -RL /mnt/ssh/* /mnt/home-ssh && chmod 700 /mnt/home-ssh && chmod 600 /mnt/home-ssh/*", + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "ssh-auth", MountPath: "/mnt/ssh"}, + {Name: "ssh-home", MountPath: "/mnt/home-ssh"}, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "ssh-auth", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "foo-ssh", + Items: sshVolumeItems, + }, + }, + }, + { + Name: "ssh-home", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "mpi-job-config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "foo-config", + }, + Items: configVolumeItems, + }, + }, + }, + }, + }, + }, + wantWorker: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-worker-0", + Namespace: "bar", + Labels: map[string]string{ + "group-name": "kubeflow.org", + "mpi-job-name": "foo", + "mpi-job-role": "worker", + "replica-index": "0", + }, + }, + Spec: corev1.PodSpec{ + Hostname: "foo-worker-0", + Subdomain: "foo-worker", + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Command: []string{"/usr/sbin/sshd", "-De"}, + VolumeMounts: []corev1.VolumeMount{ + {Name: "ssh-home", MountPath: "/root/.ssh"}, + }, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "init-ssh", + Image: "alpine:3.14", + Command: []string{"/bin/sh"}, + Args: []string{ + "-c", + "cp -RL /mnt/ssh/* /mnt/home-ssh && chmod 700 /mnt/home-ssh && chmod 600 /mnt/home-ssh/*", + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "ssh-auth", MountPath: "/mnt/ssh"}, + {Name: "ssh-home", MountPath: "/mnt/home-ssh"}, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "ssh-auth", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "foo-ssh", + Items: sshVolumeItems, + }, + }, + }, + { + Name: "ssh-home", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + }, + "overrides": { + job: kubeflow.MPIJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bar", + Namespace: "foo", + }, + Spec: kubeflow.MPIJobSpec{ + SSHAuthMountPath: "/home/mpiuser/.ssh", + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + kubeflow.MPIReplicaTypeLauncher: { + RestartPolicy: common.RestartPolicyOnFailure, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo": "bar"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Env: []corev1.EnvVar{ + {Name: "FOO", Value: "bar"}, + }, + SecurityContext: &corev1.SecurityContext{ + RunAsUser: newInt64(1000), + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "fool-vol", MountPath: "/mnt/foo"}, + }, + }, + {}, + }, + Volumes: []corev1.Volume{ + {Name: "foo-vol"}, + }, + }, + }, + }, + kubeflow.MPIReplicaTypeWorker: { + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Command: []string{"/entrypoint.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + workerIndex: 12, + wantLauncher: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bar-launcher", + Namespace: "foo", + Labels: map[string]string{ + "foo": "bar", + "group-name": "kubeflow.org", + "mpi-job-name": "bar", + "mpi-job-role": "launcher", + }, + }, + Spec: corev1.PodSpec{ + Hostname: "bar-launcher", + Subdomain: "bar-worker", + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{ + { + SecurityContext: &corev1.SecurityContext{ + RunAsUser: newInt64(1000), + }, + Env: func() []corev1.EnvVar { + v := []corev1.EnvVar{{Name: "FOO", Value: "bar"}} + v = append(v, ompiEnvVars...) + v = append(v, nvidiaDisableEnvVars...) + return v + }(), + VolumeMounts: []corev1.VolumeMount{ + {Name: "fool-vol", MountPath: "/mnt/foo"}, + {Name: "ssh-home", MountPath: "/home/mpiuser/.ssh"}, + {Name: "mpi-job-config", MountPath: "/etc/mpi"}, + }, + }, + {}, + }, + InitContainers: []corev1.Container{ + { + Name: "init-ssh", + Image: "alpine:3.14", + Command: []string{"/bin/sh"}, + Args: []string{ + "-c", + "cp -RL /mnt/ssh/* /mnt/home-ssh && chmod 700 /mnt/home-ssh && chmod 600 /mnt/home-ssh/* && chown 1000 -R /mnt/home-ssh", + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "ssh-auth", MountPath: "/mnt/ssh"}, + {Name: "ssh-home", MountPath: "/mnt/home-ssh"}, + }, + }, + }, + Volumes: []corev1.Volume{ + {Name: "foo-vol"}, + { + Name: "ssh-auth", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "bar-ssh", + Items: sshVolumeItems, + }, + }, + }, + { + Name: "ssh-home", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "mpi-job-config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "bar-config", + }, + Items: configVolumeItems, + }, + }, + }, + }, + }, + }, + wantWorker: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bar-worker-12", + Namespace: "foo", + Labels: map[string]string{ + "group-name": "kubeflow.org", + "mpi-job-name": "bar", + "mpi-job-role": "worker", + "replica-index": "12", + }, + }, + Spec: corev1.PodSpec{ + Hostname: "bar-worker-12", + Subdomain: "bar-worker", + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Command: []string{"/entrypoint.sh"}, + VolumeMounts: []corev1.VolumeMount{ + {Name: "ssh-home", MountPath: "/home/mpiuser/.ssh"}, + }, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "init-ssh", + Image: "alpine:3.14", + Command: []string{"/bin/sh"}, + Args: []string{ + "-c", + "cp -RL /mnt/ssh/* /mnt/home-ssh && chmod 700 /mnt/home-ssh && chmod 600 /mnt/home-ssh/* && chown 1000 -R /mnt/home-ssh", + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "ssh-auth", MountPath: "/mnt/ssh"}, + {Name: "ssh-home", MountPath: "/mnt/home-ssh"}, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "ssh-auth", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "bar-ssh", + Items: sshVolumeItems, + }, + }, + }, + { + Name: "ssh-home", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + }, + } + ignoreReferences := cmpopts.IgnoreFields(metav1.ObjectMeta{}, "OwnerReferences") + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + job := tc.job.DeepCopy() + scheme.Scheme.Default(job) + ctrl := &MPIJobController{} + launcher := ctrl.newLauncher(job, isGPULauncher(job)) + if !metav1.IsControlledBy(launcher, job) { + t.Errorf("Created launcher Pod is not controlled by Job") + } + if diff := cmp.Diff(&tc.wantLauncher, launcher, ignoreReferences); diff != "" { + t.Errorf("Unexpected launcher pod (-want,+got):\n%s", diff) + } + worker := newWorker(job, tc.workerIndex, "") + if !metav1.IsControlledBy(worker, job) { + t.Errorf("Created worker Pod is not controlled by Job") + } + if diff := cmp.Diff(&tc.wantWorker, worker, ignoreReferences); diff != "" { + t.Errorf("Unexpected launcher pod (-want,+got):\n%s", diff) + } + }) + } +} + +func newInt64(v int64) *int64 { + return &v +} func (f *fixture) newFakeMPIJobController() *MPIJobController { kubeClient := k8sfake.NewSimpleClientset(f.kubeObjects...)