Skip to content

Commit

Permalink
Add support for Intel MPI
Browse files Browse the repository at this point in the history
Adds the field .spec.mpiImplementation, defaults to OpenMPI

The Intel implementation requires a Service fronting the launcher.
  • Loading branch information
alculquicondor committed Jul 29, 2021
1 parent 108a697 commit d1c3b7b
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 124 deletions.
3 changes: 3 additions & 0 deletions manifests/base/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ spec:
description: "Defines which Pods must be deleted after the Job completes"
sshAuthMountPath:
type: string
mpiImplementation:
type: string
enum: ["OpenMPI", "Intel"]
mpiReplicaSpecs:
type: object
properties:
Expand Down
3 changes: 3 additions & 0 deletions v2/pkg/apis/kubeflow/v2beta1/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func SetDefaults_MPIJob(mpiJob *MPIJob) {
if mpiJob.Spec.SSHAuthMountPath == "" {
mpiJob.Spec.SSHAuthMountPath = "/root/.ssh"
}
if mpiJob.Spec.MPIImplementation == "" {
mpiJob.Spec.MPIImplementation = MPIImplementationOpenMPI
}

// set default to Launcher
setDefaultsTypeLauncher(mpiJob.Spec.MPIReplicaSpecs[MPIReplicaTypeLauncher])
Expand Down
35 changes: 20 additions & 15 deletions v2/pkg/apis/kubeflow/v2beta1/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,28 @@ func TestSetDefaults_MPIJob(t *testing.T) {
"base defaults": {
want: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(1),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone),
SSHAuthMountPath: "/root/.ssh",
SlotsPerWorker: newInt32(1),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone),
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
},
},
},
"base defaults overridden": {
job: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(10),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/home/mpiuser/.ssh",
SlotsPerWorker: newInt32(10),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
},
},
want: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(10),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/home/mpiuser/.ssh",
SlotsPerWorker: newInt32(10),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
},
},
},
Expand All @@ -61,9 +64,10 @@ func TestSetDefaults_MPIJob(t *testing.T) {
},
want: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(1),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone),
SSHAuthMountPath: "/root/.ssh",
SlotsPerWorker: newInt32(1),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone),
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{
MPIReplicaTypeLauncher: {
Replicas: newInt32(1),
Expand All @@ -83,9 +87,10 @@ func TestSetDefaults_MPIJob(t *testing.T) {
},
want: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(1),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone),
SSHAuthMountPath: "/root/.ssh",
SlotsPerWorker: newInt32(1),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone),
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{
MPIReplicaTypeWorker: {
Replicas: newInt32(0),
Expand Down
11 changes: 11 additions & 0 deletions v2/pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type MPIJobSpec struct {
// SSHAuthMountPath is the directory where SSH keys are mounted.
// Defaults to "/root/.ssh".
SSHAuthMountPath string `json:"sshAuthMountPath,omitempty"`

// MPIImplementation is the MPI implementation.
// Options are "OpenMPI" (default) and "Intel".
MPIImplementation MPIImplementation `json:"mpiImplementation,omitempty"`
}

// MPIReplicaType is the type for MPIReplica.
Expand All @@ -67,3 +71,10 @@ const (
// MPIReplicaTypeWorker is the type for worker replicas.
MPIReplicaTypeWorker MPIReplicaType = "Worker"
)

type MPIImplementation string

const (
MPIImplementationOpenMPI MPIImplementation = "OpenMPI"
MPIImplementationIntel MPIImplementation = "Intel"
)
17 changes: 13 additions & 4 deletions v2/pkg/apis/kubeflow/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ import (
kubeflow "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2beta1"
)

var validCleanPolicies = sets.NewString(
string(common.CleanPodPolicyNone),
string(common.CleanPodPolicyRunning),
string(common.CleanPodPolicyAll))
var (
validCleanPolicies = sets.NewString(
string(common.CleanPodPolicyNone),
string(common.CleanPodPolicyRunning),
string(common.CleanPodPolicyAll))

validMPIImplementations = sets.NewString(
string(kubeflow.MPIImplementationOpenMPI),
string(kubeflow.MPIImplementationIntel))
)

func ValidateMPIJob(job *kubeflow.MPIJob) field.ErrorList {
errs := validateMPIJobName(job)
Expand Down Expand Up @@ -68,6 +74,9 @@ func validateMPIJobSpec(spec *kubeflow.MPIJobSpec, path *field.Path) field.Error
if spec.SSHAuthMountPath == "" {
errs = append(errs, field.Required(path.Child("sshAuthMountPath"), "must have a mount path for SSH credentials"))
}
if !validMPIImplementations.Has(string(spec.MPIImplementation)) {
errs = append(errs, field.NotSupported(path.Child("mpiImplementation"), spec.MPIImplementation, validMPIImplementations.List()))
}
return errs
}

Expand Down
52 changes: 33 additions & 19 deletions v2/pkg/apis/kubeflow/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func TestValidateMPIJob(t *testing.T) {
Name: "foo",
},
Spec: v2beta1.MPIJobSpec{
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/home/mpiuser/.ssh",
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: v2beta1.MPIImplementationIntel,
MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
v2beta1.MPIReplicaTypeLauncher: {
Replicas: newInt32(1),
Expand All @@ -59,9 +60,10 @@ func TestValidateMPIJob(t *testing.T) {
Name: "foo",
},
Spec: v2beta1.MPIJobSpec{
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/home/mpiuser/.ssh",
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: v2beta1.MPIImplementationIntel,
MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
v2beta1.MPIReplicaTypeLauncher: {
Replicas: newInt32(1),
Expand Down Expand Up @@ -105,6 +107,10 @@ func TestValidateMPIJob(t *testing.T) {
Type: field.ErrorTypeRequired,
Field: "spec.sshAuthMountPath",
},
&field.Error{
Type: field.ErrorTypeNotSupported,
Field: "spec.mpiImplementation",
},
},
},
"invalid fields": {
Expand All @@ -113,9 +119,10 @@ func TestValidateMPIJob(t *testing.T) {
Name: "this-name-is-waaaaaaaay-too-long-for-a-worker-hostname",
},
Spec: v2beta1.MPIJobSpec{
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy("unknown"),
SSHAuthMountPath: "/root/.ssh",
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy("unknown"),
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: v2beta1.MPIImplementation("Unknown"),
MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
v2beta1.MPIReplicaTypeLauncher: {
Replicas: newInt32(1),
Expand Down Expand Up @@ -145,6 +152,10 @@ func TestValidateMPIJob(t *testing.T) {
Type: field.ErrorTypeNotSupported,
Field: "spec.cleanPodPolicy",
},
{
Type: field.ErrorTypeNotSupported,
Field: "spec.mpiImplementation",
},
},
},
"empty replica specs": {
Expand All @@ -153,10 +164,11 @@ func TestValidateMPIJob(t *testing.T) {
Name: "foo",
},
Spec: v2beta1.MPIJobSpec{
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/root/.ssh",
MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{},
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: v2beta1.MPIImplementationOpenMPI,
MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{},
},
},
wantErrs: field.ErrorList{
Expand All @@ -172,9 +184,10 @@ func TestValidateMPIJob(t *testing.T) {
Name: "foo",
},
Spec: v2beta1.MPIJobSpec{
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/root/.ssh",
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: v2beta1.MPIImplementationOpenMPI,
MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
v2beta1.MPIReplicaTypeLauncher: {},
v2beta1.MPIReplicaTypeWorker: {},
Expand Down Expand Up @@ -206,9 +219,10 @@ func TestValidateMPIJob(t *testing.T) {
Name: "foo",
},
Spec: v2beta1.MPIJobSpec{
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/root/.ssh",
SlotsPerWorker: newInt32(2),
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: v2beta1.MPIImplementationOpenMPI,
MPIReplicaSpecs: map[v2beta1.MPIReplicaType]*common.ReplicaSpec{
v2beta1.MPIReplicaTypeLauncher: {
Replicas: newInt32(2),
Expand Down
Loading

0 comments on commit d1c3b7b

Please sign in to comment.