Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #30 from jian-he/condition
Browse files Browse the repository at this point in the history
Make UpdateJobConditions public to be used by custom operators
  • Loading branch information
richardsliu authored Apr 30, 2019
2 parents ba975e8 + b615b1c commit d8fb8ca
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 243 deletions.
8 changes: 4 additions & 4 deletions job_controller/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (jc *JobController) ReconcileJobs(
}

// If the Job is terminated, delete all pods and services.
if isSucceeded(jobStatus) || isFailed(jobStatus) || jobExceedsLimit {
if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) || jobExceedsLimit {
if err := jc.deletePodsAndServices(runPolicy, job, pods); err != nil {
return err
}
Expand All @@ -168,12 +168,12 @@ func (jc *JobController) ReconcileJobs(
}

if jobExceedsLimit {
jc.Recorder.Event(runtimeObject, v1.EventTypeNormal, JobFailedReason, failureMessage)
jc.Recorder.Event(runtimeObject, v1.EventTypeNormal, commonutil.JobFailedReason, failureMessage)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := updateJobConditions(&jobStatus, common.JobFailed, JobFailedReason, failureMessage)
err := commonutil.UpdateJobConditions(&jobStatus, common.JobFailed, commonutil.JobFailedReason, failureMessage)
if err != nil {
log.Infof("Append job condition error: %v", err)
return err
Expand All @@ -183,7 +183,7 @@ func (jc *JobController) ReconcileJobs(
// At this point the pods may have been deleted.
// 1) If the job succeeded, we manually set the replica status.
// 2) If any replicas are still active, set their status to succeeded.
if isSucceeded(jobStatus) {
if commonutil.IsSucceeded(jobStatus) {
for rtype := range jobStatus.ReplicaStatuses {
jobStatus.ReplicaStatuses[rtype].Succeeded += jobStatus.ReplicaStatuses[rtype].Active
jobStatus.ReplicaStatuses[rtype].Active = 0
Expand Down
7 changes: 4 additions & 3 deletions job_controller/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

commonv1 "github.com/kubeflow/common/operator/v1"
"github.com/kubeflow/common/util"
"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -220,8 +221,8 @@ func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerRefer
}

func (jc *JobController) GenLabels(jobName string) map[string]string {
labelGroupName := LabelGroupName
labelJobName := LabelJobName
labelGroupName := util.LabelGroupName
labelJobName := util.LabelJobName
groupName := jc.Controller.GetGroupNameLabelValue()
return map[string]string{
labelGroupName: groupName,
Expand Down Expand Up @@ -256,7 +257,7 @@ func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas in

// SyncPdb will create a PDB for gang scheduling by kube-batch.
func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error) {
labelJobName := LabelJobName
labelJobName := util.LabelJobName

// Check the pdb exist or not
pdb, err := jc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
Expand Down
22 changes: 11 additions & 11 deletions job_controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (jc *JobController) AddPod(obj interface{}) {
logger := commonutil.LoggerForPod(pod, jc.Controller.GetAPIGroupVersionKind().Kind)

if job == nil {
if pod.Labels[LabelGroupName] == jc.Controller.GetGroupNameLabelValue() {
if pod.Labels[commonutil.LabelGroupName] == jc.Controller.GetGroupNameLabelValue() {
logger.Info("This pod's job does not exist")
}
return
Expand All @@ -77,12 +77,12 @@ func (jc *JobController) AddPod(obj interface{}) {
return
}

if _, ok := pod.Labels[ReplicaTypeLabel]; !ok {
if _, ok := pod.Labels[commonutil.ReplicaTypeLabel]; !ok {
logger.Infof("This pod maybe not created by %v", jc.Controller.ControllerName())
return
}

rtype := pod.Labels[ReplicaTypeLabel]
rtype := pod.Labels[commonutil.ReplicaTypeLabel]
expectationPodsKey := GenExpectationPodsKey(jobKey, rtype)

jc.Expectations.CreationObserved(expectationPodsKey)
Expand Down Expand Up @@ -178,12 +178,12 @@ func (jc *JobController) DeletePod(obj interface{}) {
return
}

if _, ok := pod.Labels[ReplicaTypeLabel]; !ok {
if _, ok := pod.Labels[commonutil.ReplicaTypeLabel]; !ok {
logger.Infof("This pod maybe not created by %v", jc.Controller.ControllerName())
return
}

rtype := pod.Labels[ReplicaTypeLabel]
rtype := pod.Labels[commonutil.ReplicaTypeLabel]
expectationPodsKey := GenExpectationPodsKey(jobKey, rtype)

jc.Expectations.DeletionObserved(expectationPodsKey)
Expand Down Expand Up @@ -235,7 +235,7 @@ func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType st
MatchLabels: make(map[string]string),
}

replicaSelector.MatchLabels[ReplicaTypeLabel] = replicaType
replicaSelector.MatchLabels[commonutil.ReplicaTypeLabel] = replicaType

for _, pod := range pods {
selector, err := metav1.LabelSelectorAsSelector(replicaSelector)
Expand All @@ -254,11 +254,11 @@ func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType st
func (jc *JobController) GetPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod {
podSlices := make([][]*v1.Pod, replicas)
for _, pod := range pods {
if _, ok := pod.Labels[ReplicaIndexLabel]; !ok {
if _, ok := pod.Labels[commonutil.ReplicaIndexLabel]; !ok {
logger.Warning("The pod do not have the index label.")
continue
}
index, err := strconv.Atoi(pod.Labels[ReplicaIndexLabel])
index, err := strconv.Atoi(pod.Labels[commonutil.ReplicaIndexLabel])
if err != nil {
logger.Warningf("Error when strconv.Atoi: %v", err)
continue
Expand Down Expand Up @@ -375,11 +375,11 @@ func (jc *JobController) createNewPod(job interface{}, rt, index string, spec *c

// Set type and index for the worker.
labels := jc.GenLabels(metaObject.GetName())
labels[ReplicaTypeLabel] = rt
labels[ReplicaIndexLabel] = index
labels[commonutil.ReplicaTypeLabel] = rt
labels[commonutil.ReplicaIndexLabel] = index

if masterRole {
labels[LabelJobRole] = "master"
labels[commonutil.LabelJobRole] = "master"
}

podTemplate := spec.Template.DeepCopy()
Expand Down
11 changes: 6 additions & 5 deletions job_controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strconv"

commonutil "github.com/kubeflow/common/util"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -46,12 +47,12 @@ func (jc *JobController) AddService(obj interface{}) {
return
}

if _, ok := service.Labels[ReplicaTypeLabel]; !ok {
if _, ok := service.Labels[commonutil.ReplicaTypeLabel]; !ok {
log.Infof("This service maybe not created by %v", jc.Controller.ControllerName())
return
}

rtype := service.Labels[ReplicaTypeLabel]
rtype := service.Labels[commonutil.ReplicaTypeLabel]
expectationServicesKey := GenExpectationServicesKey(jobKey, rtype)

jc.Expectations.CreationObserved(expectationServicesKey)
Expand Down Expand Up @@ -119,7 +120,7 @@ func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, re
MatchLabels: make(map[string]string),
}

replicaSelector.MatchLabels[ReplicaTypeLabel] = replicaType
replicaSelector.MatchLabels[commonutil.ReplicaTypeLabel] = replicaType

for _, service := range services {
selector, err := metav1.LabelSelectorAsSelector(replicaSelector)
Expand All @@ -140,11 +141,11 @@ func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, re
func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service {
serviceSlices := make([][]*v1.Service, replicas)
for _, service := range services {
if _, ok := service.Labels[ReplicaIndexLabel]; !ok {
if _, ok := service.Labels[commonutil.ReplicaIndexLabel]; !ok {
logger.Warning("The service do not have the index label.")
continue
}
index, err := strconv.Atoi(service.Labels[ReplicaIndexLabel])
index, err := strconv.Atoi(service.Labels[commonutil.ReplicaIndexLabel])
if err != nil {
logger.Warningf("Error when strconv.Atoi: %v", err)
continue
Expand Down
137 changes: 10 additions & 127 deletions job_controller/status.go
Original file line number Diff line number Diff line change
@@ -1,144 +1,27 @@
package job_controller

import (
common "github.com/kubeflow/common/operator/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/kubeflow/common/operator/v1"
corev1 "k8s.io/api/core/v1"
)

const (
// JobCreatedReason is added in a job when it is created.
JobCreatedReason = "JobCreated"
// JobSucceededReason is added in a job when it is succeeded.
JobSucceededReason = "JobSucceeded"
// JobRunningReason is added in a job when it is running.
JobRunningReason = "JobRunning"
// JobFailedReason is added in a job when it is failed.
JobFailedReason = "JobFailed"
// JobRestarting is added in a job when it is restarting.
JobRestartingReason = "JobRestarting"

// labels for pods and servers.
ReplicaTypeLabel = "replica-type"
ReplicaIndexLabel = "replica-index"
LabelGroupName = "group-name"
LabelJobName = "job-name"
LabelJobRole = "job-role"
)

func isSucceeded(status common.JobStatus) bool {
return hasCondition(status, common.JobSucceeded)
}

func isFailed(status common.JobStatus) bool {
return hasCondition(status, common.JobFailed)
}

func hasCondition(status common.JobStatus, condType common.JobConditionType) bool {
for _, condition := range status.Conditions {
if condition.Type == condType && condition.Status == v1.ConditionTrue {
return true
}
}
return false
}

func updateJobConditions(jobStatus *common.JobStatus, conditionType common.JobConditionType, reason, message string) error {
condition := newCondition(conditionType, reason, message)
setCondition(jobStatus, condition)
return nil
}

// initializeReplicaStatuses initializes the ReplicaStatuses for replica.
func initializeReplicaStatuses(jobStatus *common.JobStatus, rtype common.ReplicaType) {
func initializeReplicaStatuses(jobStatus *v1.JobStatus, rtype v1.ReplicaType) {
if jobStatus.ReplicaStatuses == nil {
jobStatus.ReplicaStatuses = make(map[common.ReplicaType]*common.ReplicaStatus)
jobStatus.ReplicaStatuses = make(map[v1.ReplicaType]*v1.ReplicaStatus)
}

jobStatus.ReplicaStatuses[rtype] = &common.ReplicaStatus{}
jobStatus.ReplicaStatuses[rtype] = &v1.ReplicaStatus{}
}

// updateJobReplicaStatuses updates the JobReplicaStatuses according to the pod.
func updateJobReplicaStatuses(jobStatus *common.JobStatus, rtype common.ReplicaType, pod *v1.Pod) {
func updateJobReplicaStatuses(jobStatus *v1.JobStatus, rtype v1.ReplicaType, pod *corev1.Pod) {
switch pod.Status.Phase {
case v1.PodRunning:
case corev1.PodRunning:
jobStatus.ReplicaStatuses[rtype].Active++
case v1.PodSucceeded:
case corev1.PodSucceeded:
jobStatus.ReplicaStatuses[rtype].Succeeded++
case v1.PodFailed:
case corev1.PodFailed:
jobStatus.ReplicaStatuses[rtype].Failed++
}
}

// newCondition creates a new job condition.
func newCondition(conditionType common.JobConditionType, reason, message string) common.JobCondition {
return common.JobCondition{
Type: conditionType,
Status: v1.ConditionTrue,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
}

// getCondition returns the condition with the provided type.
func getCondition(status common.JobStatus, condType common.JobConditionType) *common.JobCondition {
for _, condition := range status.Conditions {
if condition.Type == condType {
return &condition
}
}
return nil
}

// setCondition updates the job to include the provided condition.
// If the condition that we are about to add already exists
// and has the same status and reason then we are not going to update.
func setCondition(status *common.JobStatus, condition common.JobCondition) {
// Do nothing if JobStatus have failed condition
if isFailed(*status) {
return
}

currentCond := getCondition(*status, condition.Type)

// Do nothing if condition doesn't change
if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
return
}

// Do not update lastTransitionTime if the status of the condition doesn't change.
if currentCond != nil && currentCond.Status == condition.Status {
condition.LastTransitionTime = currentCond.LastTransitionTime
}

// Append the updated condition to the conditions
newConditions := filterOutCondition(status.Conditions, condition.Type)
status.Conditions = append(newConditions, condition)
}

// filterOutCondition returns a new slice of job conditions without conditions with the provided type.
func filterOutCondition(conditions []common.JobCondition, condType common.JobConditionType) []common.JobCondition {
var newConditions []common.JobCondition
for _, c := range conditions {
if condType == common.JobRestarting && c.Type == common.JobRunning {
continue
}
if condType == common.JobRunning && c.Type == common.JobRestarting {
continue
}

if c.Type == condType {
continue
}

// Set the running condition status to be false when current condition failed or succeeded
if (condType == common.JobFailed || condType == common.JobSucceeded) && c.Type == common.JobRunning {
c.Status = v1.ConditionFalse
}

newConditions = append(newConditions, c)
}
return newConditions
}
}
Loading

0 comments on commit d8fb8ca

Please sign in to comment.