From d06f8f02e0f59c22bbefb25f9b6cc3ca6bb399e3 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Tue, 2 Apr 2024 23:02:50 +0000 Subject: [PATCH 1/6] refactor controller to perform 1 status update per reconciliation attempt --- pkg/constants/constants.go | 11 + pkg/controllers/jobset_controller.go | 452 ++++++++++-------- pkg/controllers/jobset_controller_test.go | 83 ++-- pkg/controllers/startup_policy.go | 28 +- pkg/util/testing/wrappers.go | 6 + .../controller/jobset_controller_test.go | 5 + test/util/util.go | 4 +- 7 files changed, 348 insertions(+), 241 deletions(-) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index a107ff2b..a2291565 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -58,4 +58,15 @@ const ( InOrderStartupPolicyReason = "StartupPolicyInOrder" InOrderStartupPolicyExecutingMessage = "in order startup policy is executing" InOrderStartupPolicyCompletedMessage = "in order startup policy has completed" + + // Event reason and messages related to JobSet restarts. + JobSetRestartReason = "Restarting" + + // Event reason and messages related to suspending a JobSet. + JobSetSuspendedReason = "SuspendedJobs" + JobSetSuspendedMessage = "jobset is suspended" + + // Event reason and message related to resuming a JobSet. + JobSetResumedReason = "ResumeJobs" + JobSetResumedMessage = "jobset is resumed" ) diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 9d0b2432..f9530eba 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -19,6 +19,7 @@ import ( "encoding/hex" "errors" "fmt" + "sort" "strconv" "sync" @@ -64,6 +65,21 @@ type childJobs struct { delete []*batchv1.Job } +// statusUpdateOpts tracks if a JobSet status update should be performed at the end of the reconciliation +// attempt, as well as events that should be conditionally emitted if the status update succeeds. +type statusUpdateOpts struct { + shouldUpdate bool + events []*eventParams +} + +// eventParams contains parameters used for emitting a Kubernetes event. +type eventParams struct { + object runtime.Object + eventType string + eventReason string + eventMessage string +} + func NewJobSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *JobSetReconciler { return &JobSetReconciler{Client: client, Scheme: scheme, Record: record, clock: clock.RealClock{}} } @@ -86,7 +102,22 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, client.IgnoreNotFound(err) } - log := ctrl.LoggerFrom(ctx).WithValues("jobset", klog.KObj(&js)) + // Track JobSet status updates that should be performed at the end of the reconciliation attempt. + updateStatusOpts := statusUpdateOpts{} + + // Reconcile the JobSet. + result, err := r.reconcile(ctx, &js, &updateStatusOpts) + if err != nil { + return result, err + } + + // At the end of this Reconcile attempt, do one API call to persist all the JobSet status changes. + return ctrl.Result{}, r.updateJobSetStatus(ctx, &js, &updateStatusOpts) +} + +// reconcile is the internal method containing the core JobSet reconciliation logic. +func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) (ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx).WithValues("jobset", klog.KObj(js)) ctx = ctrl.LoggerInto(ctx, log) // Check the controller configured for the JobSet. @@ -100,22 +131,22 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr log.V(2).Info("Reconciling JobSet") // Get Jobs owned by JobSet. - ownedJobs, err := r.getChildJobs(ctx, &js) + ownedJobs, err := r.getChildJobs(ctx, js) if err != nil { log.Error(err, "getting jobs owned by jobset") return ctrl.Result{}, err } - // Calculate JobsReady and update statuses for each ReplicatedJob - status := r.calculateReplicatedJobStatuses(ctx, &js, ownedJobs) - if err := r.updateReplicatedJobsStatuses(ctx, &js, ownedJobs, status); err != nil { - log.Error(err, "updating replicated jobs statuses") - return ctrl.Result{}, err + // Calculate JobsReady and update statuses for each ReplicatedJob. + // If status ReplicatedJobsStatus has changed, update the JobSet status. + rjobStatuses := r.calculateReplicatedJobStatuses(ctx, js, ownedJobs) + if !replicatedJobStatusesEqual(js.Status.ReplicatedJobsStatus, rjobStatuses) { + updateReplicatedJobsStatuses(ctx, js, rjobStatuses, updateStatusOpts) } // If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set. - if jobSetFinished(&js) { - requeueAfter, err := executeTTLAfterFinishedPolicy(ctx, r.Client, r.clock, &js) + if jobSetFinished(js) { + requeueAfter, err := executeTTLAfterFinishedPolicy(ctx, r.Client, r.clock, js) if err != nil { log.Error(err, "executing ttl after finished policy") return ctrl.Result{}, err @@ -138,52 +169,44 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // If any jobs have failed, execute the JobSet failure policy (if any). if len(ownedJobs.failed) > 0 { - if err := r.executeFailurePolicy(ctx, &js, ownedJobs); err != nil { - log.Error(err, "executing failure policy") - return ctrl.Result{}, err - } + executeFailurePolicy(ctx, js, ownedJobs, updateStatusOpts) return ctrl.Result{}, nil } // If any jobs have succeeded, execute the JobSet success policy. if len(ownedJobs.successful) > 0 { - completed, err := r.executeSuccessPolicy(ctx, &js, ownedJobs) - if err != nil { - log.Error(err, "executing success policy") - return ctrl.Result{}, err - } + completed := executeSuccessPolicy(ctx, js, ownedJobs, updateStatusOpts) if completed { return ctrl.Result{}, nil } } // If pod DNS hostnames are enabled, create a headless service for the JobSet - if err := r.createHeadlessSvcIfNecessary(ctx, &js); err != nil { + if err := r.createHeadlessSvcIfNecessary(ctx, js); err != nil { log.Error(err, "creating headless service") return ctrl.Result{}, err } // If job has not failed or succeeded, continue creating any // jobs that are ready to be started. - if err := r.createJobs(ctx, &js, ownedJobs, status); err != nil { + if err := r.createJobs(ctx, js, ownedJobs, rjobStatuses, updateStatusOpts); err != nil { log.Error(err, "creating jobs") return ctrl.Result{}, err } // Handle suspending a jobset or resuming a suspended jobset. - jobsetSuspended := jobSetSuspended(&js) + jobsetSuspended := jobSetSuspended(js) if jobsetSuspended { - if err := r.suspendJobs(ctx, &js, ownedJobs.active); err != nil { + if err := r.suspendJobs(ctx, js, ownedJobs.active, updateStatusOpts); err != nil { log.Error(err, "suspending jobset") return ctrl.Result{}, err } } else { - if err := r.resumeJobsIfNecessary(ctx, &js, ownedJobs.active, status); err != nil { + if err := r.resumeJobsIfNecessary(ctx, js, ownedJobs.active, rjobStatuses, updateStatusOpts); err != nil { log.Error(err, "resuming jobset") return ctrl.Result{}, err } } - return ctrl.Result{}, nil } @@ -211,6 +234,25 @@ func SetupJobSetIndexes(ctx context.Context, indexer client.FieldIndexer) error }) } +// updateJobSetStatus will update the JobSet status if updateStatusOpts requires it, +// and conditionally emit events in updateStatusOpts if the status update call succeeds. +func (r *JobSetReconciler) updateJobSetStatus(ctx context.Context, js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) error { + log := ctrl.LoggerFrom(ctx) + + if updateStatusOpts.shouldUpdate { + // Make single API call to persist the JobSet status update. + if err := r.Status().Update(ctx, js); err != nil { + log.Error(err, "updating jobset status") + return err + } + // If the status update was successful (or if we had no status updates), emit any enqueued events. + for _, event := range updateStatusOpts.events { + r.Record.Eventf(event.object, event.eventType, event.eventReason, event.eventMessage) + } + } + return nil +} + // getChildJobs gets jobs owned by the JobSet then categorizes them by status (active, successful, failed). // Another list (`delete`) is also added which tracks jobs marked for deletion. func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet) (*childJobs, error) { @@ -253,15 +295,15 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet) return &ownedJobs, nil } -func (r *JobSetReconciler) updateReplicatedJobsStatuses(ctx context.Context, js *jobset.JobSet, jobs *childJobs, status []jobset.ReplicatedJobStatus) error { - // Check if status ReplicatedJobsStatus has changed - if apiequality.Semantic.DeepEqual(js.Status.ReplicatedJobsStatus, status) { - return nil - } - js.Status.ReplicatedJobsStatus = status - return r.Status().Update(ctx, js) +// updateReplicatedJobsStatuses updates the replicatedJob statuses if they have changed. +func updateReplicatedJobsStatuses(ctx context.Context, js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) { + // Add a new status update to perform at the end of the reconciliation attempt. + js.Status.ReplicatedJobsStatus = statuses + updateStatusOpts.shouldUpdate = true } +// calculateReplicatedJobStatuses uses the JobSet's child jobs to update the statuses +// of each of its replicatedJobs. func (r *JobSetReconciler) calculateReplicatedJobStatuses(ctx context.Context, js *jobset.JobSet, jobs *childJobs) []jobset.ReplicatedJobStatus { log := ctrl.LoggerFrom(ctx) @@ -324,7 +366,7 @@ func (r *JobSetReconciler) calculateReplicatedJobStatuses(ctx context.Context, j return rjStatus } -func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job) error { +func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, updateStatusOpts *statusUpdateOpts) error { for _, job := range activeJobs { if !jobSuspended(job) { job.Spec.Suspend = ptr.To(true) @@ -333,20 +375,11 @@ func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, a } } } - return r.ensureCondition(ctx, ensureConditionOpts{ - jobset: js, - eventType: corev1.EventTypeNormal, - condition: metav1.Condition{ - Type: string(jobset.JobSetSuspended), - Status: metav1.ConditionStatus(corev1.ConditionTrue), - LastTransitionTime: metav1.Now(), - Reason: "SuspendedJobs", - Message: "jobset is suspended", - }, - }) + setJobSetSuspended(js, updateStatusOpts) + return nil } -func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus) error { +func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) error { // Store node selector for each replicatedJob template. nodeAffinities := map[string]map[string]string{} for _, replicatedJob := range js.Spec.ReplicatedJobs { @@ -361,6 +394,7 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset } startupPolicy := js.Spec.StartupPolicy + numJobsResumed := 0 // If JobSpec is unsuspended, ensure all active child Jobs are also // unsuspended and update the suspend condition to true. for _, replicatedJob := range js.Spec.ReplicatedJobs { @@ -371,54 +405,37 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset } jobsFromRJob := replicatedJobToActiveJobs[replicatedJob.Name] for _, job := range jobsFromRJob { + if !jobSuspended(job) { + return nil + } if err := r.resumeJob(ctx, job, nodeAffinities); err != nil { return err } + numJobsResumed += 1 } // If using in order startup policy, return early and wait for the replicated job to be ready. - if inOrderStartupPolicy(startupPolicy) { - // Add a condition to the JobSet indicating the in order startup policy is executing. - return r.ensureCondition(ctx, ensureConditionOpts{ - jobset: js, - eventType: corev1.EventTypeNormal, - forceFalseUpdate: true, - condition: inOrderStartupPolicyExecutingCondition(), - }) + if numJobsResumed > 0 && inOrderStartupPolicy(startupPolicy) { + setInOrderStartupPolicyInProgress(js, updateStatusOpts) + return nil } } // At this point all replicated jobs have had their jobs resumed. // If using an in order startup policy. add a condition to the JobSet indicating the // in order startup policy has completed. if inOrderStartupPolicy(startupPolicy) { - if err := r.ensureCondition(ctx, ensureConditionOpts{ - jobset: js, - eventType: corev1.EventTypeNormal, - condition: inOrderStartupPolicyCompletedCondition(), - }); err != nil { - return err - } + setInOrderStartupPolicyCompleted(js, updateStatusOpts) } // Finally, set the suspended condition on the JobSet to false to indicate // the JobSet is no longer suspended. - return r.ensureCondition(ctx, ensureConditionOpts{ - jobset: js, - eventType: corev1.EventTypeNormal, - condition: metav1.Condition{ - Type: string(jobset.JobSetSuspended), - Status: metav1.ConditionStatus(corev1.ConditionFalse), - LastTransitionTime: metav1.Now(), - Reason: "ResumeJobs", - Message: "jobset is resumed", - }, - }) + if numJobsResumed > 0 { + setJobSetResumed(js, updateStatusOpts) + } + return nil } func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, nodeAffinities map[string]map[string]string) error { log := ctrl.LoggerFrom(ctx) - if !jobSuspended(job) { - return nil - } // Kubernetes validates that a job template is immutable // so if the job has started i.e., startTime != nil), we must set it to nil first. if job.Status.StartTime != nil { @@ -438,7 +455,7 @@ func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, node return r.Update(ctx, job) } -func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, replicatedJobStatus []jobset.ReplicatedJobStatus) error { +func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, replicatedJobStatus []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) error { log := ctrl.LoggerFrom(ctx) startupPolicy := js.Spec.StartupPolicy @@ -484,30 +501,20 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow // This updates the StartupPolicy condition and notifies that we are waiting // for this replicated job to finish. if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) { - return r.ensureCondition(ctx, ensureConditionOpts{ - jobset: js, - eventType: corev1.EventTypeNormal, - forceFalseUpdate: true, - condition: inOrderStartupPolicyExecutingCondition(), - }) + setInOrderStartupPolicyInProgress(js, updateStatusOpts) + return nil } } allErrs := errors.Join(finalErrs...) if allErrs != nil { - // Emit event to propagate the Job creation failures up to be more visible to the user. - // TODO(#422): Investigate ways to validate Job templates at JobSet validation time. - r.Record.Eventf(js, corev1.EventTypeWarning, constants.JobCreationFailedReason, allErrs.Error()) return allErrs } // Skip emitting a condition for StartupPolicy if JobSet is suspended if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) { - return r.ensureCondition(ctx, ensureConditionOpts{ - jobset: js, - eventType: corev1.EventTypeNormal, - condition: inOrderStartupPolicyCompletedCondition(), - }) + setInOrderStartupPolicyCompleted(js, updateStatusOpts) + return nil } - return allErrs + return nil } func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*batchv1.Job) error { @@ -582,127 +589,48 @@ func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js // executeSuccessPolicy checks the completed jobs against the jobset success policy // and updates the jobset status to completed if the success policy conditions are met. // Returns a boolean value indicating if the jobset was completed or not. -func (r *JobSetReconciler) executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) (bool, error) { +func executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool { if numJobsMatchingSuccessPolicy(js, ownedJobs.successful) >= numJobsExpectedToSucceed(js) { - if err := r.ensureCondition(ctx, ensureConditionOpts{ - jobset: js, - eventType: corev1.EventTypeNormal, - condition: metav1.Condition{ - Type: string(jobset.JobSetCompleted), - Status: metav1.ConditionStatus(corev1.ConditionTrue), - Reason: constants.AllJobsCompletedReason, - Message: constants.AllJobsCompletedMessage, - }, - }); err != nil { - return false, err - } - return true, nil + setJobSetCompleted(js, updateStatusOpts) + return true } - return false, nil + return false } -func (r *JobSetReconciler) executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error { +func executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) { // If no failure policy is defined, mark the JobSet as failed. if js.Spec.FailurePolicy == nil { firstFailedJob := findFirstFailedJob(ownedJobs.failed) - return r.failJobSet(ctx, js, constants.FailedJobsReason, messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name)) + setJobSetFailed(ctx, js, constants.FailedJobsReason, messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name), updateStatusOpts) + return } // If JobSet has reached max restarts, fail the JobSet. if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts { firstFailedJob := findFirstFailedJob(ownedJobs.failed) - return r.failJobSet(ctx, js, constants.ReachedMaxRestartsReason, messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, firstFailedJob.Name)) + setJobSetFailed(ctx, js, constants.ReachedMaxRestartsReason, messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, firstFailedJob.Name), updateStatusOpts) + return } // To reach this point a job must have failed. - return r.failurePolicyRecreateAll(ctx, js, ownedJobs) + failurePolicyRecreateAll(ctx, js, updateStatusOpts) } -func (r *JobSetReconciler) failurePolicyRecreateAll(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error { +func failurePolicyRecreateAll(ctx context.Context, js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { log := ctrl.LoggerFrom(ctx) // Increment JobSet restarts. This will trigger reconciliation and result in deletions // of old jobs not part of the current jobSet run. js.Status.Restarts += 1 - if err := r.updateStatus(ctx, js, corev1.EventTypeWarning, "Restarting", fmt.Sprintf("restarting jobset, attempt %d", js.Status.Restarts)); err != nil { - return err - } - log.V(2).Info("attempting restart", "restart attempt", js.Status.Restarts) - return nil -} + updateStatusOpts.shouldUpdate = true -func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet, reason, msg string) error { - return r.ensureCondition(ctx, ensureConditionOpts{ - jobset: js, - condition: metav1.Condition{ - Type: string(jobset.JobSetFailed), - Status: metav1.ConditionStatus(corev1.ConditionTrue), - Reason: reason, - Message: msg, - }, - eventType: corev1.EventTypeWarning, + // Emit event for each JobSet restarts for observability and debugability. + enqueueEvent(updateStatusOpts, &eventParams{ + object: js, + eventType: corev1.EventTypeWarning, + eventReason: fmt.Sprintf("restarting jobset, attempt %d", js.Status.Restarts), }) -} - -// updateStatus updates the status of a JobSet. -func (r *JobSetReconciler) updateStatus(ctx context.Context, js *jobset.JobSet, eventType, eventReason, eventMsg string) error { - if err := r.Status().Update(ctx, js); err != nil { - return err - } - r.Record.Eventf(js, eventType, eventReason, eventMsg) - return nil -} - -// function parameters for ensureCondition -type ensureConditionOpts struct { - jobset *jobset.JobSet - // specify the type of event - eventType string - // do we add a false condition - forceFalseUpdate bool - condition metav1.Condition -} - -func (r *JobSetReconciler) ensureCondition(ctx context.Context, opts ensureConditionOpts) error { - if !updateCondition(opts.jobset, opts.condition, opts.forceFalseUpdate) { - return nil - } - if err := r.Status().Update(ctx, opts.jobset); err != nil { - return err - } - - r.Record.Eventf(opts.jobset, opts.eventType, opts.condition.Reason, opts.condition.Message) - return nil -} - -func updateCondition(js *jobset.JobSet, condition metav1.Condition, forceFalseUpdate bool) bool { - condition.LastTransitionTime = metav1.Now() - for i, val := range js.Status.Conditions { - if condition.Type == val.Type && condition.Status != val.Status { - js.Status.Conditions[i] = condition - // Condition found but different status so we should update - return true - } else if condition.Type == val.Type && condition.Status == val.Status && condition.Reason == val.Reason && condition.Message == val.Message { - // Duplicate condition so no update - return false - } - } - if forceFalseUpdate { - // Some conditions need an update even if false - // StartupPolicy is one example - // If startup policy is not specified, then - // we assume that there was no startup policy ever applied - // We use the false condition to signify progress of StartupPolicy. - js.Status.Conditions = append(js.Status.Conditions, condition) - return true - } - - // condition doesn't exist, update only if the status is true - if condition.Status == metav1.ConditionTrue { - js.Status.Conditions = append(js.Status.Conditions, condition) - return true - } - return false + log.V(2).Info("attempting restart", "restart attempt", js.Status.Restarts) } func constructJobsFromTemplate(js *jobset.JobSet, rjob *jobset.ReplicatedJob, ownedJobs *childJobs) ([]*batchv1.Job, error) { @@ -951,9 +879,159 @@ func findJobFailureTime(job *batchv1.Job) *metav1.Time { return nil } -func managedByExternalController(js jobset.JobSet) *string { +// managedByExternalController returns a pointer to the name of the external controller managing +// the JobSet, if one exists. Otherwise, it returns nil. +func managedByExternalController(js *jobset.JobSet) *string { if controllerName := js.Spec.ManagedBy; controllerName != nil && *controllerName != jobset.JobSetControllerName { return controllerName } return nil } + +// enqueueEvent appends a new k8s event to be emitted if and only after running the status +// update functions in the updateStatusOpts, the status update API call suceeds. +func enqueueEvent(updateStatusOpts *statusUpdateOpts, event *eventParams) { + updateStatusOpts.events = append(updateStatusOpts.events, event) +} + +// function parameters for setCondition +type conditionOpts struct { + eventType string + condition *metav1.Condition +} + +// setCondition will add a new condition to the JobSet status (or update an existing one), +// and enqueue an event for emission if the status update succeeds at the end of the reconcile. +func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) { + // Return early if this condition is already set. + if !updateCondition(js, condOpts) { + return + } + + // If we made some changes to the status conditions, configure updateStatusOpts + // to persist the status update at the end of the reconciliation attempt. + updateStatusOpts.shouldUpdate = true + + // Conditionally emit an event for each JobSet condition update if and only if + // the status update call is successful. + enqueueEvent(updateStatusOpts, &eventParams{ + object: js, + eventType: condOpts.eventType, + eventReason: condOpts.condition.Reason, + eventMessage: condOpts.condition.Message, + }) +} + +// updateCondition accepts a given condition and does one of the following: +// 1. If an identical condition already exists, do nothing and return false (indicating +// no change was made). +// 2. If a condition of the same type exists but with a different status, update +// the condition in place and return true (indicating a condition change was made). +func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool { + if opts == nil || opts.condition == nil { + return false + } + + condition := *opts.condition + condition.LastTransitionTime = metav1.Now() + for i, val := range js.Status.Conditions { + if condition.Type == val.Type && condition.Status != val.Status { + js.Status.Conditions[i] = condition + // Condition found but different status so we should update. + return true + } else if condition.Type == val.Type && condition.Status == val.Status && condition.Reason == val.Reason && condition.Message == val.Message { + // Duplicate condition so no update. + return false + } + } + + // Condition doesn't exist, add it. + js.Status.Conditions = append(js.Status.Conditions, condition) + return true +} + +// setJobSetCompleted sets a condition on the JobSet status indicating it has completed. +func setJobSetCompleted(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { + setCondition(js, completedConditionsOpts, updateStatusOpts) +} + +// setJobSetFailed sets a condition on the JobSet status indicating it has failed. +func setJobSetFailed(ctx context.Context, js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) { + setCondition(js, makeFailedConditionOpts(reason, msg), updateStatusOpts) +} + +// setJobSetSuspended sets a condition on the JobSet status indicating it is currently suspended. +func setJobSetSuspended(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { + setCondition(js, makeSuspendedConditionOpts(metav1.Now()), updateStatusOpts) +} + +// setJobSetResumed sets a condition on the JobSet status indicating it has been resumed. +// This updates the "suspended" condition type from "true" to "false." +func setJobSetResumed(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { + setCondition(js, makeResumedConditionOpts(metav1.Now()), updateStatusOpts) +} + +// completedConditionsOpts contains the options we use to generate the JobSet completed condition. +var completedConditionsOpts = &conditionOpts{ + eventType: corev1.EventTypeNormal, + condition: &metav1.Condition{ + Type: string(jobset.JobSetCompleted), + Status: metav1.ConditionStatus(corev1.ConditionTrue), + Reason: constants.AllJobsCompletedReason, + Message: constants.AllJobsCompletedMessage, + }, +} + +// makeFailedConditionOpts returns the options we use to generate the JobSet failed condition. +func makeFailedConditionOpts(reason, msg string) *conditionOpts { + return &conditionOpts{ + condition: &metav1.Condition{ + Type: string(jobset.JobSetFailed), + Status: metav1.ConditionStatus(corev1.ConditionTrue), + Reason: reason, + Message: msg, + }, + eventType: corev1.EventTypeWarning, + } +} + +// makeSuspendedConditionOpts returns the options we use to generate the JobSet suspended condition. +func makeSuspendedConditionOpts(now metav1.Time) *conditionOpts { + return &conditionOpts{ + eventType: corev1.EventTypeNormal, + condition: &metav1.Condition{ + Type: string(jobset.JobSetSuspended), + Status: metav1.ConditionStatus(corev1.ConditionTrue), + LastTransitionTime: now, + Reason: constants.JobSetSuspendedReason, + Message: constants.JobSetSuspendedMessage, + }, + } +} + +// makeResumedConditionOpts returns the options we use to generate the JobSet resumed condition. +func makeResumedConditionOpts(now metav1.Time) *conditionOpts { + return &conditionOpts{ + eventType: corev1.EventTypeNormal, + condition: &metav1.Condition{ + Type: string(jobset.JobSetSuspended), + Status: metav1.ConditionStatus(corev1.ConditionFalse), + LastTransitionTime: now, + Reason: constants.JobSetResumedReason, + Message: constants.JobSetResumedMessage, + }, + } +} + +// replicatedJobStatusesEqual compares two slices of replicatedJob statuses, and returns +// a boolean value indicating if they are equal. This is a semantic equality check, not +// a memory equality check. +func replicatedJobStatusesEqual(oldStatuses, newStatuses []jobset.ReplicatedJobStatus) bool { + sort.Slice(oldStatuses, func(i, j int) bool { + return oldStatuses[i].Name > oldStatuses[j].Name + }) + sort.Slice(newStatuses, func(i, j int) bool { + return newStatuses[i].Name > newStatuses[j].Name + }) + return apiequality.Semantic.DeepEqual(oldStatuses, newStatuses) +} diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index 0a237f79..59dfd03d 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -661,62 +661,35 @@ func TestUpdateConditions(t *testing.T) { replicatedJobName = "replicated-job" jobName = "test-job" ns = "default" + now = metav1.Now() ) tests := []struct { name string js *jobset.JobSet - conditions []metav1.Condition - newCondition metav1.Condition - forceUpdate bool + opts *conditionOpts expectedUpdate bool }{ { - name: "no condition", + name: "no existing conditions, not adding conditions", js: testutils.MakeJobSet(jobSetName, ns). ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). Obj()).Obj(), - newCondition: metav1.Condition{}, - conditions: []metav1.Condition{}, + opts: &conditionOpts{}, expectedUpdate: false, }, { - name: "do not update if false", + name: "no existing conditions, add a condition", js: testutils.MakeJobSet(jobSetName, ns). ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). Obj()).Obj(), - newCondition: metav1.Condition{Status: metav1.ConditionFalse, Type: string(jobset.JobSetSuspended), Reason: "JobsResumed"}, - conditions: []metav1.Condition{}, - expectedUpdate: false, - }, - { - name: "force update if false", - js: testutils.MakeJobSet(jobSetName, ns). - ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). - Job(testutils.MakeJobTemplate(jobName, ns).Obj()). - Replicas(1). - Obj()).Obj(), - newCondition: metav1.Condition{Status: metav1.ConditionFalse, Type: string(jobset.JobSetStartupPolicyCompleted), Reason: "StartupPolicy"}, - conditions: []metav1.Condition{}, + opts: completedConditionsOpts, expectedUpdate: true, - forceUpdate: true, }, - { - name: "update if condition is true", - js: testutils.MakeJobSet(jobSetName, ns). - ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). - Job(testutils.MakeJobTemplate(jobName, ns).Obj()). - Replicas(1). - Obj()).Obj(), - newCondition: metav1.Condition{Status: metav1.ConditionTrue, Type: string(jobset.JobSetSuspended), Reason: "JobsResumed"}, - conditions: []metav1.Condition{}, - expectedUpdate: true, - }, - { name: "suspended", js: testutils.MakeJobSet(jobSetName, ns). @@ -724,40 +697,54 @@ func TestUpdateConditions(t *testing.T) { Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). Obj()).Obj(), - newCondition: metav1.Condition{Status: metav1.ConditionTrue, Type: string(jobset.JobSetSuspended), Reason: "JobsSuspended"}, - conditions: []metav1.Condition{}, + opts: makeSuspendedConditionOpts(now), expectedUpdate: true, }, { - name: "resumed", + name: "resume (update suspended condition type in-place)", js: testutils.MakeJobSet(jobSetName, ns). ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()).Obj(), - newCondition: metav1.Condition{Type: string(jobset.JobSetSuspended), Reason: "JobsResumed", Status: metav1.ConditionStatus(corev1.ConditionFalse)}, - conditions: []metav1.Condition{{Type: string(jobset.JobSetSuspended), Reason: "JobsSuspended", Status: metav1.ConditionStatus(corev1.ConditionTrue)}}, + Obj()). + Conditions([]metav1.Condition{ + // JobSet is currrently suspended. + { + Type: string(jobset.JobSetSuspended), + Reason: constants.JobSetSuspendedReason, + Message: constants.JobSetSuspendedMessage, + Status: metav1.ConditionStatus(corev1.ConditionTrue), + }, + }). + Obj(), + opts: makeResumedConditionOpts(now), expectedUpdate: true, }, { - name: "duplicateComplete", + name: "existing conditions, attempt to add duplicate", js: testutils.MakeJobSet(jobSetName, ns). ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()).Obj(), - newCondition: metav1.Condition{Type: string(jobset.JobSetCompleted), Message: "Jobs completed", Reason: "JobsCompleted", Status: metav1.ConditionTrue}, - conditions: []metav1.Condition{{Type: string(jobset.JobSetCompleted), Message: "Jobs completed", Reason: "JobsCompleted", Status: metav1.ConditionTrue}}, + Obj()). + Conditions([]metav1.Condition{ + // JobSet is completed.. + { + Type: string(jobset.JobSetCompleted), + Reason: constants.AllJobsCompletedReason, + Message: constants.AllJobsCompletedMessage, + Status: metav1.ConditionStatus(corev1.ConditionTrue), + }, + }).Obj(), + opts: completedConditionsOpts, expectedUpdate: false, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - jsWithConditions := tc.js - jsWithConditions.Status.Conditions = tc.conditions - gotUpdate := updateCondition(jsWithConditions, tc.newCondition, tc.forceUpdate) + gotUpdate := updateCondition(tc.js, tc.opts) if gotUpdate != tc.expectedUpdate { - t.Errorf("updateCondition return mismatch") + t.Errorf("updateCondition return mismatch (want: %v, got %v)", tc.expectedUpdate, gotUpdate) } }) } @@ -1099,7 +1086,7 @@ func TestCalculateReplicatedJobStatuses(t *testing.T) { t.Run(tc.name, func(t *testing.T) { r := JobSetReconciler{Client: (fake.NewClientBuilder()).Build()} statuses := r.calculateReplicatedJobStatuses(context.TODO(), tc.js, &tc.jobs) - var less interface{} = func(a, b jobset.ReplicatedJobStatus) bool { + less := func(a, b jobset.ReplicatedJobStatus) bool { return a.Name < b.Name } if diff := cmp.Diff(tc.expected, statuses, cmpopts.SortSlices(less)); diff != "" { diff --git a/pkg/controllers/startup_policy.go b/pkg/controllers/startup_policy.go index 84ff789d..5475d6df 100644 --- a/pkg/controllers/startup_policy.go +++ b/pkg/controllers/startup_policy.go @@ -14,6 +14,7 @@ limitations under the License. package controllers import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" @@ -33,8 +34,8 @@ func inOrderStartupPolicy(sp *jobset.StartupPolicy) bool { return sp != nil && sp.StartupPolicyOrder == jobset.InOrder } -func inOrderStartupPolicyExecutingCondition() metav1.Condition { - return metav1.Condition{ +func inOrderStartupPolicyInProgressCondition() *metav1.Condition { + return &metav1.Condition{ Type: string(jobset.JobSetStartupPolicyCompleted), // Status is True when in order startup policy is completed. // Otherwise it is set as False to indicate it is still executing. @@ -44,8 +45,8 @@ func inOrderStartupPolicyExecutingCondition() metav1.Condition { } } -func inOrderStartupPolicyCompletedCondition() metav1.Condition { - return metav1.Condition{ +func inOrderStartupPolicyCompletedCondition() *metav1.Condition { + return &metav1.Condition{ Type: string(jobset.JobSetStartupPolicyCompleted), // Status is True when in order startup policy is completed. // Otherwise it is set as False to indicate it is still executing. @@ -54,3 +55,22 @@ func inOrderStartupPolicyCompletedCondition() metav1.Condition { Message: constants.InOrderStartupPolicyCompletedMessage, } } + +// setInOrderStartupPolicyInProgress sets a condition on the JobSet status indicating it is +// currently executing an in-order startup policy. +func setInOrderStartupPolicyInProgress(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { + // Add a condition to the JobSet indicating the in order startup policy is executing. + setCondition(js, &conditionOpts{ + eventType: corev1.EventTypeNormal, + condition: inOrderStartupPolicyInProgressCondition(), + }, updateStatusOpts) +} + +// setInOrderStartupPolicyCompleted sets a condition on the JobSet status indicating it has finished +// running an in-order startup policy to completion. +func setInOrderStartupPolicyCompleted(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { + setCondition(js, &conditionOpts{ + eventType: corev1.EventTypeNormal, + condition: inOrderStartupPolicyCompletedCondition(), + }, updateStatusOpts) +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 4f3d4bb6..465d9af7 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -54,6 +54,12 @@ func MakeJobSet(name, ns string) *JobSetWrapper { } } +// Conditions sets the value of jobSet.status.conditions +func (j *JobSetWrapper) Conditions(conditions []metav1.Condition) *JobSetWrapper { + j.Status.Conditions = conditions + return j +} + // ManagedBy sets the value of jobSet.spec.managedBy func (j *JobSetWrapper) ManagedBy(managedBy string) *JobSetWrapper { j.Spec.ManagedBy = ptr.To(managedBy) diff --git a/test/integration/controller/jobset_controller_test.go b/test/integration/controller/jobset_controller_test.go index 024ad252..5afa01df 100644 --- a/test/integration/controller/jobset_controller_test.go +++ b/test/integration/controller/jobset_controller_test.go @@ -825,6 +825,7 @@ var _ = ginkgo.Describe("JobSet controller", func() { }) }, updates: []*update{ + // Ensure replicated job statuses report all child jobs are suspended. { checkJobSetState: func(js *jobset.JobSet) { matchJobSetReplicatedStatus(js, []jobset.ReplicatedJobStatus{ @@ -839,6 +840,8 @@ var _ = ginkgo.Describe("JobSet controller", func() { }) }, }, + // Resume jobset. Only first replicated job should be unsuspended due to in-order + // startup policy. { jobSetUpdateFn: func(js *jobset.JobSet) { suspendJobSet(js, false) @@ -858,6 +861,8 @@ var _ = ginkgo.Describe("JobSet controller", func() { }) }, }, + // Update first replicatedJob so all its child jobs are ready. This will allow + // the next replicatedJob to proceed. { jobUpdateFn: func(jobList *batchv1.JobList) { readyReplicatedJob(jobList, "replicated-job-a") diff --git a/test/util/util.go b/test/util/util.go index 8114f6b6..6fba7f98 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -96,7 +96,7 @@ func JobSetResumed(ctx context.Context, k8sClient client.Client, js *jobset.JobS } func JobSetStartupPolicyComplete(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) { - ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetStartupPolicyCompleted)) + ginkgo.By(fmt.Sprintf("checking jobset condition %q status is %q", jobset.JobSetStartupPolicyCompleted, metav1.ConditionTrue)) conditions := []metav1.Condition{ { Type: string(jobset.JobSetStartupPolicyCompleted), @@ -107,7 +107,7 @@ func JobSetStartupPolicyComplete(ctx context.Context, k8sClient client.Client, j } func JobSetStartupPolicyNotFinished(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) { - ginkgo.By(fmt.Sprintf("checking jobset status is: %s", jobset.JobSetStartupPolicyCompleted)) + ginkgo.By(fmt.Sprintf("checking jobset condition %q status is %q", jobset.JobSetStartupPolicyCompleted, metav1.ConditionFalse)) conditions := []metav1.Condition{ { Type: string(jobset.JobSetStartupPolicyCompleted), From 6e2914dcbd97bf60cbbb9c55a74598be6dbf4da7 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Thu, 11 Apr 2024 15:59:52 +0000 Subject: [PATCH 2/6] requeue if necessary --- pkg/controllers/jobset_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index f9530eba..715d4598 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -112,7 +112,7 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } // At the end of this Reconcile attempt, do one API call to persist all the JobSet status changes. - return ctrl.Result{}, r.updateJobSetStatus(ctx, &js, &updateStatusOpts) + return ctrl.Result{RequeueAfter: result.RequeueAfter}, r.updateJobSetStatus(ctx, &js, &updateStatusOpts) } // reconcile is the internal method containing the core JobSet reconciliation logic. From a12756759d2529306ad6554d67174738b3a284c8 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Thu, 11 Apr 2024 16:17:15 +0000 Subject: [PATCH 3/6] only set resumed condition if we actually resumed jobs --- pkg/controllers/jobset_controller.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 715d4598..201905eb 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -419,18 +419,20 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset return nil } } + + // If no jobs were resumed / no action was taken, there's nothing more to do here. + if numJobsResumed == 0 { + return nil + } // At this point all replicated jobs have had their jobs resumed. // If using an in order startup policy. add a condition to the JobSet indicating the // in order startup policy has completed. if inOrderStartupPolicy(startupPolicy) { setInOrderStartupPolicyCompleted(js, updateStatusOpts) } - // Finally, set the suspended condition on the JobSet to false to indicate // the JobSet is no longer suspended. - if numJobsResumed > 0 { - setJobSetResumed(js, updateStatusOpts) - } + setJobSetResumed(js, updateStatusOpts) return nil } From f5e5364acc75befbd1b4ae68178135eb48741934 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Fri, 12 Apr 2024 17:12:07 +0000 Subject: [PATCH 4/6] address comments --- pkg/controllers/jobset_controller.go | 48 ++++++++++++++-------------- pkg/controllers/startup_policy.go | 8 ++--- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 201905eb..3ab6ab5a 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -138,11 +138,8 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd } // Calculate JobsReady and update statuses for each ReplicatedJob. - // If status ReplicatedJobsStatus has changed, update the JobSet status. rjobStatuses := r.calculateReplicatedJobStatuses(ctx, js, ownedJobs) - if !replicatedJobStatusesEqual(js.Status.ReplicatedJobsStatus, rjobStatuses) { - updateReplicatedJobsStatuses(ctx, js, rjobStatuses, updateStatusOpts) - } + updateReplicatedJobsStatuses(ctx, js, rjobStatuses, updateStatusOpts) // If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set. if jobSetFinished(js) { @@ -175,8 +172,7 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd // If any jobs have succeeded, execute the JobSet success policy. if len(ownedJobs.successful) > 0 { - completed := executeSuccessPolicy(ctx, js, ownedJobs, updateStatusOpts) - if completed { + if completed := executeSuccessPolicy(ctx, js, ownedJobs, updateStatusOpts); completed { return ctrl.Result{}, nil } } @@ -297,6 +293,10 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet) // updateReplicatedJobsStatuses updates the replicatedJob statuses if they have changed. func updateReplicatedJobsStatuses(ctx context.Context, js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) { + // If replicated job statuses haven't changed, there's nothing to do here. + if replicatedJobStatusesEqual(js.Status.ReplicatedJobsStatus, statuses) { + return + } // Add a new status update to perform at the end of the reconciliation attempt. js.Status.ReplicatedJobsStatus = statuses updateStatusOpts.shouldUpdate = true @@ -375,7 +375,7 @@ func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, a } } } - setJobSetSuspended(js, updateStatusOpts) + setJobSetSuspendedCondition(js, updateStatusOpts) return nil } @@ -406,7 +406,7 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset jobsFromRJob := replicatedJobToActiveJobs[replicatedJob.Name] for _, job := range jobsFromRJob { if !jobSuspended(job) { - return nil + continue } if err := r.resumeJob(ctx, job, nodeAffinities); err != nil { return err @@ -415,7 +415,7 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset } // If using in order startup policy, return early and wait for the replicated job to be ready. if numJobsResumed > 0 && inOrderStartupPolicy(startupPolicy) { - setInOrderStartupPolicyInProgress(js, updateStatusOpts) + setInOrderStartupPolicyInProgressCondition(js, updateStatusOpts) return nil } } @@ -428,11 +428,11 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset // If using an in order startup policy. add a condition to the JobSet indicating the // in order startup policy has completed. if inOrderStartupPolicy(startupPolicy) { - setInOrderStartupPolicyCompleted(js, updateStatusOpts) + setInOrderStartupPolicyCompletedCondition(js, updateStatusOpts) } // Finally, set the suspended condition on the JobSet to false to indicate // the JobSet is no longer suspended. - setJobSetResumed(js, updateStatusOpts) + setJobSetResumedCondition(js, updateStatusOpts) return nil } @@ -503,7 +503,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow // This updates the StartupPolicy condition and notifies that we are waiting // for this replicated job to finish. if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) { - setInOrderStartupPolicyInProgress(js, updateStatusOpts) + setInOrderStartupPolicyInProgressCondition(js, updateStatusOpts) return nil } } @@ -513,7 +513,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow } // Skip emitting a condition for StartupPolicy if JobSet is suspended if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) { - setInOrderStartupPolicyCompleted(js, updateStatusOpts) + setInOrderStartupPolicyCompletedCondition(js, updateStatusOpts) return nil } return nil @@ -593,7 +593,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js // Returns a boolean value indicating if the jobset was completed or not. func executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool { if numJobsMatchingSuccessPolicy(js, ownedJobs.successful) >= numJobsExpectedToSucceed(js) { - setJobSetCompleted(js, updateStatusOpts) + setJobSetCompletedCondition(js, updateStatusOpts) return true } return false @@ -603,14 +603,14 @@ func executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *chi // If no failure policy is defined, mark the JobSet as failed. if js.Spec.FailurePolicy == nil { firstFailedJob := findFirstFailedJob(ownedJobs.failed) - setJobSetFailed(ctx, js, constants.FailedJobsReason, messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name), updateStatusOpts) + setJobSetFailedCondition(ctx, js, constants.FailedJobsReason, messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name), updateStatusOpts) return } // If JobSet has reached max restarts, fail the JobSet. if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts { firstFailedJob := findFirstFailedJob(ownedJobs.failed) - setJobSetFailed(ctx, js, constants.ReachedMaxRestartsReason, messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, firstFailedJob.Name), updateStatusOpts) + setJobSetFailedCondition(ctx, js, constants.ReachedMaxRestartsReason, messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, firstFailedJob.Name), updateStatusOpts) return } @@ -952,24 +952,24 @@ func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool { return true } -// setJobSetCompleted sets a condition on the JobSet status indicating it has completed. -func setJobSetCompleted(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { +// setJobSetCompletedCondition sets a condition on the JobSet status indicating it has completed. +func setJobSetCompletedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { setCondition(js, completedConditionsOpts, updateStatusOpts) } -// setJobSetFailed sets a condition on the JobSet status indicating it has failed. -func setJobSetFailed(ctx context.Context, js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) { +// setJobSetFailedCondition sets a condition on the JobSet status indicating it has failed. +func setJobSetFailedCondition(ctx context.Context, js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) { setCondition(js, makeFailedConditionOpts(reason, msg), updateStatusOpts) } -// setJobSetSuspended sets a condition on the JobSet status indicating it is currently suspended. -func setJobSetSuspended(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { +// setJobSetSuspendedCondition sets a condition on the JobSet status indicating it is currently suspended. +func setJobSetSuspendedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { setCondition(js, makeSuspendedConditionOpts(metav1.Now()), updateStatusOpts) } -// setJobSetResumed sets a condition on the JobSet status indicating it has been resumed. +// setJobSetResumedCondition sets a condition on the JobSet status indicating it has been resumed. // This updates the "suspended" condition type from "true" to "false." -func setJobSetResumed(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { +func setJobSetResumedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { setCondition(js, makeResumedConditionOpts(metav1.Now()), updateStatusOpts) } diff --git a/pkg/controllers/startup_policy.go b/pkg/controllers/startup_policy.go index 5475d6df..a720dbf1 100644 --- a/pkg/controllers/startup_policy.go +++ b/pkg/controllers/startup_policy.go @@ -56,9 +56,9 @@ func inOrderStartupPolicyCompletedCondition() *metav1.Condition { } } -// setInOrderStartupPolicyInProgress sets a condition on the JobSet status indicating it is +// setInOrderStartupPolicyInProgressCondition sets a condition on the JobSet status indicating it is // currently executing an in-order startup policy. -func setInOrderStartupPolicyInProgress(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { +func setInOrderStartupPolicyInProgressCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { // Add a condition to the JobSet indicating the in order startup policy is executing. setCondition(js, &conditionOpts{ eventType: corev1.EventTypeNormal, @@ -66,9 +66,9 @@ func setInOrderStartupPolicyInProgress(js *jobset.JobSet, updateStatusOpts *stat }, updateStatusOpts) } -// setInOrderStartupPolicyCompleted sets a condition on the JobSet status indicating it has finished +// setInOrderStartupPolicyCompletedCondition sets a condition on the JobSet status indicating it has finished // running an in-order startup policy to completion. -func setInOrderStartupPolicyCompleted(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { +func setInOrderStartupPolicyCompletedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { setCondition(js, &conditionOpts{ eventType: corev1.EventTypeNormal, condition: inOrderStartupPolicyCompletedCondition(), From 6289a88ccd9bccd15b2255df4d37c0087c648f1b Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Sat, 13 Apr 2024 00:41:18 +0000 Subject: [PATCH 5/6] refactor in order startup policy condition into two mutually exclusive conditions --- api/jobset/v1alpha2/jobset_types.go | 6 +- pkg/constants/constants.go | 6 +- pkg/controllers/jobset_controller.go | 126 +++++++++++++--------- pkg/controllers/jobset_controller_test.go | 9 +- pkg/controllers/startup_policy.go | 36 +++---- test/util/util.go | 4 +- 6 files changed, 102 insertions(+), 85 deletions(-) diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index 620115ec..7cc078a5 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -53,9 +53,11 @@ const ( JobSetCompleted JobSetConditionType = "Completed" // JobSetFailed means the job has failed its execution. JobSetFailed JobSetConditionType = "Failed" - // JobSetSuspended means the job is suspended + // JobSetSuspended means the job is suspended. JobSetSuspended JobSetConditionType = "Suspended" - // JobSetStartupPolicyCompleted means the StartupPolicy was complete + // JobSetStartupPolicyInProgress means the StartupPolicy is in progress. + JobSetStartupPolicyInProgress JobSetConditionType = "StartupPolicyInProgress" + // JobSetStartupPolicyCompleted means the StartupPolicy has completed. JobSetStartupPolicyCompleted JobSetConditionType = "StartupPolicyCompleted" ) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index a2291565..e2807ae6 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -55,8 +55,10 @@ const ( ExclusivePlacementViolationMessage = "Pod violated JobSet exclusive placement policy" // Event reason and messages related to startup policy. - InOrderStartupPolicyReason = "StartupPolicyInOrder" - InOrderStartupPolicyExecutingMessage = "in order startup policy is executing" + InOrderStartupPolicyInProgressReason = "InOrderStartupPolicyInProgress" + InOrderStartupPolicyInProgressMessage = "in order startup policy is in progress" + + InOrderStartupPolicyCompletedReason = "InOrderStartupPolicyCompleted" InOrderStartupPolicyCompletedMessage = "in order startup policy has completed" // Event reason and messages related to JobSet restarts. diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 3ab6ab5a..8f23d07b 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -198,10 +198,12 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd return ctrl.Result{}, err } } else { - if err := r.resumeJobsIfNecessary(ctx, js, ownedJobs.active, rjobStatuses, updateStatusOpts); err != nil { + requeue, err := r.resumeJobsIfNecessary(ctx, js, ownedJobs.active, rjobStatuses, updateStatusOpts) + if err != nil { log.Error(err, "resuming jobset") return ctrl.Result{}, err } + return ctrl.Result{Requeue: requeue}, nil } return ctrl.Result{}, nil } @@ -379,7 +381,11 @@ func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, a return nil } -func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) error { +// resumeJobsIfNecessary iterates through each replicatedJob, resuming any suspended jobs if the JobSet +// is not suspended. Returns a boolean value indicating if the JobSet should be requeued for reconciliation. +// This is so in-order startup policy can be respected, where after resuming one replicatedJob, we must +// wait for it to become ready before resuming the next. +func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) (bool, error) { // Store node selector for each replicatedJob template. nodeAffinities := map[string]map[string]string{} for _, replicatedJob := range js.Spec.ReplicatedJobs { @@ -394,7 +400,6 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset } startupPolicy := js.Spec.StartupPolicy - numJobsResumed := 0 // If JobSpec is unsuspended, ensure all active child Jobs are also // unsuspended and update the suspend condition to true. for _, replicatedJob := range js.Spec.ReplicatedJobs { @@ -409,31 +414,21 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset continue } if err := r.resumeJob(ctx, job, nodeAffinities); err != nil { - return err + return false, err } - numJobsResumed += 1 } - // If using in order startup policy, return early and wait for the replicated job to be ready. - if numJobsResumed > 0 && inOrderStartupPolicy(startupPolicy) { + // If in order startup policy, we need to return early and allow for + // this replicatedJob to become ready before resuming the next. + if inOrderStartupPolicy(startupPolicy) { setInOrderStartupPolicyInProgressCondition(js, updateStatusOpts) - return nil + return true, nil } } - // If no jobs were resumed / no action was taken, there's nothing more to do here. - if numJobsResumed == 0 { - return nil - } - // At this point all replicated jobs have had their jobs resumed. - // If using an in order startup policy. add a condition to the JobSet indicating the - // in order startup policy has completed. - if inOrderStartupPolicy(startupPolicy) { - setInOrderStartupPolicyCompletedCondition(js, updateStatusOpts) - } // Finally, set the suspended condition on the JobSet to false to indicate // the JobSet is no longer suspended. setJobSetResumedCondition(js, updateStatusOpts) - return nil + return false, nil } func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, nodeAffinities map[string]map[string]string) error { @@ -471,7 +466,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name) - // For startup policy, if the job is started we can skip this loop. + // For startup policy, if the replicatedJob is started we can skip this loop. // Jobs have been created. if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) && allReplicasStarted(replicatedJob.Replicas, status) { continue @@ -501,7 +496,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow // If we are using inOrder StartupPolicy, then we return to wait for jobs to be ready. // This updates the StartupPolicy condition and notifies that we are waiting - // for this replicated job to finish. + // for this replicated job to start up before moving onto the next one. if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) { setInOrderStartupPolicyInProgressCondition(js, updateStatusOpts) return nil @@ -905,7 +900,7 @@ type conditionOpts struct { // setCondition will add a new condition to the JobSet status (or update an existing one), // and enqueue an event for emission if the status update succeeds at the end of the reconcile. func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) { - // Return early if this condition is already set. + // Return early if no status update is required for this condition. if !updateCondition(js, condOpts) { return } @@ -934,27 +929,46 @@ func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool { return false } - condition := *opts.condition - condition.LastTransitionTime = metav1.Now() - for i, val := range js.Status.Conditions { - if condition.Type == val.Type && condition.Status != val.Status { - js.Status.Conditions[i] = condition - // Condition found but different status so we should update. - return true - } else if condition.Type == val.Type && condition.Status == val.Status && condition.Reason == val.Reason && condition.Message == val.Message { - // Duplicate condition so no update. - return false + found := false + shouldUpdate := false + newCond := *opts.condition + newCond.LastTransitionTime = metav1.Now() + + for i, currCond := range js.Status.Conditions { + // If condition type has a status change, update it. + if newCond.Type == currCond.Type { + // Status change of an existing condition. Update status call will be required. + if newCond.Status != currCond.Status { + js.Status.Conditions[i] = newCond + shouldUpdate = true + } + + // If both are true or both are false, this is a duplicate condition, do nothing. + found = true + } else { + // If conditions are of different types, only perform an update if they are both true + // and they are mutually exclusive. If so, then set the existing condition status to + // false before adding the new condition. + if exclusiveConditions(currCond, newCond) && + currCond.Status == metav1.ConditionTrue && + newCond.Status == metav1.ConditionTrue { + js.Status.Conditions[i].Status = metav1.ConditionFalse + shouldUpdate = true + } } } - // Condition doesn't exist, add it. - js.Status.Conditions = append(js.Status.Conditions, condition) - return true + // Condition doesn't exist, add it if condition status is true. + if !found && newCond.Status == metav1.ConditionTrue { + js.Status.Conditions = append(js.Status.Conditions, newCond) + shouldUpdate = true + } + return shouldUpdate } // setJobSetCompletedCondition sets a condition on the JobSet status indicating it has completed. func setJobSetCompletedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { - setCondition(js, completedConditionsOpts, updateStatusOpts) + setCondition(js, makeCompletedConditionsOpts(), updateStatusOpts) } // setJobSetFailedCondition sets a condition on the JobSet status indicating it has failed. @@ -964,24 +978,26 @@ func setJobSetFailedCondition(ctx context.Context, js *jobset.JobSet, reason, ms // setJobSetSuspendedCondition sets a condition on the JobSet status indicating it is currently suspended. func setJobSetSuspendedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { - setCondition(js, makeSuspendedConditionOpts(metav1.Now()), updateStatusOpts) + setCondition(js, makeSuspendedConditionOpts(), updateStatusOpts) } // setJobSetResumedCondition sets a condition on the JobSet status indicating it has been resumed. // This updates the "suspended" condition type from "true" to "false." func setJobSetResumedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { - setCondition(js, makeResumedConditionOpts(metav1.Now()), updateStatusOpts) + setCondition(js, makeResumedConditionOpts(), updateStatusOpts) } // completedConditionsOpts contains the options we use to generate the JobSet completed condition. -var completedConditionsOpts = &conditionOpts{ - eventType: corev1.EventTypeNormal, - condition: &metav1.Condition{ - Type: string(jobset.JobSetCompleted), - Status: metav1.ConditionStatus(corev1.ConditionTrue), - Reason: constants.AllJobsCompletedReason, - Message: constants.AllJobsCompletedMessage, - }, +func makeCompletedConditionsOpts() *conditionOpts { + return &conditionOpts{ + eventType: corev1.EventTypeNormal, + condition: &metav1.Condition{ + Type: string(jobset.JobSetCompleted), + Status: metav1.ConditionStatus(corev1.ConditionTrue), + Reason: constants.AllJobsCompletedReason, + Message: constants.AllJobsCompletedMessage, + }, + } } // makeFailedConditionOpts returns the options we use to generate the JobSet failed condition. @@ -998,13 +1014,13 @@ func makeFailedConditionOpts(reason, msg string) *conditionOpts { } // makeSuspendedConditionOpts returns the options we use to generate the JobSet suspended condition. -func makeSuspendedConditionOpts(now metav1.Time) *conditionOpts { +func makeSuspendedConditionOpts() *conditionOpts { return &conditionOpts{ eventType: corev1.EventTypeNormal, condition: &metav1.Condition{ Type: string(jobset.JobSetSuspended), Status: metav1.ConditionStatus(corev1.ConditionTrue), - LastTransitionTime: now, + LastTransitionTime: metav1.Now(), Reason: constants.JobSetSuspendedReason, Message: constants.JobSetSuspendedMessage, }, @@ -1012,13 +1028,13 @@ func makeSuspendedConditionOpts(now metav1.Time) *conditionOpts { } // makeResumedConditionOpts returns the options we use to generate the JobSet resumed condition. -func makeResumedConditionOpts(now metav1.Time) *conditionOpts { +func makeResumedConditionOpts() *conditionOpts { return &conditionOpts{ eventType: corev1.EventTypeNormal, condition: &metav1.Condition{ Type: string(jobset.JobSetSuspended), Status: metav1.ConditionStatus(corev1.ConditionFalse), - LastTransitionTime: now, + LastTransitionTime: metav1.Now(), Reason: constants.JobSetResumedReason, Message: constants.JobSetResumedMessage, }, @@ -1037,3 +1053,13 @@ func replicatedJobStatusesEqual(oldStatuses, newStatuses []jobset.ReplicatedJobS }) return apiequality.Semantic.DeepEqual(oldStatuses, newStatuses) } + +// exclusiveConditions accepts 2 conditions and returns a boolean indicating if +// they are mutually exclusive. +func exclusiveConditions(cond1, cond2 metav1.Condition) bool { + inProgressAndCompleted := cond1.Type == string(jobset.JobSetStartupPolicyInProgress) && + cond2.Type == string(jobset.JobSetStartupPolicyCompleted) + completedAndInProgress := cond1.Type == string(jobset.JobSetStartupPolicyCompleted) && + cond2.Type == string(jobset.JobSetStartupPolicyInProgress) + return inProgressAndCompleted || completedAndInProgress +} diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index 59dfd03d..ecd42a2e 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -661,7 +661,6 @@ func TestUpdateConditions(t *testing.T) { replicatedJobName = "replicated-job" jobName = "test-job" ns = "default" - now = metav1.Now() ) tests := []struct { @@ -687,7 +686,7 @@ func TestUpdateConditions(t *testing.T) { Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). Obj()).Obj(), - opts: completedConditionsOpts, + opts: makeCompletedConditionsOpts(), expectedUpdate: true, }, { @@ -697,7 +696,7 @@ func TestUpdateConditions(t *testing.T) { Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). Obj()).Obj(), - opts: makeSuspendedConditionOpts(now), + opts: makeSuspendedConditionOpts(), expectedUpdate: true, }, { @@ -717,7 +716,7 @@ func TestUpdateConditions(t *testing.T) { }, }). Obj(), - opts: makeResumedConditionOpts(now), + opts: makeResumedConditionOpts(), expectedUpdate: true, }, { @@ -736,7 +735,7 @@ func TestUpdateConditions(t *testing.T) { Status: metav1.ConditionStatus(corev1.ConditionTrue), }, }).Obj(), - opts: completedConditionsOpts, + opts: makeCompletedConditionsOpts(), expectedUpdate: false, }, } diff --git a/pkg/controllers/startup_policy.go b/pkg/controllers/startup_policy.go index a720dbf1..8960595c 100644 --- a/pkg/controllers/startup_policy.go +++ b/pkg/controllers/startup_policy.go @@ -34,35 +34,18 @@ func inOrderStartupPolicy(sp *jobset.StartupPolicy) bool { return sp != nil && sp.StartupPolicyOrder == jobset.InOrder } -func inOrderStartupPolicyInProgressCondition() *metav1.Condition { - return &metav1.Condition{ - Type: string(jobset.JobSetStartupPolicyCompleted), - // Status is True when in order startup policy is completed. - // Otherwise it is set as False to indicate it is still executing. - Status: metav1.ConditionFalse, - Reason: constants.InOrderStartupPolicyReason, - Message: constants.InOrderStartupPolicyExecutingMessage, - } -} - -func inOrderStartupPolicyCompletedCondition() *metav1.Condition { - return &metav1.Condition{ - Type: string(jobset.JobSetStartupPolicyCompleted), - // Status is True when in order startup policy is completed. - // Otherwise it is set as False to indicate it is still executing. - Status: metav1.ConditionTrue, - Reason: constants.InOrderStartupPolicyReason, - Message: constants.InOrderStartupPolicyCompletedMessage, - } -} - // setInOrderStartupPolicyInProgressCondition sets a condition on the JobSet status indicating it is // currently executing an in-order startup policy. func setInOrderStartupPolicyInProgressCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { // Add a condition to the JobSet indicating the in order startup policy is executing. setCondition(js, &conditionOpts{ eventType: corev1.EventTypeNormal, - condition: inOrderStartupPolicyInProgressCondition(), + condition: &metav1.Condition{ + Type: string(jobset.JobSetStartupPolicyInProgress), + Status: metav1.ConditionTrue, + Reason: constants.InOrderStartupPolicyInProgressReason, + Message: constants.InOrderStartupPolicyInProgressMessage, + }, }, updateStatusOpts) } @@ -71,6 +54,11 @@ func setInOrderStartupPolicyInProgressCondition(js *jobset.JobSet, updateStatusO func setInOrderStartupPolicyCompletedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { setCondition(js, &conditionOpts{ eventType: corev1.EventTypeNormal, - condition: inOrderStartupPolicyCompletedCondition(), + condition: &metav1.Condition{ + Type: string(jobset.JobSetStartupPolicyCompleted), + Status: metav1.ConditionTrue, + Reason: constants.InOrderStartupPolicyCompletedReason, + Message: constants.InOrderStartupPolicyCompletedMessage, + }, }, updateStatusOpts) } diff --git a/test/util/util.go b/test/util/util.go index 6fba7f98..026d3aeb 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -110,8 +110,8 @@ func JobSetStartupPolicyNotFinished(ctx context.Context, k8sClient client.Client ginkgo.By(fmt.Sprintf("checking jobset condition %q status is %q", jobset.JobSetStartupPolicyCompleted, metav1.ConditionFalse)) conditions := []metav1.Condition{ { - Type: string(jobset.JobSetStartupPolicyCompleted), - Status: metav1.ConditionFalse, + Type: string(jobset.JobSetStartupPolicyInProgress), + Status: metav1.ConditionTrue, }, } gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true)) From f199865d1ed2482eb02aaf96afdb59f06d0a771c Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Sat, 13 Apr 2024 17:44:25 +0000 Subject: [PATCH 6/6] remove requeue --- pkg/controllers/jobset_controller.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 8f23d07b..fe39cc83 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -198,12 +198,10 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd return ctrl.Result{}, err } } else { - requeue, err := r.resumeJobsIfNecessary(ctx, js, ownedJobs.active, rjobStatuses, updateStatusOpts) - if err != nil { + if err := r.resumeJobsIfNecessary(ctx, js, ownedJobs.active, rjobStatuses, updateStatusOpts); err != nil { log.Error(err, "resuming jobset") return ctrl.Result{}, err } - return ctrl.Result{Requeue: requeue}, nil } return ctrl.Result{}, nil } @@ -385,7 +383,7 @@ func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, a // is not suspended. Returns a boolean value indicating if the JobSet should be requeued for reconciliation. // This is so in-order startup policy can be respected, where after resuming one replicatedJob, we must // wait for it to become ready before resuming the next. -func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) (bool, error) { +func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) error { // Store node selector for each replicatedJob template. nodeAffinities := map[string]map[string]string{} for _, replicatedJob := range js.Spec.ReplicatedJobs { @@ -414,21 +412,21 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset continue } if err := r.resumeJob(ctx, job, nodeAffinities); err != nil { - return false, err + return err } } // If in order startup policy, we need to return early and allow for // this replicatedJob to become ready before resuming the next. if inOrderStartupPolicy(startupPolicy) { setInOrderStartupPolicyInProgressCondition(js, updateStatusOpts) - return true, nil + return nil } } // Finally, set the suspended condition on the JobSet to false to indicate // the JobSet is no longer suspended. setJobSetResumedCondition(js, updateStatusOpts) - return false, nil + return nil } func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, nodeAffinities map[string]map[string]string) error {