Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

use init container for worker pod to wait master pod ready #187

Merged
merged 1 commit into from
Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pkg/common/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package config

import (
"io/ioutil"

log "github.com/sirupsen/logrus"
)

// initContainerTemplate holds the template source for the init container.
// The literal below is the built-in default; init overwrites it with the
// contents of /etc/config/initContainer.yaml when that file can be read.
// {{.MasterAddr}} is a template placeholder that is substituted when the
// template is rendered.
var initContainerTemplate = `
- name: init-pytorch
image: busybox:1.31.0
imagePullPolicy: IfNotPresent
command: ['sh', '-c', 'until nslookup {{.MasterAddr}}; do echo waiting for master; sleep 2; done;']`

func init() {
bytes, err := ioutil.ReadFile("/etc/config/initContainer.yaml")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why read the init container template from a file at a hard-coded path? Would it be better to add a new command-line argument that accepts the path to the init container template file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modified

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to expose InitContainerTemplate to user? Any particular use case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At a minimum, the image used by the init container needs to be configurable in private-cloud deployments. Exposing InitContainerTemplate also covers other use cases that we have not anticipated.

if err != nil {
log.Warningf("error while read initContainerTemplate, use default. error: %s", err)
} else {
initContainerTemplate = string(bytes)
}
}

// GetInitContainerTemplate returns the init container template currently in
// effect: the built-in default, or the file contents that init loaded in its
// place.
func GetInitContainerTemplate() string {
return initContainerTemplate
}
48 changes: 5 additions & 43 deletions pkg/controller.v1/pytorch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,22 @@ func TestNormalPath(t *testing.T) {
nil, "",
false,
},
//Only master pod expectations will be created
"Distributed PyTorchJob (4 workers, 1 master) is created": {
4, 1,
nil, true,
0, 0, 0, 0,
0, 0, 0, 0,
0, 0,
1, 0, 5,
5, 0, 5,
0, 0, 0,
0, 0, 0,
nil, "",
false,
},
"Distributed PyTorchJob (4 workers, 1 master) is created, 1 master is pending": {
"Distributed PyTorchJob (4 workers, 1 master) is created, 1 master and 4 workers are pending": {
4, 1,
nil, true,
0, 0, 0, 0,
4, 0, 0, 0,
1, 0, 0, 0,
4, 1,
0, 0, 0,
Expand All @@ -147,50 +146,13 @@ func TestNormalPath(t *testing.T) {
nil, "",
false,
},
//Worker pod expecations are created when master is in running state
"Distributed PyTorchJob (4 workers, 1 master) is created, 1 master is running": {
4, 1,
nil, true,
0, 0, 0, 0,
0, 1, 0, 0,
4, 1,
4, 0, 0,
0, 0, 0,
1, 0, 0,
&jobRunning, pytorchJobRunningReason,
false,
},
"Distributed PyTorchJob (4 workers, 1 master) is created, 1 master running 1 worker pending": {
4, 1,
nil, true,
1, 0, 0, 0,
0, 1, 0, 0,
4, 1,
3, 0, 0,
0, 0, 0,
1, 0, 0,
&jobRunning, pytorchJobRunningReason,
false,
},
"Distributed PyTorchJob (4 workers, 1 master) is created, 1 master 1 worker are running": {
4, 1,
nil, true,
0, 1, 0, 0,
0, 1, 0, 0,
4, 1,
3, 0, 0,
1, 0, 0,
1, 0, 0,
&jobRunning, pytorchJobRunningReason,
false,
},
"Distributed PyTorchJob (4 workers, 1 master) is created, 2 workers pending, 1 master 1 worker are running": {
4, 1,
nil, true,
2, 1, 0, 0,
3, 1, 0, 0,
0, 1, 0, 0,
4, 1,
1, 0, 0,
0, 0, 0,
1, 0, 0,
1, 0, 0,
&jobRunning, pytorchJobRunningReason,
Expand Down
32 changes: 8 additions & 24 deletions pkg/controller.v1/pytorch/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,30 +56,7 @@ func (pc *PyTorchController) reconcilePods(
// Convert PyTorchReplicaType to lower string.
rt := strings.ToLower(string(rtype))
logger := pylogger.LoggerForReplica(job, rt)
// Workers are started only when master pod is in running state
if rtype == pyv1.PyTorchReplicaTypeWorker {
if ContainMasterSpec(job) {
masterPod, err := pc.FilterPodsForReplicaType(pods, strings.ToLower(string(pyv1.PyTorchReplicaTypeMaster)))
if err != nil {
return err
}
if len(masterPod) > 1 {
pylogger.LoggerForJob(job).Info("Invalid config: Job must contain only one master pod")
return errors.New("invalid config: Job must contain only one master pod")
} else if len(masterPod) == 1 {
if masterPod[0].Status.Phase != v1.PodRunning {
pylogger.LoggerForJob(job).Info("Master Pod is created but not yet in running phase")
return nil
}
} else {
pylogger.LoggerForJob(job).Info("Master Pod is not yet created")
return nil
}
} else {
pylogger.LoggerForJob(job).Info("Invalid config: Job must contain master replica spec")
return errors.New("invalid config: Job must contain master replica spec")
}
}

// Get all pods for the type rt.
pods, err := pc.FilterPodsForReplicaType(pods, rt)
if err != nil {
Expand Down Expand Up @@ -210,6 +187,13 @@ func (pc *PyTorchController) createNewPod(job *pyv1.PyTorchJob, rtype pyv1.PyTor
pc.Recorder.Event(job, v1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg)
}
setRestartPolicy(podTemplate, spec)
if !masterRole {
masterAddr := jobcontroller.GenGeneralName(job.Name, strings.ToLower(string(pyv1.PyTorchReplicaTypeMaster)), strconv.Itoa(0))
err := AddInitContainerForWorkerPod(podTemplate, InitContainerParam{masterAddr})
if err != nil {
return err
}
}

// if gang-scheduling is enabled:
// 1. if user has specified other scheduler, we report a warning without overriding any fields.
Expand Down
38 changes: 38 additions & 0 deletions pkg/controller.v1/pytorch/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ package pytorch
import (
"fmt"

"bytes"
"html/template"

pyv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1"
"github.com/kubeflow/pytorch-operator/pkg/common/config"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
)

var (
Expand All @@ -40,9 +46,41 @@ func GetPortFromPyTorchJob(job *pyv1.PyTorchJob, rtype pyv1.PyTorchReplicaType)
return -1, errPortNotFound
}

// InitContainerParam carries the values made available to the init container
// template when it is rendered.
type InitContainerParam struct {
// MasterAddr is substituted for {{.MasterAddr}} in the template.
MasterAddr string
}

// ContainMasterSpec reports whether the job's replica specs include an entry
// for the Master replica type.
func ContainMasterSpec(job *pyv1.PyTorchJob) bool {
	_, hasMaster := job.Spec.PyTorchReplicaSpecs[pyv1.PyTorchReplicaTypeMaster]
	return hasMaster
}

// GetInitContainer renders containerTemplate with param and decodes the
// rendered YAML into a list of containers.
//
// containerTemplate is Go template source; param supplies the values the
// template may reference (for example {{.MasterAddr}}). The rendered output
// must unmarshal into a []v1.Container.
//
// A non-nil error is returned, annotated with the stage that failed, when the
// template cannot be parsed, cannot be executed against param, or renders to
// YAML that does not decode into []v1.Container. On error the returned slice
// is nil.
//
// NOTE(review): this file imports "html/template", which HTML-escapes
// interpolated values (e.g. "&" becomes "&amp;"). The output here is YAML and
// shell, not HTML, so "text/template" looks like the intended package —
// confirm and switch the import.
// NOTE(review): gopkg.in/yaml.v2 is used to decode into v1.Container; verify
// that camelCase keys such as imagePullPolicy are actually mapped onto the
// struct fields rather than silently ignored.
func GetInitContainer(containerTemplate string, param InitContainerParam) ([]v1.Container, error) {
	tpl, err := template.New("container").Parse(containerTemplate)
	if err != nil {
		return nil, fmt.Errorf("parsing init container template: %v", err)
	}

	var rendered bytes.Buffer
	if err := tpl.Execute(&rendered, param); err != nil {
		return nil, fmt.Errorf("rendering init container template: %v", err)
	}

	var containers []v1.Container
	if err := yaml.Unmarshal(rendered.Bytes(), &containers); err != nil {
		return nil, fmt.Errorf("decoding rendered init container template: %v", err)
	}

	return containers, nil
}

// AddInitContainerForWorkerPod renders the configured init container template
// with param and appends the resulting containers to the pod template's
// existing init containers. If rendering fails, the pod template is left
// unmodified and the error is returned.
func AddInitContainerForWorkerPod(podTemplate *v1.PodTemplateSpec, param InitContainerParam) error {
	rendered, err := GetInitContainer(config.GetInitContainerTemplate(), param)
	if err == nil {
		podSpec := &podTemplate.Spec
		podSpec.InitContainers = append(podSpec.InitContainers, rendered...)
	}
	return err
}
17 changes: 17 additions & 0 deletions pkg/controller.v1/pytorch/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,20 @@ func TestConvertPyTorchJobToUnstructured(t *testing.T) {
t.Errorf("Expected error to be nil while got %v", err)
}
}

// TestGetInitContainer checks that GetInitContainer substitutes the master
// address into the command of the rendered init container.
func TestGetInitContainer(t *testing.T) {
	template := `
- name: init-pytorch
  image: busybox
  command: ['sh', '-c', 'until nslookup {{.MasterAddr}}; do echo waiting for master; sleep 2; done;']`

	initContainer, err := GetInitContainer(template, InitContainerParam{"svc"})
	if err != nil {
		// Fatal rather than Error: the checks below index into the result.
		t.Fatalf("Expected error to be nil while got %v", err)
	}

	// Guard the indexing so a malformed result fails the test cleanly
	// instead of panicking with an out-of-range index.
	if len(initContainer) == 0 {
		t.Fatalf("Expected at least one init container, got none")
	}
	if len(initContainer[0].Command) < 3 {
		t.Fatalf("Expected command to have at least 3 elements, got %v", initContainer[0].Command)
	}

	expectedCMD := "until nslookup svc; do echo waiting for master; sleep 2; done;"
	if got := initContainer[0].Command[2]; got != expectedCMD {
		t.Errorf("Expected %s , got %s", expectedCMD, got)
	}
}