add support for using Intel MPI (2019.7) and MVAPICH2 (#283)
* + support for IntelMPI and MPICH
+ local minikube test pass
+ add new Spec "mpiDistribution"
@ 2020/7/27

* * fix ineffectual assignment
* change email address

* * update variable name

* * fix some spelling and naming problems

* + add more notes

* + auto filter prefix parameters

* * fix some spelling problems
* update notes about hostfile generation

* + mpich-format hostfile split

* + generate hosts for hydra to resolve hostname

* * update notes

* * fix sh script
+ move hosts sending and merging here
* use special type instead of string

* * check return value

* * update options' name

* + add unit test for generateHosts

* ^ fixed lint-reported errors
milkybird98 authored Aug 3, 2020
1 parent 0edc69b commit 07bbb45
Showing 6 changed files with 212 additions and 5 deletions.
4 changes: 4 additions & 0 deletions cmd/kubectl-delivery/app/server.go
@@ -108,6 +108,10 @@ func Run(opt *options.ServerOption) error {
continue
}
lines := strings.SplitN(string(line), " ", 2)
// When using Intel MPI or MPICH, the hostfile format is "hostname:slots", so the line also needs to be split on the colon.
if strings.Contains(lines[0], ":") {
lines = strings.SplitN(string(line), ":", 2)
}
if !strings.HasSuffix(lines[0], launcherPodSuffix) {
pods = append(pods, lines[0])
}
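For reference, a minimal standalone sketch (hypothetical package and input values, not part of this commit) of how the colon-aware split above extracts the hostname from both hostfile formats:

```go
package main

import (
	"fmt"
	"strings"
)

// hostnameFromLine returns the hostname portion of a hostfile line, handling
// both the Open MPI format ("host slots=N") and the Intel MPI/MPICH format
// ("host:N").
func hostnameFromLine(line string) string {
	fields := strings.SplitN(line, " ", 2)
	if strings.Contains(fields[0], ":") {
		fields = strings.SplitN(line, ":", 2)
	}
	return fields[0]
}

func main() {
	fmt.Println(hostnameFromLine("job-worker-0 slots=2")) // job-worker-0
	fmt.Println(hostnameFromLine("job-worker-0:2"))       // job-worker-0
}
```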
19 changes: 19 additions & 0 deletions pkg/apis/kubeflow/v1alpha2/types.go
@@ -72,6 +72,11 @@ type MPIJobSpec struct {
// active. The policies specified in `RunPolicy` take precedence over
// the following fields: `BackoffLimit` and `ActiveDeadlineSeconds`.
RunPolicy *common.RunPolicy `json:"runPolicy,omitempty"`

// MPIDistribution specifies the name of the MPI framework that is used.
// Defaults to "OpenMPI".
// Options include "OpenMPI", "IntelMPI" and "MPICH".
MPIDistribution *MPIDistributionType `json:"mpiDistribution,omitempty"`
}

// MPIReplicaType is the type for MPIReplica.
@@ -84,3 +89,17 @@ const (
// MPIReplicaTypeWorker is the type for worker replicas.
MPIReplicaTypeWorker MPIReplicaType = "Worker"
)

// MPIDistributionType is the type for MPIDistribution.
type MPIDistributionType string

const (
// MPIDistributionTypeOpenMPI is the type for Open MPI.
MPIDistributionTypeOpenMPI MPIDistributionType = "OpenMPI"

// MPIDistributionTypeIntelMPI is the type for Intel MPI.
MPIDistributionTypeIntelMPI MPIDistributionType = "IntelMPI"

// MPIDistributionTypeMPICH is the type for MPICH.
MPIDistributionTypeMPICH MPIDistributionType = "MPICH"
)
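A hedged sketch of how a client could set the new field when building an MPIJob spec (the import path is assumed from this repository's layout; a nil pointer falls back to the Open MPI behavior):

```go
package main

import (
	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha2"
)

func main() {
	dist := kubeflow.MPIDistributionTypeIntelMPI
	spec := kubeflow.MPIJobSpec{
		// Leaving MPIDistribution nil selects the "OpenMPI" defaults.
		MPIDistribution: &dist,
	}
	_ = spec
}
```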
45 changes: 45 additions & 0 deletions pkg/controllers/kubectl_delivery/controller.go
@@ -15,7 +15,9 @@
package kubectl_delivery

import (
"bufio"
"fmt"
"os"
"sync"
"time"

@@ -114,7 +116,10 @@ func (c *KubectlDeliveryController) Run(threadiness int, stopCh <-chan struct{})
if ok := cache.WaitForCacheSync(stopCh, c.podSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
// Keep a copy of the watched pod names so their IP addresses can be looked up later
var workerPods []string
for name := range c.watchedPods {
workerPods = append(workerPods, name)
pod, err := c.podLister.Pods(c.namespace).Get(name)
if err != nil {
continue
@@ -139,6 +144,9 @@ func (c *KubectlDeliveryController) Run(threadiness int, stopCh <-chan struct{})
return nil
case <-ticker.C:
if len(c.watchedPods) == 0 {
if err := c.generateHosts("/etc/hosts", "/opt/kube/hosts", workerPods); err != nil {
return fmt.Errorf("error generating hosts file: %v", err)
}
klog.Info("Shutting down workers")
return nil
}
@@ -147,6 +155,43 @@ func (c *KubectlDeliveryController) Run(threadiness int, stopCh <-chan struct{})
}
}

// generateHosts records all workers' IP addresses in a hosts-format file,
// which is sent to each worker pod before the remote process manager is
// launched. It creates and writes the file at filePath and uses the pod
// lister to look up IP addresses, so the cache must be synced beforehand.
func (c *KubectlDeliveryController) generateHosts(localHostsPath string, filePath string, workerPods []string) error {
var hosts string
// First, open the local hosts file to read the launcher pod's IP address
fd, err := os.Open(localHostsPath)
if err != nil {
return fmt.Errorf("can't open file[%s]: %v", localHostsPath, err)
}
defer fd.Close()
// Read the last line of the hosts file -- the IP address entry of this (launcher) pod
scanner := bufio.NewScanner(fd)
for scanner.Scan() {
hosts = scanner.Text()
}
// Use the pod lister (client-go) to look up the IP address of each worker pod
for index := range workerPods {
pod, err := c.podLister.Pods(c.namespace).Get(workerPods[index])
if err != nil {
return fmt.Errorf("can't get IP address of worker pod[%s]", workerPods[index])
}
hosts = fmt.Sprintf("%s\n%s\t%s", hosts, pod.Status.PodIP, pod.Name)
}
// Write the hosts-format records to the volume; the file is sent to the workers later.
fp, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return fmt.Errorf("can't create file[%s]: %v", filePath, err)
}
defer fp.Close()
if _, err := fp.WriteString(hosts); err != nil {
return fmt.Errorf("can't write file[%s]: %v", filePath, err)
}
return nil
}
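For illustration (hypothetical names and addresses), the file written by generateHosts -- one launcher line copied from the local hosts file plus one line per worker -- would look like:

```
10.234.56.78	job-launcher.fullname.test launcher
123.123.123.0	job-worker-0
123.123.123.1	job-worker-1
```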

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// work queue.
58 changes: 58 additions & 0 deletions pkg/controllers/kubectl_delivery/controller_test.go
@@ -15,6 +15,10 @@
package kubectl_delivery

import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"

corev1 "k8s.io/api/core/v1"
@@ -38,3 +42,57 @@ func TestWaitingPodRunning(t *testing.T) {
f.setUpPods(fakePod)
f.run(namespace, podName)
}

// TestGeneratingHostsFile tests the hosts-file generation function
func TestGeneratingHostsFile(t *testing.T) {
namespace := "default"
podNames := []string{"test", "tester-2", "worker_3", "pod4"}
// content of fake local hosts file
content := []byte(`# this line is a comment
127.0.0.1 localhost
::1 localhost
234.98.76.54 uselesshost
10.234.56.78 launcher.fullname.test launcher`)

// content of the expected output
expectedHosts := make(map[string]string)
expectedHosts["launcher"] = "10.234.56.78"
expectedHosts["launcher.fullname.test"] = "10.234.56.78"
f := newFixture(t)
// add fake worker pods
fakePod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
},
}
fakePod.Status.Phase = corev1.PodRunning
for index := range podNames {
fakePod.Status.PodIP = "123.123.123." + strconv.Itoa(index)
fakePod.ObjectMeta.Name = podNames[index]
expectedHosts[fakePod.ObjectMeta.Name] = fakePod.Status.PodIP
f.setUpPods(fakePod.DeepCopy())
}
// set up a temp directory for the file-based tests
p, tmphf := f.setUpTmpDir("hostsGenerating", content)
defer os.RemoveAll(p) // clean up the temp directory
tmpof := filepath.Join(p, "output")
c, _ := f.newController(namespace, podNames)
// generate new hosts file
err := c.generateHosts(tmphf, tmpof, podNames)
if err != nil {
t.Errorf("Error, cannot generate hosts for worker pods: %v", err)
}
// get the output file content
outputContent, err := ioutil.ReadFile(tmpof)
if err != nil {
t.Fatal(err)
}
// parse the output content into a map to avoid space/tab interference
generatedHosts := f.getResolvedHosts(outputContent)
// check the output
for hostname, expectedIP := range expectedHosts {
if resolvedIP, ok := generatedHosts[hostname]; !ok || resolvedIP != expectedIP {
t.Errorf("Error, generated hosts file incorrect. Host: %s, expected: %s, resolved: %s", hostname, expectedIP, resolvedIP)
}
}
}
46 changes: 46 additions & 0 deletions pkg/controllers/kubectl_delivery/utils_test.go
@@ -15,8 +15,13 @@
package kubectl_delivery

import (
"bufio"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

@@ -107,3 +112,44 @@ func (f *fixture) setUpPods(p *corev1.Pod) {
f.podLister = append(f.podLister, p)
f.kubeObjects = append(f.kubeObjects, p)
}

// getResolvedHosts resolves hosts file content into a map,
// with hostnames as keys and IP addresses as values
func (f *fixture) getResolvedHosts(contentBytes []byte) map[string]string {
// create a scanner to read content line by line
hostRecords := make(map[string]string)
contentStrReader := strings.NewReader(string(contentBytes))
scanner := bufio.NewScanner(contentStrReader)
for scanner.Scan() {
line := scanner.Text()
if len(line) == 0 { // skip empty lines before indexing into them
continue
}
if line[0] == '#' { // skip comment lines
continue
}
lines := strings.Fields(line)
if len(lines) == 0 { // skip the space line
continue
}
if len(lines) == 1 { // the line is malformed
f.t.Error("Error, generated hosts file has wrong format.")
continue
}
for i := 1; i < len(lines); i++ {
hostRecords[lines[i]] = lines[0] // use map to record hosts
}
}
return hostRecords
}

// setUpTmpDir creates a temp directory containing a hosts file with the
// provided content, and returns the paths of the directory and the file.
func (f *fixture) setUpTmpDir(dirName string, content []byte) (string, string) {
p, err := ioutil.TempDir(os.TempDir(), dirName) // use the caller-provided prefix
if err != nil {
f.t.Fatal(err)
}
tmphf := filepath.Join(p, "hosts")
if err := ioutil.WriteFile(tmphf, content, 0644); err != nil {
f.t.Fatal(err)
}
return p, tmphf
}
45 changes: 40 additions & 5 deletions pkg/controllers/v1alpha2/mpi_job_controller.go
@@ -958,24 +958,44 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
// resource. It also sets the appropriate OwnerReferences on the resource so
// handleObject can discover the MPIJob resource that 'owns' it.
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
// This part is closely tied to the specific ssh commands and is likely to
// break when the MPI framework version changes. We attempt to filter prefix
// parameters automatically by detecting a leading "-".
// To enable Intel MPI and MVAPICH2 to resolve pod names, the init container
// generates a hosts file covering all workers from the pod list; kubectl is
// then used to send it to each worker, where it is appended to the end of
// the original hosts file.
kubexec := fmt.Sprintf(`#!/bin/sh
set -x
POD_NAME=$1
while [ ${POD_NAME%%${POD_NAME#?}} = "-" ]
do
shift
%s/kubectl exec ${POD_NAME}`, kubectlMountPath)
POD_NAME=$1
done
shift
%s/kubectl cp %s/hosts ${POD_NAME}:/etc/hosts_of_nodes
%s/kubectl exec ${POD_NAME}`, kubectlMountPath, kubectlMountPath, kubectlMountPath)
if len(mpiJob.Spec.MainContainer) > 0 {
kubexec = fmt.Sprintf("%s --container %s", kubexec, mpiJob.Spec.MainContainer)
}
kubexec = fmt.Sprintf("%s -- /bin/sh -c \"$*\"", kubexec)
kubexec = fmt.Sprintf("%s -- /bin/sh -c \"cat /etc/hosts_of_nodes >> /etc/hosts && $*\"", kubexec)
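// For illustration (assuming kubectlMountPath is "/opt/kube" and no custom
// MainContainer is set), the rendered kubexec script is roughly:
//
//	#!/bin/sh
//	set -x
//	POD_NAME=$1
//	while [ ${POD_NAME%%${POD_NAME#?}} = "-" ]
//	do
//	  shift
//	  POD_NAME=$1
//	done
//	shift
//	/opt/kube/kubectl cp /opt/kube/hosts ${POD_NAME}:/etc/hosts_of_nodes
//	/opt/kube/kubectl exec ${POD_NAME} -- /bin/sh -c "cat /etc/hosts_of_nodes >> /etc/hosts && $*"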

// If no processing unit is specified, default to 1 slot.
slots := 1
if mpiJob.Spec.SlotsPerWorker != nil {
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
var buffer bytes.Buffer
// The hostfile format differs between MPI frameworks.
// Intel MPI and MVAPICH2 use the "host:slots" syntax to indicate how many
// slots a node provides, while Open MPI uses the "slots=" syntax.
for i := 0; i < int(workerReplicas); i++ {
buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots))
mpiDistribution := mpiJob.Spec.MPIDistribution
if mpiDistribution != nil && (*mpiDistribution == kubeflow.MPIDistributionTypeIntelMPI || *mpiDistribution == kubeflow.MPIDistributionTypeMPICH) {
buffer.WriteString(fmt.Sprintf("%s%s-%d:%d\n", mpiJob.Name, workerSuffix, i, slots))
} else {
buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots))
}
}
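// For example, a hypothetical job "pi" with two slots per worker produces
// worker lines such as:
//   Open MPI:          pi-worker-0 slots=2
//   Intel MPI / MPICH: pi-worker-0:2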

return &corev1.ConfigMap{
@@ -1278,13 +1298,28 @@ func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, kubectlDeliveryI
return nil
}
container := podSpec.Spec.Containers[0]
// Different MPI frameworks use different environment variables
// to specify the paths of the remote task launcher and the hostfile.
mpiRshExecPathEnvName := "OMPI_MCA_plm_rsh_agent"
mpiHostfilePathEnvName := "OMPI_MCA_orte_default_hostfile"
// If MPIDistribution is not specified as "IntelMPI" or "MPICH",
// assume the default "OpenMPI" is in use.
if mpiJob.Spec.MPIDistribution != nil {
if *mpiJob.Spec.MPIDistribution == kubeflow.MPIDistributionTypeIntelMPI {
mpiRshExecPathEnvName = "I_MPI_HYDRA_BOOTSTRAP_EXEC"
mpiHostfilePathEnvName = "I_MPI_HYDRA_HOST_FILE"
} else if *mpiJob.Spec.MPIDistribution == kubeflow.MPIDistributionTypeMPICH {
mpiRshExecPathEnvName = "HYDRA_LAUNCHER_EXEC"
mpiHostfilePathEnvName = "HYDRA_HOST_FILE"
}
}
container.Env = append(container.Env,
corev1.EnvVar{
Name: "OMPI_MCA_plm_rsh_agent",
Name: mpiRshExecPathEnvName,
Value: fmt.Sprintf("%s/%s", configMountPath, kubexecScriptName),
},
corev1.EnvVar{
Name: "OMPI_MCA_orte_default_hostfile",
Name: mpiHostfilePathEnvName,
Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName),
},
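// For example, an "IntelMPI" job ends up with (paths shown symbolically, as
// assembled from configMountPath, kubexecScriptName and hostfileName):
//   I_MPI_HYDRA_BOOTSTRAP_EXEC=<configMountPath>/<kubexecScriptName>
//   I_MPI_HYDRA_HOST_FILE=<configMountPath>/<hostfileName>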
// We overwrite these environment variables so that users will not
