add support for using Intel MPI(2019.7) and MVAPICH2 #283

Merged · 15 commits · Aug 3, 2020
4 changes: 4 additions & 0 deletions cmd/kubectl-delivery/app/server.go
@@ -108,6 +108,10 @@ func Run(opt *options.ServerOption) error {
continue
}
lines := strings.SplitN(string(line), " ", 2)
// When using Intel MPI or MPICH, the hostfile format is hostname:slots, so we need to split the line on the colon.
if strings.Contains(lines[0], ":") {
lines = strings.SplitN(string(line), ":", 2)
}
if !strings.HasSuffix(lines[0], launcherPodSuffix) {
pods = append(pods, lines[0])
}
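For illustration, here is a minimal standalone Go sketch (not part of this diff) of how the splitting above handles the two hostfile line formats; the worker hostnames are invented:

package main

import (
	"fmt"
	"strings"
)

// Open MPI hostfile lines use "hostname slots=N", while Intel MPI / MPICH
// lines use "hostname:N". Both cases reduce to the bare hostname.
func hostnameFromLine(line string) string {
	fields := strings.SplitN(line, " ", 2)
	if strings.Contains(fields[0], ":") {
		fields = strings.SplitN(line, ":", 2)
	}
	return fields[0]
}

func main() {
	fmt.Println(hostnameFromLine("demo-job-worker-0 slots=2")) // demo-job-worker-0
	fmt.Println(hostnameFromLine("demo-job-worker-0:2"))       // demo-job-worker-0
}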
17 changes: 17 additions & 0 deletions pkg/apis/kubeflow/v1alpha2/types.go
@@ -72,6 +72,11 @@ type MPIJobSpec struct {
// active. The policies specified in `RunPolicy` take precedence over
// the following fields: `BackoffLimit` and `ActiveDeadlineSeconds`.
RunPolicy *common.RunPolicy `json:"runPolicy,omitempty"`

// MPIDistribution specifies the name of the MPI framework that is used.
// Defaults to "open_mpi".
// Options include "open_mpi", "intel_mpi" and "mpich".
MPIDistribution *MPIDistributionType `json:"mpiDistribution,omitempty"`
}

// MPIReplicaType is the type for MPIReplica.
@@ -84,3 +89,15 @@ const (
// MPIReplicaTypeWorker is the type for worker replicas.
MPIReplicaTypeWorker MPIReplicaType = "Worker"
)

// MPIDistributionType is the type for MPIDistribution.
type MPIDistributionType string

const (
// MPIDistributionTypeOpenMPI is the type for Open MPI.
MPIDistributionTypeOpenMPI MPIDistributionType = "open_mpi"
// MPIDistributionTypeIntelMPI is the type for Intel MPI.
MPIDistributionTypeIntelMPI MPIDistributionType = "intel_mpi"
// MPIDistributionTypeMPICH is the type for MPICH.
MPIDistributionTypeMPICH MPIDistributionType = "mpich"
)
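A usage sketch for the new field, assuming the module import path follows the pkg/apis/kubeflow/v1alpha2 layout shown above; only the relevant field is populated:

package main

import (
	"fmt"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha2"
)

func main() {
	// MPIDistribution is a pointer field; leaving it nil keeps the Open MPI default.
	distribution := kubeflow.MPIDistributionTypeIntelMPI
	spec := kubeflow.MPIJobSpec{
		MPIDistribution: &distribution,
	}
	fmt.Println(*spec.MPIDistribution) // prints: intel_mpi
}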
33 changes: 33 additions & 0 deletions pkg/controllers/kubectl_delivery/controller.go
@@ -15,7 +15,9 @@
package kubectl_delivery

import (
"bufio"
"fmt"
"os"
"sync"
"time"

@@ -114,7 +116,10 @@ func (c *KubectlDeliveryController) Run(threadiness int, stopCh <-chan struct{})
if ok := cache.WaitForCacheSync(stopCh, c.podSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
// Copy the list of watched pods so that we can look up their IP addresses later.
var workerPods []string
for name := range c.watchedPods {
workerPods = append(workerPods, name)
pod, err := c.podLister.Pods(c.namespace).Get(name)
if err != nil {
continue
@@ -139,6 +144,34 @@ func (c *KubectlDeliveryController) Run(threadiness int, stopCh <-chan struct{})
return nil
case <-ticker.C:
if len(c.watchedPods) == 0 {
// Gather all the workers' IP addresses and record them in a hosts-format file.
var hosts string
// First, open the local hosts file to read the launcher pod's IP.
fd, err := os.Open("/etc/hosts")
if err != nil {
klog.Fatalf("Error read file[%s]: %v", "/etc/hosts", err)
}
defer fd.Close()
// Read the last line of the hosts file -- the launcher pod's own IP entry.
scanner := bufio.NewScanner(fd)
for scanner.Scan() {
hosts = scanner.Text()
}
// Use client-go to look up the IP address of each worker pod.
for index := range workerPods {
pod, err := c.podLister.Pods(c.namespace).Get(workerPods[index])
if err != nil {
continue
}
hosts = fmt.Sprintf("%s\n%s\t%s", hosts, pod.Status.PodIP, pod.Name)
}
// Write the hosts-format IP records to the shared volume; they will be sent to the workers later.
fp, err := os.OpenFile("/opt/kube/hosts", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
klog.Fatalf("Error write file[%s]: %v", "/opt/kube/hosts", err)
}
defer fp.Close()
fp.WriteString(hosts)
klog.Info("Shutting down workers")
return nil
}
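A rough standalone sketch of the hosts-file assembly above, with the pod lister replaced by a hard-coded map of invented worker names and IPs:

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

func main() {
	workerIPs := map[string]string{
		"demo-job-worker-0": "10.0.0.11",
		"demo-job-worker-1": "10.0.0.12",
	}

	// Keep only the last line of the local /etc/hosts: the pod's own entry.
	fd, err := os.Open("/etc/hosts")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	defer fd.Close()
	var hosts string
	scanner := bufio.NewScanner(fd)
	for scanner.Scan() {
		hosts = scanner.Text()
	}

	// Append one "IP<TAB>pod-name" entry per worker, mirroring the controller code.
	var b strings.Builder
	b.WriteString(hosts)
	for name, ip := range workerIPs {
		fmt.Fprintf(&b, "\n%s\t%s", ip, name)
	}
	fmt.Println(b.String())
}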
45 changes: 40 additions & 5 deletions pkg/controllers/v1alpha2/mpi_job_controller.go
@@ -958,24 +958,44 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
// resource. It also sets the appropriate OwnerReferences on the resource so
// handleObject can discover the MPIJob resource that 'owns' it.
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
// This part is closely tied to the specific ssh command issued by each MPI framework,
// so it is likely to break when the MPI framework version changes.
// We try to skip the launcher's prefix flags automatically by detecting arguments that start with "-".
// To let Intel MPI and MVAPICH2 resolve pod names, the init container generates a hosts file
// containing all workers based on the pod list; kubectl copies it to each worker,
// where it is appended to the end of the original hosts file.
kubexec := fmt.Sprintf(`#!/bin/sh
set -x
POD_NAME=$1
while [ ${POD_NAME%%${POD_NAME#?}} = "-" ]
do
shift
%s/kubectl exec ${POD_NAME}`, kubectlMountPath)
POD_NAME=$1
done
shift
%s/kubectl cp %s/hosts ${POD_NAME}:/etc/hosts_of_nodes
%s/kubectl exec ${POD_NAME}`, kubectlMountPath, kubectlMountPath, kubectlMountPath)
if len(mpiJob.Spec.MainContainer) > 0 {
kubexec = fmt.Sprintf("%s --container %s", kubexec, mpiJob.Spec.MainContainer)
}
kubexec = fmt.Sprintf("%s -- /bin/sh -c \"$*\"", kubexec)
kubexec = fmt.Sprintf("%s -- /bin/sh -c \"cat /etc/hosts_of_nodes >> /etc/hosts && $*\"", kubexec)

// If no processing unit is specified, default to 1 slot.
slots := 1
if mpiJob.Spec.SlotsPerWorker != nil {
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
var buffer bytes.Buffer
// The hostfile format differs between MPI frameworks.
// Intel MPI and MVAPICH2 use the "hostname:slots" syntax to indicate how many slots a node has,
// whereas Open MPI uses the "hostname slots=N" syntax.
for i := 0; i < int(workerReplicas); i++ {
buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots))
mpiDistribution := mpiJob.Spec.MPIDistribution
if mpiDistribution != nil && (*mpiDistribution == kubeflow.MPIDistributionTypeIntelMPI || *mpiDistribution == kubeflow.MPIDistributionTypeMPICH) {
buffer.WriteString(fmt.Sprintf("%s%s-%d:%d\n", mpiJob.Name, workerSuffix, i, slots))
} else {
buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots))
}
}

return &corev1.ConfigMap{
@@ -1278,13 +1298,28 @@ func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, kubectlDeliveryI
return nil
}
container := podSpec.Spec.Containers[0]
// Different MPI frameworks use different environment variables
// to specify the path of the remote task launcher and the hostfile.
mpiRshExecPathEnvName := "OMPI_MCA_plm_rsh_agent"
mpiHostfilePathEnvName := "OMPI_MCA_orte_default_hostfile"
// If MPIDistribution is not specified as "intel_mpi" or "mpich",
// the default "open_mpi" behavior is assumed.
if mpiJob.Spec.MPIDistribution != nil {
if *mpiJob.Spec.MPIDistribution == kubeflow.MPIDistributionTypeIntelMPI {
mpiRshExecPathEnvName = "I_MPI_HYDRA_BOOTSTRAP_EXEC"
mpiHostfilePathEnvName = "I_MPI_HYDRA_HOST_FILE"
} else if *mpiJob.Spec.MPIDistribution == kubeflow.MPIDistributionTypeMPICH {
mpiRshExecPathEnvName = "HYDRA_LAUNCHER_EXEC"
mpiHostfilePathEnvName = "HYDRA_HOST_FILE"
}
}
container.Env = append(container.Env,
corev1.EnvVar{
Name: "OMPI_MCA_plm_rsh_agent",
Name: mpiRshExecPathEnvName,
Value: fmt.Sprintf("%s/%s", configMountPath, kubexecScriptName),
},
corev1.EnvVar{
Name: "OMPI_MCA_orte_default_hostfile",
Name: mpiHostfilePathEnvName,
Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName),
},
// We overwrite these environment variables so that users will not
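To recap the two distribution-specific behaviors introduced by this PR, here is a standalone sketch (not the controller code itself) of the hostfile syntax written by newConfigMap and the launcher environment variables chosen by newLauncher; the job name, worker suffix, replica count, and slot count are invented:

package main

import (
	"bytes"
	"fmt"
)

// hostfile mirrors the loop in newConfigMap: one line per worker, using the
// syntax expected by the selected MPI distribution.
func hostfile(jobName string, workerReplicas, slots int, intelOrMPICH bool) string {
	var buffer bytes.Buffer
	for i := 0; i < workerReplicas; i++ {
		if intelOrMPICH {
			// Intel MPI / MVAPICH2 (MPICH-derived) syntax: hostname:slots
			buffer.WriteString(fmt.Sprintf("%s-worker-%d:%d\n", jobName, i, slots))
		} else {
			// Open MPI syntax: hostname slots=N
			buffer.WriteString(fmt.Sprintf("%s-worker-%d slots=%d\n", jobName, i, slots))
		}
	}
	return buffer.String()
}

// launcherEnv mirrors the branch in newLauncher that picks the environment
// variables naming the remote launcher script and the hostfile path.
func launcherEnv(distribution string) (rshAgentEnv, hostfileEnv string) {
	switch distribution {
	case "intel_mpi":
		return "I_MPI_HYDRA_BOOTSTRAP_EXEC", "I_MPI_HYDRA_HOST_FILE"
	case "mpich":
		return "HYDRA_LAUNCHER_EXEC", "HYDRA_HOST_FILE"
	default: // "open_mpi" or unset
		return "OMPI_MCA_plm_rsh_agent", "OMPI_MCA_orte_default_hostfile"
	}
}

func main() {
	fmt.Print(hostfile("demo-job", 2, 4, false))
	// demo-job-worker-0 slots=4
	// demo-job-worker-1 slots=4
	fmt.Print(hostfile("demo-job", 2, 4, true))
	// demo-job-worker-0:4
	// demo-job-worker-1:4

	for _, d := range []string{"open_mpi", "intel_mpi", "mpich"} {
		rsh, hostfilePath := launcherEnv(d)
		fmt.Printf("%s -> %s, %s\n", d, rsh, hostfilePath)
	}
}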