Skip to content

Commit

Permalink
fix sync PodGroup logic
Browse files Browse the repository at this point in the history
  • Loading branch information
wackxu committed May 28, 2019
1 parent 671cdab commit 36dd05a
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 32 deletions.
26 changes: 19 additions & 7 deletions pkg/common/jobcontroller/jobcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
kubeinformers "k8s.io/client-go/informers"
Expand All @@ -29,7 +31,6 @@ import (

// Common Interface to be implemented by all operators.
type ControllerInterface interface {

// Returns the Controller name
ControllerName() string

Expand Down Expand Up @@ -266,21 +267,32 @@ func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32)
return jc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(job.GetNamespace()).Create(createPdb)
}

func (jc *JobController) DeletePodGroup(job metav1.Object) error {
func (jc *JobController) DeletePodGroup(object runtime.Object) error {
kubeBatchClientInterface := jc.KubeBatchClientSet

accessor, err := meta.Accessor(object)
if err != nil {
return fmt.Errorf("object does not have ObjectMeta, %v", err)
}

//check whether podGroup exists or not
_, err := kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
if err != nil && k8serrors.IsNotFound(err) {
return nil
_, err = kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(accessor.GetNamespace()).Get(accessor.GetName(), metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}

log.Infof("Deleting PodGroup %s", job.GetName())
log.Infof("Deleting PodGroup %s", accessor.GetName())

//delete podGroup
err = kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Delete(job.GetName(), &metav1.DeleteOptions{})
err = kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(accessor.GetNamespace()).Delete(accessor.GetName(), &metav1.DeleteOptions{})
if err != nil {
jc.Recorder.Eventf(object, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return fmt.Errorf("unable to delete PodGroup: %v", err)
} else {
jc.Recorder.Eventf(object, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", accessor.GetName())
}
return nil
}
Expand Down
21 changes: 8 additions & 13 deletions pkg/controller.v1/tensorflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,6 @@ func (tc *TFController) syncTFJob(key string) (bool, error) {
tfjob := sharedTFJob.DeepCopy()
tfjobNeedsSync := tc.satisfiedExpectations(tfjob)

if tc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(tfjob)
_, err := tc.SyncPodGroup(tfjob, minAvailableReplicas)
if err != nil {
logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err)
}
}

// Set default for the new tfjob.
scheme.Scheme.Default(tfjob)

Expand Down Expand Up @@ -418,13 +410,8 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error {
}

if tc.Config.EnableGangScheduling {
tc.Recorder.Event(tfjob, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting PodGroup")
if err := tc.DeletePodGroup(tfjob); err != nil {
tc.Recorder.Eventf(tfjob, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
tc.Recorder.Eventf(tfjob, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", tfjob.Name)

}
}

Expand All @@ -439,6 +426,14 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error {
return tc.updateStatusHandler(tfjob)
}

if tc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(tfjob)
_, err := tc.SyncPodGroup(tfjob, minAvailableReplicas)
if err != nil {
logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err)
}
}

// Save the current state of the replicas
replicasStatus := make(map[string]v1.PodPhase)

Expand Down
20 changes: 8 additions & 12 deletions pkg/controller.v1beta2/tensorflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,6 @@ func (tc *TFController) syncTFJob(key string) (bool, error) {
tfjob := sharedTFJob.DeepCopy()
tfjobNeedsSync := tc.satisfiedExpectations(tfjob)

if tc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(tfjob)
_, err := tc.SyncPodGroup(tfjob, minAvailableReplicas)
if err != nil {
logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err)
}
}

// Set default for the new tfjob.
scheme.Scheme.Default(tfjob)

Expand Down Expand Up @@ -418,12 +410,8 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {
}

if tc.Config.EnableGangScheduling {
tc.Recorder.Event(tfjob, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting PodGroup")
if err := tc.DeletePodGroup(tfjob); err != nil {
tc.Recorder.Eventf(tfjob, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
tc.Recorder.Eventf(tfjob, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", tfjob.Name)
}
}

Expand All @@ -438,6 +426,14 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {
return tc.updateStatusHandler(tfjob)
}

if tc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(tfjob)
_, err := tc.SyncPodGroup(tfjob, minAvailableReplicas)
if err != nil {
logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err)
}
}

// Save the current state of the replicas
replicasStatus := make(map[string]v1.PodPhase)

Expand Down

0 comments on commit 36dd05a

Please sign in to comment.