Fix Intel MPI E2E test image #417

Merged · 1 commit · Aug 25, 2021
3 changes: 2 additions & 1 deletion Makefile
@@ -99,7 +99,8 @@ images:

.PHONY: test_images
test_images:
${IMG_BUILDER} build -t kubeflow/mpi-pi:openmpi examples/pi
${IMG_BUILDER} build -t mpioperator/mpi-pi:openmpi examples/pi
${IMG_BUILDER} build -t mpioperator/mpi-pi:intel examples/pi -f examples/pi/intel.Dockerfile

.PHONY: tidy
tidy:
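(Note: the mpioperator/mpi-pi:openmpi and mpioperator/mpi-pi:intel tags built by this test_images target are the same tags that the e2e suite below preloads into the kind cluster with kind load docker-image, alongside the operator image.)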
2 changes: 1 addition & 1 deletion examples/pi/intel-entrypoint.sh
@@ -8,7 +8,7 @@ fi
function resolve_host() {
host="$1"
check="nslookup $host"
max_retry=5
max_retry=10
counter=0
backoff=0.1
until $check > /dev/null
5 changes: 3 additions & 2 deletions v2/test/e2e/e2e_suite_test.go
@@ -43,7 +43,8 @@ const (

defaultMPIOperatorImage = "kubeflow/mpi-operator:local"
defaultKindImage = "kindest/node:v1.21.2"
openMPIImage = "kubeflow/mpi-pi:openmpi"
openMPIImage = "mpioperator/mpi-pi:openmpi"
intelMPIImage = "mpioperator/mpi-pi:intel"
rootPath = "../../.."
kubectlPath = rootPath + "/bin/kubectl"
operatorManifestsPath = rootPath + "/manifests/overlays/dev"
@@ -131,7 +132,7 @@ func bootstrapKindCluster() error {
if err != nil {
return fmt.Errorf("creating kind cluster: %w", err)
}
err = runCommand(kindPath, "load", "docker-image", mpiOperatorImage, openMPIImage)
err = runCommand(kindPath, "load", "docker-image", mpiOperatorImage, openMPIImage, intelMPIImage)
if err != nil {
return fmt.Errorf("loading container images: %w", err)
}
97 changes: 85 additions & 12 deletions v2/test/e2e/mpi_job_test.go
@@ -18,6 +18,8 @@ package e2e

import (
"context"
"fmt"
"io"

common "github.com/kubeflow/common/pkg/apis/common/v1"
kubeflow "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2beta1"
@@ -63,8 +65,13 @@ var _ = ginkgo.Describe("MPIJob", func() {
Namespace: namespace,
},
Spec: kubeflow.MPIJobSpec{
RunPolicy: common.RunPolicy{
BackoffLimit: newInt32(10),
},
MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{
kubeflow.MPIReplicaTypeLauncher: {},
kubeflow.MPIReplicaTypeLauncher: {
RestartPolicy: common.RestartPolicyOnFailure,
},
kubeflow.MPIReplicaTypeWorker: {
Replicas: newInt32(2),
},
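
(Aside, not part of the diff: a consolidated view of what this hunk changes in the default test job, presumably so a launcher that starts before worker hostnames resolve can retry instead of failing the whole test. This is a minimal sketch using only the packages this file already imports; the int32Ptr helper and the function name are introduced here for illustration, while the suite itself uses its existing newInt32 helper.)

```go
package e2e

import (
	common "github.com/kubeflow/common/pkg/apis/common/v1"
	kubeflow "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2beta1"
)

// int32Ptr is a local helper for this sketch; the suite itself uses newInt32.
func int32Ptr(v int32) *int32 { return &v }

// defaultJobSpecSketch shows only the fields this hunk touches: a job-level
// BackoffLimit of 10 and a launcher replica that restarts on failure.
func defaultJobSpecSketch() kubeflow.MPIJobSpec {
	return kubeflow.MPIJobSpec{
		RunPolicy: common.RunPolicy{
			// Allow up to 10 launcher retries before the job is marked failed.
			BackoffLimit: int32Ptr(10),
		},
		MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{
			kubeflow.MPIReplicaTypeLauncher: {
				// Restart the launcher Pod on failure instead of leaving it terminated.
				RestartPolicy: common.RestartPolicyOnFailure,
			},
			kubeflow.MPIReplicaTypeWorker: {
				Replicas: int32Ptr(2),
			},
		},
	}
}
```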
@@ -100,7 +107,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
mpiJob.Spec.RunPolicy.BackoffLimit = newInt32(1)
})
ginkgo.It("should fail", func() {
mpiJob := createJobAndWaitForCompletion(namespace, mpiJob)
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, common.JobFailed)
})
})
@@ -112,7 +119,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
})

ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(namespace, mpiJob)
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, common.JobSucceeded)
})
})
@@ -135,7 +142,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
})

ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(namespace, mpiJob)
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, common.JobSucceeded)
})
})
@@ -149,21 +156,21 @@ var _ = ginkgo.Describe("MPIJob", func() {
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers = []corev1.Container{
{
Name: "launcher",
Image: openMPIImage,
Image: intelMPIImage,
Command: []string{}, // uses entrypoint.
Args: []string{
"mpirun",
"--allow-run-as-root",
"-n",
"2",
"1",
"/home/mpiuser/pi",
},
},
}
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Replicas = newInt32(1)
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers = []corev1.Container{
{
Name: "worker",
Image: openMPIImage,
Image: intelMPIImage,
Command: []string{}, // uses entrypoint.
Args: []string{
"/usr/sbin/sshd",
@@ -174,34 +181,100 @@ var _ = ginkgo.Describe("MPIJob", func() {
})

ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(namespace, mpiJob)
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, common.JobSucceeded)
})
})

})
})

func createJobAndWaitForCompletion(ns string, mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob {
func createJobAndWaitForCompletion(mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob {
ctx := context.Background()
var err error
ginkgo.By("Creating MPIJob")
mpiJob, err = mpiClient.KubeflowV2beta1().MPIJobs(ns).Create(ctx, mpiJob, metav1.CreateOptions{})
mpiJob, err = mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Create(ctx, mpiJob, metav1.CreateOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())

ginkgo.By("Waiting for MPIJob to finish")
err = wait.Poll(waitInterval, foreverTimeout, func() (bool, error) {
updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(ns).Get(ctx, mpiJob.Name, metav1.GetOptions{})
updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
mpiJob = updatedJob
return mpiJob.Status.CompletionTime != nil, nil
})
if err != nil {
err := debugJob(ctx, mpiJob)
if err != nil {
fmt.Fprintf(ginkgo.GinkgoWriter, "Failed to debug job: %v\n", err)
}
}
gomega.Expect(err).ToNot(gomega.HaveOccurred())
return mpiJob
}

func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
selector := metav1.LabelSelector{
MatchLabels: map[string]string{
common.OperatorNameLabel: kubeflow.OperatorName,
common.JobNameLabel: mpiJob.Name,
common.JobRoleLabel: "launcher",
},
}
launcherPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&selector),
})
if err != nil {
return fmt.Errorf("getting launcher Pods: %w", err)
}
if len(launcherPods.Items) == 0 {
return fmt.Errorf("no launcher Pods found")
}
// Pick the most recently created launcher Pod; the launcher may have been
// restarted under the job's BackoffLimit.
lastPod := launcherPods.Items[0]
for _, p := range launcherPods.Items[1:] {
if p.CreationTimestamp.After(lastPod.CreationTimestamp.Time) {
lastPod = p
}
}
err = podLogs(ctx, &lastPod)
if err != nil {
return fmt.Errorf("obtaining launcher logs: %w", err)
}

selector.MatchLabels[common.JobRoleLabel] = "worker"
workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&selector),
})
if err != nil {
return fmt.Errorf("getting worker Pods: %w", err)
}
for _, p := range workerPods.Items {
err = podLogs(ctx, &p)
if err != nil {
return fmt.Errorf("obtaining worker logs: %w", err)
}
}
return nil
}

func podLogs(ctx context.Context, p *corev1.Pod) error {
req := k8sClient.CoreV1().Pods(p.Namespace).GetLogs(p.Name, &corev1.PodLogOptions{})
stream, err := req.Stream(ctx)
if err != nil {
return fmt.Errorf("reading logs: %v", err)
}
defer stream.Close()
fmt.Fprintf(ginkgo.GinkgoWriter, "== BEGIN %s pod logs ==\n", p.Name)
_, err = io.Copy(ginkgo.GinkgoWriter, stream)
if err != nil {
return fmt.Errorf("writing logs: %v", err)
}
fmt.Fprintf(ginkgo.GinkgoWriter, "\n== END %s pod logs ==\n", p.Name)
return nil
}

func expectConditionToBeTrue(mpiJob *kubeflow.MPIJob, condType common.JobConditionType) {
var condition *common.JobCondition
for _, cond := range mpiJob.Status.Conditions {
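
(Aside, not part of the diff: if the launcher retries many times, its log can get long. corev1.PodLogOptions has a TailLines field that caps the streamed output; below is a hedged variant of podLogs using it, where the function name and the caller-chosen cap are assumptions for illustration. It relies on the same package-level k8sClient and imports shown in the diff above.)

```go
// podLogsTail is a sketch of podLogs limited to the last `lines` lines of output,
// which keeps CI logs readable when the launcher restarts repeatedly.
func podLogsTail(ctx context.Context, p *corev1.Pod, lines int64) error {
	req := k8sClient.CoreV1().Pods(p.Namespace).GetLogs(p.Name, &corev1.PodLogOptions{
		TailLines: &lines, // only stream the tail of the log
	})
	stream, err := req.Stream(ctx)
	if err != nil {
		return fmt.Errorf("reading logs: %v", err)
	}
	defer stream.Close()
	fmt.Fprintf(ginkgo.GinkgoWriter, "== BEGIN last %d lines of %s pod logs ==\n", lines, p.Name)
	if _, err := io.Copy(ginkgo.GinkgoWriter, stream); err != nil {
		return fmt.Errorf("writing logs: %v", err)
	}
	fmt.Fprintf(ginkgo.GinkgoWriter, "\n== END %s pod logs ==\n", p.Name)
	return nil
}
```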