Skip to content

Commit

Permalink
Add slots to hostfile
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <[email protected]>
  • Loading branch information
tenzen-y committed Feb 10, 2023
1 parent efbba01 commit 8b93cd0
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 16 deletions.
6 changes: 3 additions & 3 deletions build/base/intel-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ function resolve_host() {

if [ "$K_MPI_JOB_ROLE" == "launcher" ]; then
resolve_host "$HOSTNAME"
cat /etc/mpi/hostfile | while read host
cut -d ':' -f 1 /etc/mpi/hostfile | while read -r host
do
resolve_host $host
resolve_host "$host"
done
fi

exec "$@"
exec "$@"
11 changes: 10 additions & 1 deletion pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,8 +1182,17 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
var buffer bytes.Buffer
workersService := mpiJob.Name + workerSuffix
slots := 1
if mpiJob.Spec.SlotsPerWorker != nil {
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
for i := 0; i < int(workerReplicas); i++ {
buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace))
switch mpiJob.Spec.MPIImplementation {
case kubeflow.MPIImplementationOpenMPI:
buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc slots=%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
case kubeflow.MPIImplementationIntel:
buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc:%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
}
}

return &corev1.ConfigMap{
Expand Down
71 changes: 70 additions & 1 deletion pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (

ignoreConditionTimes = cmpopts.IgnoreFields(kubeflow.JobCondition{}, "LastUpdateTime", "LastTransitionTime")
ignoreSecretEntries = cmpopts.IgnoreMapEntries(func(k string, v []uint8) bool { return true })
ignoreReferences = cmpopts.IgnoreFields(metav1.ObjectMeta{}, "OwnerReferences")
)

type fixture struct {
Expand Down Expand Up @@ -1476,7 +1477,6 @@ func TestNewLauncherAndWorker(t *testing.T) {
},
},
}
ignoreReferences := cmpopts.IgnoreFields(metav1.ObjectMeta{}, "OwnerReferences")
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
job := tc.job.DeepCopy()
Expand All @@ -1500,6 +1500,75 @@ func TestNewLauncherAndWorker(t *testing.T) {
}
}

// TestNewConfigMap verifies the hostfile rendered into the ConfigMap for each
// MPI implementation: OpenMPI entries use the "host slots=N" form (defaulting
// to 1 slot when SlotsPerWorker is nil), while Intel MPI entries use "host:N".
// It also checks the ConfigMap carries an owner reference to the MPIJob.
func TestNewConfigMap(t *testing.T) {
	cases := map[string]struct {
		job      *kubeflow.MPIJob
		replicas int32
		want     *corev1.ConfigMap
	}{
		"OpenMPI without slots": {
			job: &kubeflow.MPIJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "openmpi-without-slots",
					Namespace: "tenant-a",
				},
				Spec: kubeflow.MPIJobSpec{
					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
				},
			},
			replicas: 2,
			want: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "openmpi-without-slots-config",
					Namespace: "tenant-a",
					Labels: map[string]string{
						"app": "openmpi-without-slots",
					},
				},
				Data: map[string]string{
					"hostfile": "openmpi-without-slots-worker-0.openmpi-without-slots-worker.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots-worker.tenant-a.svc slots=1\n",
				},
			},
		},
		"IntelMPI with slots": {
			job: &kubeflow.MPIJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "intelmpi-with-slots",
					Namespace: "project-x",
				},
				Spec: kubeflow.MPIJobSpec{
					SlotsPerWorker:    pointer.Int32(10),
					MPIImplementation: kubeflow.MPIImplementationIntel,
				},
			},
			replicas: 1,
			want: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "intelmpi-with-slots-config",
					Namespace: "project-x",
					Labels: map[string]string{
						"app": "intelmpi-with-slots",
					},
				},
				Data: map[string]string{
					"hostfile": "intelmpi-with-slots-worker-0.intelmpi-with-slots-worker.project-x.svc:10\n",
				},
			},
		},
	}
	for name, tt := range cases {
		t.Run(name, func(t *testing.T) {
			got := newConfigMap(tt.job, tt.replicas)
			// The controller must own the ConfigMap so it is garbage-collected
			// with the MPIJob.
			if !metav1.IsControlledBy(got, tt.job) {
				t.Errorf("Created configMap is not controlled by MPIJob")
			}
			// OwnerReferences are checked above; ignore them in the deep diff.
			if diff := cmp.Diff(tt.want, got, ignoreReferences); len(diff) != 0 {
				t.Errorf("Unexpected configMap (-want,+got):\n%s", diff)
			}
		})
	}
}

// newInt64 returns a pointer to a fresh copy of the given int64 value,
// convenient for populating optional (pointer-typed) API fields in tests.
func newInt64(v int64) *int64 {
	n := v
	return &n
}
Expand Down
26 changes: 15 additions & 11 deletions test/e2e/mpi_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ var _ = ginkgo.Describe("MPIJob", func() {
ginkgo.BeforeEach(func() {
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers = []corev1.Container{
{
Name: "launcher",
Image: openMPIImage,
Command: []string{"mpirun"},
Name: "launcher",
Image: openMPIImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
Command: []string{"mpirun"},
Args: []string{
"-n",
"2",
Expand All @@ -96,8 +97,9 @@ var _ = ginkgo.Describe("MPIJob", func() {
}
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers = []corev1.Container{
{
Name: "worker",
Image: openMPIImage,
Name: "worker",
Image: openMPIImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
},
}
})
Expand Down Expand Up @@ -190,9 +192,10 @@ var _ = ginkgo.Describe("MPIJob", func() {
mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationIntel
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers = []corev1.Container{
{
Name: "launcher",
Image: intelMPIImage,
Command: []string{}, // uses entrypoint.
Name: "launcher",
Image: intelMPIImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
Command: []string{}, // uses entrypoint.
Args: []string{
"mpirun",
"-n",
Expand All @@ -203,9 +206,10 @@ var _ = ginkgo.Describe("MPIJob", func() {
}
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers = []corev1.Container{
{
Name: "worker",
Image: intelMPIImage,
Command: []string{}, // uses entrypoint.
Name: "worker",
Image: intelMPIImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
Command: []string{}, // uses entrypoint.
Args: []string{
"/usr/sbin/sshd",
"-De",
Expand Down

0 comments on commit 8b93cd0

Please sign in to comment.