Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use podGroup instead of PDB in v1beta2 #954

Merged
merged 2 commits into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ required = [
name = "github.com/stretchr/testify"
version = "1.2.2"

[[constraint]]
name = "github.com/kubernetes-sigs/kube-batch"
version = "v0.3"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use 0.4

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using using v0.4 of kube-batch, there is dependency conflict for k8s.io/api, which in kubeflow, v1.11.2 is used but in kube-batch 1.13.2 is used, that is why v0.3 is used for kube-batch

Copy link
Collaborator

@k82cn k82cn Mar 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richardsliu , any plan to upgrade k8s version? 1.14 is going to be release, and upstream only support three latest versions (1.12, 1.13, 1.14) until LTS.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With v1beta2, the subresource status requires 1.13.


[[constraint]]
name = "github.com/sirupsen/logrus"
version = "v1.0.4"
Expand Down
22 changes: 14 additions & 8 deletions cmd/tf-operator.v1beta1/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
controller "github.com/kubeflow/tf-operator/pkg/controller.v1beta1/tensorflow"
"github.com/kubeflow/tf-operator/pkg/util/signals"
"github.com/kubeflow/tf-operator/pkg/version"
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -91,7 +92,7 @@ func Run(opt *options.ServerOption) error {
}

// Create clients.
kubeClientSet, leaderElectionClientSet, tfJobClientSet, err := createClientSets(kcfg)
kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
if err != nil {
return err
}
Expand All @@ -103,7 +104,7 @@ func Run(opt *options.ServerOption) error {
unstructuredInformer := controller.NewUnstructuredTFJobInformer(kcfg, opt.Namespace)

// Create tf controller.
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, kubeBatchClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)

// Start informer goroutines.
go kubeInformerFactory.Start(stopCh)
Expand Down Expand Up @@ -161,32 +162,37 @@ func Run(opt *options.ServerOption) error {
return nil
}

func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, error) {
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, kubebatchclient.Interface, error) {

crdClient, err := crdclient.NewForConfig(config)

if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

checkCRDExists(crdClient, v1beta1.TFCRD)

kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "tf-operator"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

tfJobClientSet, err := tfjobclientset.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

return kubeClientSet, leaderElectionClientSet, tfJobClientSet, nil
kubeBatchClientSet, err := kubebatchclient.NewForConfig(restclientset.AddUserAgent(config, "kube-batch"))
if err != nil {
return nil, nil, nil, nil, err
}

return kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, nil
}

func checkCRDExists(clientset crdclient.Interface, crdName string) {
Expand Down
21 changes: 13 additions & 8 deletions cmd/tf-operator.v1beta2/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
controller "github.com/kubeflow/tf-operator/pkg/controller.v1beta2/tensorflow"
"github.com/kubeflow/tf-operator/pkg/util/signals"
"github.com/kubeflow/tf-operator/pkg/version"
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -91,7 +92,7 @@ func Run(opt *options.ServerOption) error {
}

// Create clients.
kubeClientSet, leaderElectionClientSet, tfJobClientSet, err := createClientSets(kcfg)
kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
if err != nil {
return err
}
Expand All @@ -103,7 +104,7 @@ func Run(opt *options.ServerOption) error {
unstructuredInformer := controller.NewUnstructuredTFJobInformer(kcfg, opt.Namespace)

// Create tf controller.
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)
tc := controller.NewTFController(unstructuredInformer, kubeClientSet, kubeBatchClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)

// Start informer goroutines.
go kubeInformerFactory.Start(stopCh)
Expand Down Expand Up @@ -161,32 +162,36 @@ func Run(opt *options.ServerOption) error {
return nil
}

func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, error) {
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, kubebatchclient.Interface, error) {

crdClient, err := crdclient.NewForConfig(config)

if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

checkCRDExists(crdClient, v1beta2.TFCRD)

kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "tf-operator"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

tfJobClientSet, err := tfjobclientset.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

return kubeClientSet, leaderElectionClientSet, tfJobClientSet, nil
kubeBatchClientSet, err := kubebatchclient.NewForConfig(restclientset.AddUserAgent(config, "kube-batch"))
if err != nil {
return nil, nil, nil, nil, err
}
return kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, nil
}

func checkCRDExists(clientset crdclient.Interface, crdName string) {
Expand Down
39 changes: 39 additions & 0 deletions examples/crd/crd-podgroup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: podgroups.scheduling.incubator.k8s.io
spec:
group: scheduling.incubator.k8s.io
names:
kind: PodGroup
plural: podgroups
scope: Namespaced
validation:
openAPIV3Schema:
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
properties:
minMember:
format: int32
type: integer
type: object
status:
properties:
succeeded:
format: int32
type: integer
failed:
format: int32
type: integer
running:
format: int32
type: integer
type: object
type: object
version: v1alpha1
3 changes: 3 additions & 0 deletions examples/distribution_strategy/distributed_tfjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ spec:
replicas: 3
restartPolicy: Never
template:
metadata:
annotations:
scheduling.k8s.io/group-name: "distributed-training"
spec:
containers:
- name: tensorflow
Expand Down
67 changes: 59 additions & 8 deletions pkg/common/jobcontroller/jobcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"strings"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
Expand Down Expand Up @@ -88,6 +90,9 @@ type JobController struct {
// kubeClientSet is a standard kubernetes clientset.
KubeClientSet kubeclientset.Interface

//KubeBatchClientSet is a standard kube-batch clientset.
KubeBatchClientSet kubebatchclient.Interface

// podLister can list/get pods from the shared informer's store.
PodLister corelisters.PodLister

Expand Down Expand Up @@ -135,6 +140,7 @@ func NewJobController(
reconcilerSyncPeriod metav1.Duration,
enableGangScheduling bool,
kubeClientSet kubeclientset.Interface,
kubeBatchClientSet kubebatchclient.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
workQueueName string) JobController {

Expand All @@ -160,14 +166,15 @@ func NewJobController(
}

jc := JobController{
Controller: controllerImpl,
Config: jobControllerConfig,
PodControl: realPodControl,
ServiceControl: realServiceControl,
KubeClientSet: kubeClientSet,
Expectations: controller.NewControllerExpectations(),
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
Recorder: recorder,
Controller: controllerImpl,
Config: jobControllerConfig,
PodControl: realPodControl,
ServiceControl: realServiceControl,
KubeClientSet: kubeClientSet,
KubeBatchClientSet: kubeBatchClientSet,
Expectations: controller.NewControllerExpectations(),
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
Recorder: recorder,
}
return jc

Expand Down Expand Up @@ -197,6 +204,31 @@ func (jc *JobController) GenLabels(jobName string) map[string]string {
}
}

func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32) (*v1alpha1.PodGroup, error) {

kubeBatchClientInterface := jc.KubeBatchClientSet
// Check whether podGroup exists or not
podGroup, err := kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: use jc.KubeBatchClientSet directly?

if err == nil {
return podGroup, nil
}

// create podGroup for gang scheduling by kube-batch
minAvailable := intstr.FromInt(int(minAvailableReplicas))
createPodGroup := &v1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: job.GetName(),
OwnerReferences: []metav1.OwnerReference{
*jc.GenOwnerReference(job),
},
},
Spec: v1alpha1.PodGroupSpec{
MinMember: minAvailable.IntVal,
},
}
return kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Create(createPodGroup)
}

// SyncPdb will create a PDB for gang scheduling by kube-arbitrator.
func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error) {
labelJobName := jc.Controller.GetJobNameLabelKey()
Expand Down Expand Up @@ -231,6 +263,25 @@ func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32)
return jc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(job.GetNamespace()).Create(createPdb)
}

func (jc *JobController) DeletePodGroup(job metav1.Object) error {
kubeBatchClientInterface := jc.KubeBatchClientSet

//check whether podGroup exists or not
_, err := kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: use jc.KubeBatchClientSet directly?

if err != nil && k8serrors.IsNotFound(err) {
return nil
}

log.Infof("Deleting PodGroup %s", job.GetName())

//delete podGroup
err = kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Delete(job.GetName(), &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("unable to delete PodGroup: %v", err)
}
return nil
}

func (jc *JobController) DeletePdb(job metav1.Object) error {

// Check the pdb exist or not
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller.v1beta1/tensorflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -98,6 +99,7 @@ func NewTFController(
// This variable is for unstructured informer.
tfJobInformer tfjobinformersv1beta1.TFJobInformer,
kubeClientSet kubeclientset.Interface,
kubeBatchClientSet kubebatchclient.Interface,
tfJobClientSet tfjobclientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
// This field is not used now but we keep it since it will be used
Expand All @@ -116,7 +118,7 @@ func NewTFController(
// Create base controller
log.Info("Creating Job controller")
jc := jobcontroller.NewJobController(tc, metav1.Duration{Duration: 15 * time.Second},
option.EnableGangScheduling, kubeClientSet, kubeInformerFactory, tfv1beta1.Plural)
option.EnableGangScheduling, kubeClientSet, kubeBatchClientSet, kubeInformerFactory, tfv1beta1.Plural)
tc.JobController = jc
// Set sync handler.
tc.syncHandler = tc.syncTFJob
Expand Down
Loading