Skip to content

Commit

Permalink
Use podGroup instead of PDB in v1beta2
Browse files Browse the repository at this point in the history
  • Loading branch information
thandayuthapani committed Mar 8, 2019
1 parent 0e3756a commit b411cd9
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 13 deletions.
15 changes: 15 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ required = [
name = "github.com/stretchr/testify"
version = "1.2.2"

[[constraint]]
name = "github.com/kubernetes-sigs/kube-batch"
version = "v0.3"

[[constraint]]
name = "github.com/sirupsen/logrus"
version = "v1.0.4"
Expand Down
23 changes: 16 additions & 7 deletions cmd/tf-operator.v1beta2/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme"
tfjobinformers "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions"
controller "github.com/kubeflow/tf-operator/pkg/controller.v1beta2/tensorflow"
"github.com/kubeflow/tf-operator/pkg/util/kbutil"
"github.com/kubeflow/tf-operator/pkg/util/signals"
"github.com/kubeflow/tf-operator/pkg/version"
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -91,11 +93,14 @@ func Run(opt *options.ServerOption) error {
}

// Create clients.
kubeClientSet, leaderElectionClientSet, tfJobClientSet, err := createClientSets(kcfg)
kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
if err != nil {
return err
}

//Sets kube-batch Interface, used for accessing podGroup CRD APIs.
kbutil.SetKubeBatchClientInterface(kubeBatchClientSet)

// Create informer factory.
kubeInformerFactory := kubeinformers.NewFilteredSharedInformerFactory(kubeClientSet, resyncPeriod, opt.Namespace, nil)
tfJobInformerFactory := tfjobinformers.NewSharedInformerFactory(tfJobClientSet, resyncPeriod)
Expand Down Expand Up @@ -161,32 +166,36 @@ func Run(opt *options.ServerOption) error {
return nil
}

func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, error) {
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, tfjobclientset.Interface, kubebatchclient.Interface, error) {

crdClient, err := crdclient.NewForConfig(config)

if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

checkCRDExists(crdClient, v1beta2.TFCRD)

kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "tf-operator"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

tfJobClientSet, err := tfjobclientset.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

return kubeClientSet, leaderElectionClientSet, tfJobClientSet, nil
kubeBatchClientSet, err := kubebatchclient.NewForConfig(restclientset.AddUserAgent(config, "kube-batch"))
if err != nil {
return nil, nil, nil, nil, err
}
return kubeClientSet, leaderElectionClientSet, tfJobClientSet, kubeBatchClientSet, nil
}

func checkCRDExists(clientset crdclient.Interface, crdName string) {
Expand Down
39 changes: 39 additions & 0 deletions examples/crd/crd-podgroup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: podgroups.scheduling.incubator.k8s.io
spec:
group: scheduling.incubator.k8s.io
names:
kind: PodGroup
plural: podgroups
scope: Namespaced
validation:
openAPIV3Schema:
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
properties:
minMember:
format: int32
type: integer
type: object
status:
properties:
succeeded:
format: int32
type: integer
failed:
format: int32
type: integer
running:
format: int32
type: integer
type: object
type: object
version: v1alpha1
3 changes: 3 additions & 0 deletions examples/distribution_strategy/distributed_tfjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ spec:
replicas: 3
restartPolicy: Never
template:
metadata:
annotations:
scheduling.k8s.io/group-name: "distributed-training"
spec:
containers:
- name: tensorflow
Expand Down
48 changes: 48 additions & 0 deletions pkg/common/jobcontroller/jobcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
Expand All @@ -23,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/controller"

"github.com/kubeflow/tf-operator/pkg/control"
"github.com/kubeflow/tf-operator/pkg/util/kbutil"
)

// Common Interface to be implemented by all operators.
Expand Down Expand Up @@ -197,6 +199,34 @@ func (jc *JobController) GenLabels(jobName string) map[string]string {
}
}

// SyncPodGroup makes sure a kube-batch PodGroup named after the job exists in
// the job's namespace, so that the job's pods can be gang scheduled.
//
// The PodGroup is looked up first:
//   - if it already exists, the existing object is returned together with an
//     error whose text is metav1.StatusReasonAlreadyExists;
//   - if the lookup fails for any reason other than NotFound, that error is
//     returned unchanged;
//   - only when the lookup reports NotFound is a new PodGroup created, owned
//     by the job and with Spec.MinMember set to minAvailableReplicas.
//
// An error is returned, instead of panicking, when no kube-batch client has
// been registered through kbutil.SetKubeBatchClientInterface.
func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32) (*v1alpha1.PodGroup, error) {
	kubeBatchClient := kbutil.GetKubeBatchClientInterface()
	if kubeBatchClient == nil {
		// Calling a method on a nil interface value panics; fail cleanly instead.
		return nil, errors.New("kube-batch client is not initialized")
	}
	podGroups := kubeBatchClient.SchedulingV1alpha1().PodGroups(job.GetNamespace())

	// Check whether the PodGroup exists or not.
	podGroup, err := podGroups.Get(job.GetName(), metav1.GetOptions{})
	if err == nil {
		return podGroup, errors.New(string(metav1.StatusReasonAlreadyExists))
	}
	if !k8serrors.IsNotFound(err) {
		return podGroup, err
	}

	// Create the PodGroup for gang scheduling by kube-batch.
	createPodGroup := &v1alpha1.PodGroup{
		ObjectMeta: metav1.ObjectMeta{
			Name: job.GetName(),
			OwnerReferences: []metav1.OwnerReference{
				*jc.GenOwnerReference(job),
			},
		},
		Spec: v1alpha1.PodGroupSpec{
			// minAvailableReplicas is already an int32, so it is used directly.
			MinMember: minAvailableReplicas,
		},
	}
	return podGroups.Create(createPodGroup)
}

// SyncPdb will create a PDB for gang scheduling by kube-arbitrator.
func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error) {
labelJobName := jc.Controller.GetJobNameLabelKey()
Expand Down Expand Up @@ -231,6 +261,24 @@ func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32)
return jc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(job.GetNamespace()).Create(createPdb)
}

// DeletePodGroup removes the kube-batch PodGroup named after the job from the
// job's namespace.
//
// A PodGroup that does not exist is not an error: nil is returned both when
// the initial lookup reports NotFound and when the object disappears between
// the lookup and the delete call. Any other lookup or delete failure is
// returned to the caller. An error is returned, instead of panicking, when no
// kube-batch client has been registered through
// kbutil.SetKubeBatchClientInterface.
func (jc *JobController) DeletePodGroup(job metav1.Object) error {
	kubeBatchClient := kbutil.GetKubeBatchClientInterface()
	if kubeBatchClient == nil {
		// Calling a method on a nil interface value panics; fail cleanly instead.
		return fmt.Errorf("unable to delete PodGroup: kube-batch client is not initialized")
	}
	podGroups := kubeBatchClient.SchedulingV1alpha1().PodGroups(job.GetNamespace())

	// Check whether the PodGroup exists or not.
	if _, err := podGroups.Get(job.GetName(), metav1.GetOptions{}); err != nil {
		if k8serrors.IsNotFound(err) {
			return nil
		}
		// Surface unexpected lookup failures instead of silently ignoring them.
		return fmt.Errorf("unable to get PodGroup: %v", err)
	}

	log.Infof("Deleting PodGroup %s", job.GetName())

	// Delete the PodGroup. NotFound here means it was removed concurrently,
	// which is the outcome we wanted anyway.
	err := podGroups.Delete(job.GetName(), &metav1.DeleteOptions{})
	if err != nil && !k8serrors.IsNotFound(err) {
		return fmt.Errorf("unable to delete PodGroup: %v", err)
	}
	return nil
}

func (jc *JobController) DeletePdb(job metav1.Object) error {

// Check the pdb exist or not
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller.v1beta2/tensorflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,9 @@ func (tc *TFController) syncTFJob(key string) (bool, error) {

if tc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(tfjob)
_, err := tc.SyncPdb(tfjob, minAvailableReplicas)
_, err := tc.SyncPodGroup(tfjob, minAvailableReplicas)
if err != nil {
logger.Warnf("Sync pdb %v: %v", tfjob.Name, err)
logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err)
}
}

Expand Down Expand Up @@ -362,12 +362,12 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {
}

if tc.Config.EnableGangScheduling {
tc.Recorder.Event(tfjob, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting pdb")
if err := tc.DeletePdb(tfjob); err != nil {
tc.Recorder.Eventf(tfjob, v1.EventTypeWarning, "FailedDeletePdb", "Error deleting: %v", err)
tc.Recorder.Event(tfjob, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting PodGroup")
if err := tc.DeletePodGroup(tfjob); err != nil {
tc.Recorder.Eventf(tfjob, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
tc.Recorder.Eventf(tfjob, v1.EventTypeNormal, "SuccessfulDeletePdb", "Deleted pdb: %v", tfjob.Name)
tc.Recorder.Eventf(tfjob, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted pdb: %v", tfjob.Name)

}
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/util/kbutil/kbutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2019 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kbutil

import (
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
)

// kubeBatchClientSet is the package-level kube-batch client handed out by
// GetKubeBatchClientInterface. It holds the zero value (nil) until
// SetKubeBatchClientInterface stores a client in it; reads and writes are
// plain variable accesses with no locking in this file.
var kubeBatchClientSet kubebatchclient.Interface

// SetKubeBatchClientInterface stores clientset as the package-level kube-batch
// client, replacing whatever was stored before. The stored value is what
// GetKubeBatchClientInterface returns afterwards.
func SetKubeBatchClientInterface(clientset kubebatchclient.Interface) {
kubeBatchClientSet = clientset
}

// GetKubeBatchClientInterface returns the package-level kube-batch client.
// The result is nil if SetKubeBatchClientInterface has not stored a client
// yet, so callers must not invoke methods on it without checking.
func GetKubeBatchClientInterface() kubebatchclient.Interface {
return kubeBatchClientSet
}

0 comments on commit b411cd9

Please sign in to comment.