Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for using Intel MPI(2019.7) and MVAPICH2 #283

Merged
merged 15 commits into from
Aug 3, 2020
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ type MPIJobSpec struct {
// active. The policies specified in `RunPolicy` take precedence over
// the following fields: `BackoffLimit` and `ActiveDeadlineSeconds`.
RunPolicy *common.RunPolicy `json:"runPolicy,omitempty"`

// MPIDistribution specifies the name of the MPI framework that is used.
// Defaults to "open_mpi".
// Options include "open_mpi", "intel_mpi" and "mpich".
MPIDistribution string `json:"mpiDistribution,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we define it as a type instead of a string? (PS, personally, prefer OpenMPI, IntelMPI and MPICH here)

type MPIDistribution string

const (
    MPIDistributionOpenMPI = "openMPI"
...
)

}

// MPIReplicaType is the type for MPIReplica.
Expand Down
38 changes: 35 additions & 3 deletions pkg/controllers/v1alpha2/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,9 +958,17 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
// resource. It also sets the appropriate OwnerReferences on the resource so
// handleObject can discover the MPIJob resource that 'owns' it.
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
// This part is closely related to specific ssh commands.
// It is very likely to fail due to version changes of the MPI framework.
// Attempt to automatically filter prefix parameters by detecting "-" matches.
kubexec := fmt.Sprintf(`#!/bin/sh
set -x
POD_NAME=$1
while [ ${POD_NAME:0:1} = "-" ]
do
shift
POD_NAME=$1
done
shift
%s/kubectl exec ${POD_NAME}`, kubectlMountPath)
if len(mpiJob.Spec.MainContainer) > 0 {
Expand All @@ -974,8 +982,16 @@ shift
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
var buffer bytes.Buffer
// According to the specified MPI framework, construct a host file that meets the format requirements.
// For Intel MPI and MVAPICH2 (Basically follow the MPICH standard),
// use ":" syntax to indicate how many operating slots the current node has.
// But for Open MPI, use "slots=" syntax to achieve this function.
for i := 0; i < int(workerReplicas); i++ {
buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots))
if mpiJob.Spec.MPIDistribution == "intel_mpi" || mpiJob.Spec.MPIDistribution == "mpich" {
buffer.WriteString(fmt.Sprintf("%s%s-%d:%d\n", mpiJob.Name, workerSuffix, i, slots))
}else{
buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots))
}
}

return &corev1.ConfigMap{
Expand Down Expand Up @@ -1278,13 +1294,29 @@ func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, kubectlDeliveryI
return nil
}
container := podSpec.Spec.Containers[0]
// Different MPI frameworks use different environment variables
// to specify the path of the remote task launcher and hostfile file.
var mpiRshExecPathEnvName string
var mpiHostfilePathEnvName string
if mpiJob.Spec.MPIDistribution == "intel_mpi" {
mpiRshExecPathEnvName = "I_MPI_HYDRA_BOOTSTRAP_EXEC"
mpiHostfilePathEnvName = "I_MPI_HYDRA_HOST_FILE"
}else if mpiJob.Spec.MPIDistribution == "mpich" {
mpiRshExecPathEnvName = "HYDRA_LAUNCHER_EXEC"
mpiHostfilePathEnvName = "HYDRA_HOST_FILE"
}else{
// If the MPIDistribution is not specified as "intel_mpi" or "mpich",
// assume that the default "open_mpi" will be used.
mpiRshExecPathEnvName = "OMPI_MCA_plm_rsh_agent"
mpiHostfilePathEnvName = "OMPI_MCA_orte_default_hostfile"
}
container.Env = append(container.Env,
corev1.EnvVar{
Name: "OMPI_MCA_plm_rsh_agent",
Name: mpiRshExecPathEnvName,
Value: fmt.Sprintf("%s/%s", configMountPath, kubexecScriptName),
},
corev1.EnvVar{
Name: "OMPI_MCA_orte_default_hostfile",
Name: mpiHostfilePathEnvName,
Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName),
},
// We overwrite these environment variables so that users will not
Expand Down