Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pytorch): Add init container config to avoid DNS lookup failure #1493

Merged
merged 4 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/training-operator.v1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"

"github.com/kubeflow/training-operator/pkg/config"
controller_v1 "github.com/kubeflow/training-operator/pkg/controller.v1"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand Down Expand Up @@ -70,6 +71,13 @@ func main() {
flag.Var(&enabledSchemes, "enable-scheme", "Enable scheme(s) as --enable-scheme=tfjob --enable-scheme=pytorchjob, case insensitive."+
" Now supporting TFJob, PyTorchJob, MXNetJob, XGBoostJob. By default, all supported schemes will be enabled.")
flag.BoolVar(&enableGangScheduling, "enable-gang-scheduling", false, "Set true to enable gang scheduling")

// PyTorch related flags
flag.StringVar(&config.Config.PyTorchInitContainerImage, "pytorch-init-container-image",
config.PyTorchInitContainerImageDefault, "The image for pytorch init container")
flag.StringVar(&config.Config.PyTorchInitContainerTemplateFile, "pytorch-init-container-template-file",
config.PyTorchInitContainerTemplateFileDefault, "The template file for pytorch init container")

opts := zap.Options{
Development: true,
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6
sigs.k8s.io/controller-runtime v0.7.2
sigs.k8s.io/yaml v1.2.0
volcano.sh/apis v1.2.0-k8s1.19.6
)

Expand Down Expand Up @@ -87,5 +88,4 @@ require (
k8s.io/klog/v2 v2.2.0 // indirect
k8s.io/utils v0.0.0-20200912215256-4140de9c8800 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.1 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
16 changes: 16 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package config

// Config is the global configuration for the training operator.
var Config struct {
PyTorchInitContainerTemplateFile string
PyTorchInitContainerImage string
}

const (
// PyTorchInitContainerImageDefault is the default image for the pytorch
// init container.
PyTorchInitContainerImageDefault = "alpine:3.10"
Jeffwan marked this conversation as resolved.
Show resolved Hide resolved
// PyTorchInitContainerTemplateFileDefault is the default template file for
// the pytorch init container.
PyTorchInitContainerTemplateFileDefault = "/etc/config/initContainer.yaml"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw PyTorchInitContainerImage in the original pytorch-operator, but not PyTorchInitContainerTemplateFileDefault. Do we really need this one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation in pytorch operator hard coded a path. Here we use a CLI flag to make it configurable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I would suggest @Jeffwan or other pytorch-controller contributor to take a look before merge.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/cc @Jeffwan

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does user have any requirements to custom init container? Could we make it completely transparent to users?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they may update the resource requests. WDYT

/cc @johnugeorge

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd like to custom the init-container-image bescause of my project worked on a network limitations env,what shou i do?

)
5 changes: 1 addition & 4 deletions pkg/controller.v1/pytorch/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ var (
onceElastic sync.Once
)

type EnvVarGenerator interface {
Generate(job *pytorchv1.PyTorchJob) ([]corev1.EnvVar, error)
}

// ElasticEnvVarGenerator is the environment variable generator for Elastic related arguments.
type ElasticEnvVarGenerator struct{}

func GetElasticEnvVarGenerator() EnvVarGenerator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,31 @@ import (
corev1 "k8s.io/api/core/v1"
)

func SetPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, index string) error {
// EnvVarGenerator is the environment variable generator interface.
type EnvVarGenerator interface {
Generate(job *pytorchv1.PyTorchJob) ([]corev1.EnvVar, error)
}

func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, index string) error {
pytorchjob, ok := obj.(*pytorchv1.PyTorchJob)
if !ok {
return fmt.Errorf("%+v is not a type of PyTorchJob", obj)
}

for i := range podTemplateSpec.Spec.Containers {
// Initialize the environment variables.
if len(podTemplateSpec.Spec.Containers[i].Env) == 0 {
podTemplateSpec.Spec.Containers[i].Env = make([]corev1.EnvVar, 0)
}
podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{
Name: "PYTHONUNBUFFERED",
Value: "0",
})
// Set PYTHONUNBUFFERED to true, to disable output buffering.
// Ref https://stackoverflow.com/questions/59812009/what-is-the-use-of-pythonunbuffered-in-docker-file.
podTemplateSpec.Spec.Containers[i].Env = append(
podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{
Name: "PYTHONUNBUFFERED",
Value: "0",
})

// If the master is not null, then we need to set the MASTER_ADDR and RANK.
if pytorchjob.Spec.PyTorchReplicaSpecs[pytorchv1.PyTorchReplicaTypeMaster] != nil {
envVars, err := GetMasterEnvVarGenerator().Generate(pytorchjob)
if err != nil {
Expand Down Expand Up @@ -69,14 +79,16 @@ func SetPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype,
})
}

envVars, err := GetElasticEnvVarGenerator().Generate(pytorchjob)
if err != nil {
return err
// Set the elastic environment variables if the elasticPolicy is not null.
if pytorchjob.Spec.ElasticPolicy != nil {
envVars, err := GetElasticEnvVarGenerator().Generate(pytorchjob)
if err != nil {
return err
}
// Set elastic related environment variables.
podTemplateSpec.Spec.Containers[i].Env = append(
podTemplateSpec.Spec.Containers[i].Env, envVars...)
}
// Set elastic related environment variables.
podTemplateSpec.Spec.Containers[i].Env = append(
podTemplateSpec.Spec.Containers[i].Env, envVars...)

}

return nil
Expand Down
133 changes: 133 additions & 0 deletions pkg/controller.v1/pytorch/initcontainer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License

package pytorch

import (
"bytes"
"fmt"
"html/template"
"io/ioutil"
"strconv"
"strings"
"sync"

"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/yaml"

pytorchv1 "github.com/kubeflow/training-operator/pkg/apis/pytorch/v1"
"github.com/kubeflow/training-operator/pkg/config"
)

var (
initContainerTemplate = `
- name: init-pytorch
image: {{.InitContainerImage}}
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 100m
Jeffwan marked this conversation as resolved.
Show resolved Hide resolved
memory: 20Mi
requests:
cpu: 50m
memory: 10Mi
command: ['sh', '-c', 'until nslookup {{.MasterAddr}}; do echo waiting for master; sleep 2; done;']`
Jeffwan marked this conversation as resolved.
Show resolved Hide resolved
onceInitContainer sync.Once
icGenerator *initContainerGenerator
)

type initContainerGenerator struct {
template string
image string
}

func getInitContainerGenerator() *initContainerGenerator {
onceInitContainer.Do(func() {
icGenerator = &initContainerGenerator{
template: getInitContainerTemplateOrDefault(config.Config.PyTorchInitContainerTemplateFile),
image: config.Config.PyTorchInitContainerImage,
}
})
return icGenerator
}

func (i *initContainerGenerator) GetInitContainer(masterAddr string) ([]v1.Container, error) {
var buf bytes.Buffer
tpl, err := template.New("container").Parse(i.template)
if err != nil {
return nil, err
}
if err := tpl.Execute(&buf, struct {
MasterAddr string
InitContainerImage string
}{
MasterAddr: masterAddr,
InitContainerImage: i.image,
}); err != nil {
return nil, err
}

var result []v1.Container
err = yaml.Unmarshal(buf.Bytes(), &result)
if err != nil {
return nil, err
}

return result, nil
}

// getInitContainerTemplateOrDefault returns the init container template file if
// it exists, or return initContainerTemplate by default.
func getInitContainerTemplateOrDefault(file string) string {
bytes, err := ioutil.ReadFile(file)
if err == nil {
return string(bytes)
}
return initContainerTemplate
}

func setInitContainer(obj interface{}, podTemplate *v1.PodTemplateSpec,
rtype, index string, log logr.Logger) error {
pytorchjob, ok := obj.(*pytorchv1.PyTorchJob)
if !ok {
return fmt.Errorf("%+v is not a type of PyTorchJob", obj)
}
logger := log.WithValues(pytorchv1.Singular, types.NamespacedName{
Namespace: pytorchjob.Namespace,
Name: pytorchjob.Name,
})

// There is no need to set init container if no master is specified.
if pytorchjob.Spec.PyTorchReplicaSpecs[pytorchv1.PyTorchReplicaTypeMaster] == nil {
logger.V(1).Info("No master is specified, skip setting init container")
return nil
}

// Set the init container only if the master is specified and the current
// rtype is worker.
if rtype == strings.ToLower(string(pytorchv1.PyTorchReplicaTypeWorker)) {
g := getInitContainerGenerator()
initContainers, err := g.GetInitContainer(genGeneralName(pytorchjob.Name,
strings.ToLower(string(pytorchv1.PyTorchReplicaTypeMaster)), strconv.Itoa(0)))
if err != nil {
return err
}
podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers,
initContainers...)

}
return nil
}
109 changes: 109 additions & 0 deletions pkg/controller.v1/pytorch/initcontainer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License

package pytorch

import (
"strings"
"testing"

"github.com/go-logr/logr"
commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"

pytorchv1 "github.com/kubeflow/training-operator/pkg/apis/pytorch/v1"
"github.com/kubeflow/training-operator/pkg/config"
)

func TestInitContainer(t *testing.T) {
gomega.RegisterFailHandler(ginkgo.Fail)
defer ginkgo.GinkgoRecover()

config.Config.PyTorchInitContainerImage = config.PyTorchInitContainerImageDefault
config.Config.PyTorchInitContainerTemplateFile = config.PyTorchInitContainerTemplateFileDefault

testCases := []struct {
job *pytorchv1.PyTorchJob
rtype commonv1.ReplicaType
index string
expected int
exepctedErr error
}{
{
job: &pytorchv1.PyTorchJob{
Spec: pytorchv1.PyTorchJobSpec{
PyTorchReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
pytorchv1.PyTorchReplicaTypeWorker: {
Replicas: int32Ptr(1),
},
},
},
},
rtype: pytorchv1.PyTorchReplicaTypeWorker,
index: "0",
expected: 0,
exepctedErr: nil,
},
{
job: &pytorchv1.PyTorchJob{
Spec: pytorchv1.PyTorchJobSpec{
PyTorchReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
pytorchv1.PyTorchReplicaTypeWorker: {
Replicas: int32Ptr(1),
},
pytorchv1.PyTorchReplicaTypeMaster: {
Replicas: int32Ptr(1),
},
},
},
},
rtype: pytorchv1.PyTorchReplicaTypeWorker,
index: "0",
expected: 1,
exepctedErr: nil,
},
{
job: &pytorchv1.PyTorchJob{
Spec: pytorchv1.PyTorchJobSpec{
PyTorchReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
pytorchv1.PyTorchReplicaTypeWorker: {
Replicas: int32Ptr(1),
},
pytorchv1.PyTorchReplicaTypeMaster: {
Replicas: int32Ptr(1),
},
},
},
},
rtype: pytorchv1.PyTorchReplicaTypeMaster,
index: "0",
expected: 0,
exepctedErr: nil,
},
}

for _, t := range testCases {
log := logr.Discard()
podTemplateSpec := t.job.Spec.PyTorchReplicaSpecs[t.rtype].Template
err := setInitContainer(t.job, &podTemplateSpec,
strings.ToLower(string(t.rtype)), t.index, log)
if t.exepctedErr == nil {
gomega.Expect(err).To(gomega.BeNil())
} else {
gomega.Expect(err).To(gomega.Equal(t.exepctedErr))
}
gomega.Expect(len(podTemplateSpec.Spec.InitContainers)).To(gomega.Equal(t.expected))
}
}
5 changes: 3 additions & 2 deletions pkg/controller.v1/pytorch/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ import (

var (
masterGenerator EnvVarGenerator
once sync.Once
onceMaster sync.Once
EnvMasterPort = "MASTER_PORT"
EnvMasterAddr = "MASTER_ADDR"
)

// MasterEnvVarGenerator is the environment variable generator for Master related arguments.
type MasterEnvVarGenerator struct {
}

func GetMasterEnvVarGenerator() EnvVarGenerator {
once.Do(func() {
onceMaster.Do(func() {
masterGenerator = &MasterEnvVarGenerator{}
})
return masterGenerator
Expand Down
Loading