From a762f6ddf3d4f962d0cd3ae519a4b901f4f92647 Mon Sep 17 00:00:00 2001 From: zlc Date: Tue, 9 Jul 2019 13:58:35 +0800 Subject: [PATCH] use init container for worker pod to wait master pod ready fix [186](https://github.com/kubeflow/pytorch-operator/issues/186) --- pkg/common/config/config.go | 26 +++++++++++ pkg/controller.v1/pytorch/controller_test.go | 48 ++------------------ pkg/controller.v1/pytorch/pod.go | 32 ++++--------- pkg/controller.v1/pytorch/util.go | 38 ++++++++++++++++ pkg/controller.v1/pytorch/util_test.go | 17 +++++++ 5 files changed, 94 insertions(+), 67 deletions(-) create mode 100644 pkg/common/config/config.go diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go new file mode 100644 index 000000000..5e138fb2e --- /dev/null +++ b/pkg/common/config/config.go @@ -0,0 +1,26 @@ +package config + +import ( + "io/ioutil" + + log "github.com/sirupsen/logrus" +) + +var initContainerTemplate = ` +- name: init-pytorch + image: busybox:1.31.0 + imagePullPolicy: IfNotPresent + command: ['sh', '-c', 'until nslookup {{.MasterAddr}}; do echo waiting for master; sleep 2; done;']` + +func init() { + bytes, err := ioutil.ReadFile("/etc/config/initContainer.yaml") + if err != nil { + log.Warningf("error while read initContainerTemplate, use default. error: %s", err) + } else { + initContainerTemplate = string(bytes) + } +} + +func GetInitContainerTemplate() string { + return initContainerTemplate +} diff --git a/pkg/controller.v1/pytorch/controller_test.go b/pkg/controller.v1/pytorch/controller_test.go index 165799cef..dc2ad24de 100644 --- a/pkg/controller.v1/pytorch/controller_test.go +++ b/pkg/controller.v1/pytorch/controller_test.go @@ -122,23 +122,22 @@ func TestNormalPath(t *testing.T) { nil, "", false, }, - //Only master pod expectations will be created "Distributed PyTorchJob (4 workers, 1 master) is created": { 4, 1, nil, true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 1, 0, 5, + 5, 0, 5, 0, 0, 0, 0, 0, 0, nil, "", false, }, - "Distributed PyTorchJob (4 workers, 1 master) is created, 1 master is pending": { + "Distributed PyTorchJob (4 workers, 1 master) is created, 1 master and 4 workers are pending": { 4, 1, nil, true, - 0, 0, 0, 0, + 4, 0, 0, 0, 1, 0, 0, 0, 4, 1, 0, 0, 0, @@ -147,50 +146,13 @@ func TestNormalPath(t *testing.T) { nil, "", false, }, - //Worker pod expecations are created when master is in running state - "Distributed PyTorchJob (4 workers, 1 master) is created, 1 master is running": { - 4, 1, - nil, true, - 0, 0, 0, 0, - 0, 1, 0, 0, - 4, 1, - 4, 0, 0, - 0, 0, 0, - 1, 0, 0, - &jobRunning, pytorchJobRunningReason, - false, - }, - "Distributed PyTorchJob (4 workers, 1 master) is created, 1 master running 1 worker pending": { - 4, 1, - nil, true, - 1, 0, 0, 0, - 0, 1, 0, 0, - 4, 1, - 3, 0, 0, - 0, 0, 0, - 1, 0, 0, - &jobRunning, pytorchJobRunningReason, - false, - }, - "Distributed PyTorchJob (4 workers, 1 master) is created, 1 master 1 worker are running": { - 4, 1, - nil, true, - 0, 1, 0, 0, - 0, 1, 0, 0, - 4, 1, - 3, 0, 0, - 1, 0, 0, - 1, 0, 0, - &jobRunning, pytorchJobRunningReason, - false, - }, "Distributed PyTorchJob (4 workers, 1 master) is created, 2 workers pending, 1 master 1 worker are running": { 4, 1, nil, true, - 2, 1, 0, 0, + 3, 1, 0, 0, 0, 1, 0, 0, 4, 1, - 1, 0, 0, + 0, 0, 0, 1, 0, 0, 1, 0, 0, &jobRunning, pytorchJobRunningReason, diff --git a/pkg/controller.v1/pytorch/pod.go b/pkg/controller.v1/pytorch/pod.go index 33a10e94a..2cb26fb50 100644 --- a/pkg/controller.v1/pytorch/pod.go +++ b/pkg/controller.v1/pytorch/pod.go @@ -56,30 +56,7 @@ func (pc *PyTorchController) reconcilePods( // Convert PyTorchReplicaType to lower string. rt := strings.ToLower(string(rtype)) logger := pylogger.LoggerForReplica(job, rt) - // Workers are started only when master pod is in running state - if rtype == pyv1.PyTorchReplicaTypeWorker { - if ContainMasterSpec(job) { - masterPod, err := pc.FilterPodsForReplicaType(pods, strings.ToLower(string(pyv1.PyTorchReplicaTypeMaster))) - if err != nil { - return err - } - if len(masterPod) > 1 { - pylogger.LoggerForJob(job).Info("Invalid config: Job must contain only one master pod") - return errors.New("invalid config: Job must contain only one master pod") - } else if len(masterPod) == 1 { - if masterPod[0].Status.Phase != v1.PodRunning { - pylogger.LoggerForJob(job).Info("Master Pod is created but not yet in running phase") - return nil - } - } else { - pylogger.LoggerForJob(job).Info("Master Pod is not yet created") - return nil - } - } else { - pylogger.LoggerForJob(job).Info("Invalid config: Job must contain master replica spec") - return errors.New("invalid config: Job must contain master replica spec") - } - } + // Get all pods for the type rt. pods, err := pc.FilterPodsForReplicaType(pods, rt) if err != nil { @@ -210,6 +187,13 @@ func (pc *PyTorchController) createNewPod(job *pyv1.PyTorchJob, rtype pyv1.PyTor pc.Recorder.Event(job, v1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg) } setRestartPolicy(podTemplate, spec) + if !masterRole { + masterAddr := jobcontroller.GenGeneralName(job.Name, strings.ToLower(string(pyv1.PyTorchReplicaTypeMaster)), strconv.Itoa(0)) + err := AddInitContainerForWorkerPod(podTemplate, InitContainerParam{masterAddr}) + if err != nil { + return err + } + } // if gang-scheduling is enabled: // 1. if user has specified other scheduler, we report a warning without overriding any fields. diff --git a/pkg/controller.v1/pytorch/util.go b/pkg/controller.v1/pytorch/util.go index fa6a6b215..52c55ab2b 100644 --- a/pkg/controller.v1/pytorch/util.go +++ b/pkg/controller.v1/pytorch/util.go @@ -17,7 +17,13 @@ package pytorch import ( "fmt" + "bytes" + "html/template" + pyv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1" + "github.com/kubeflow/pytorch-operator/pkg/common/config" + "gopkg.in/yaml.v2" + v1 "k8s.io/api/core/v1" ) var ( @@ -40,9 +46,41 @@ func GetPortFromPyTorchJob(job *pyv1.PyTorchJob, rtype pyv1.PyTorchReplicaType) return -1, errPortNotFound } +type InitContainerParam struct { + MasterAddr string +} + func ContainMasterSpec(job *pyv1.PyTorchJob) bool { if _, ok := job.Spec.PyTorchReplicaSpecs[pyv1.PyTorchReplicaTypeMaster]; ok { return true } return false } + +func GetInitContainer(containerTemplate string, param InitContainerParam) ([]v1.Container, error) { + var buf bytes.Buffer + tpl, err := template.New("container").Parse(containerTemplate) + if err != nil { + return nil, err + } + if err := tpl.Execute(&buf, param); err != nil { + return nil, err + } + + var result []v1.Container + err = yaml.Unmarshal(buf.Bytes(), &result) + if err != nil { + return nil, err + } + + return result, nil +} + +func AddInitContainerForWorkerPod(podTemplate *v1.PodTemplateSpec, param InitContainerParam) error { + containers, err := GetInitContainer(config.GetInitContainerTemplate(), param) + if err != nil { + return err + } + podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers, containers...) + return nil +} diff --git a/pkg/controller.v1/pytorch/util_test.go b/pkg/controller.v1/pytorch/util_test.go index 9c0121a7d..b4201e095 100644 --- a/pkg/controller.v1/pytorch/util_test.go +++ b/pkg/controller.v1/pytorch/util_test.go @@ -78,3 +78,20 @@ func TestConvertPyTorchJobToUnstructured(t *testing.T) { t.Errorf("Expected error to be nil while got %v", err) } } + +func TestGetInitContainer(t *testing.T) { + template := ` +- name: init-pytorch + image: busybox + command: ['sh', '-c', 'until nslookup {{.MasterAddr}}; do echo waiting for master; sleep 2; done;']` + + initContainer, err := GetInitContainer(template, InitContainerParam{"svc"}) + if err != nil { + t.Errorf("Expected error to be nil while got %v", err) + } + + expectedCMD := "until nslookup svc; do echo waiting for master; sleep 2; done;" + if initContainer[0].Command[2] != expectedCMD { + t.Errorf("Expected %s , got %s", expectedCMD, initContainer[0].Command[2]) + } +}