From 25396bee8f80c503aa90c06e46a46f92e8d822a5 Mon Sep 17 00:00:00 2001 From: googs1025 Date: Wed, 5 Jun 2024 15:00:29 +0800 Subject: [PATCH 1/6] feat: add jobset status feat: add jobset status --- api/jobset/v1alpha2/jobset_types.go | 7 +++++ api/jobset/v1alpha2/openapi_generated.go | 7 +++++ .../jobset/v1alpha2/jobsetstatus.go | 9 ++++++ .../crd/bases/jobset.x-k8s.io_jobsets.yaml | 8 +++++ hack/python-sdk/swagger.json | 4 +++ pkg/controllers/failure_policy.go | 1 + pkg/controllers/jobset_controller.go | 23 ++++++++++---- pkg/controllers/jobset_controller_test.go | 4 +-- pkg/util/testing/wrappers.go | 6 ++++ sdk/python/docs/JobsetV1alpha2JobSetStatus.md | 1 + .../models/jobset_v1alpha2_job_set_status.py | 30 ++++++++++++++++++- .../test/test_jobset_v1alpha2_job_set.py | 1 + .../test/test_jobset_v1alpha2_job_set_list.py | 2 ++ .../test_jobset_v1alpha2_job_set_status.py | 1 + .../controller/jobset_controller_test.go | 1 + 15 files changed, 96 insertions(+), 9 deletions(-) diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index 3ba52f7f..b6b41daa 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -50,6 +50,8 @@ type JobSetConditionType string // These are built-in conditions of a JobSet. const ( + // JobSetRunning means the job is running. + JobSetRunning JobSetConditionType = "Running" // JobSetCompleted means the job has completed its execution. JobSetCompleted JobSetConditionType = "Completed" // JobSetFailed means the job has failed its execution. @@ -134,6 +136,10 @@ type JobSetStatus struct { // RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. RestartsCountTowardsMax int32 `json:"restartsCountTowardsMax,omitempty"` + // Phase of the JobSet. + // +kubebuilder:default="Running" + Phase string `json:"phase,omitempty"` + // ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. // +optional // +listType=map @@ -169,6 +175,7 @@ type ReplicatedJobStatus struct { // +k8s:openapi-gen=true // +kubebuilder:object:root=true // +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Phase",JSONPath=".status.phase",type=string,description="Phase of the JobSet" // +kubebuilder:printcolumn:name="Restarts",JSONPath=".status.restarts",type=string,description="Number of restarts" // +kubebuilder:printcolumn:name="Completed",type="string",priority=0,JSONPath=".status.conditions[?(@.type==\"Completed\")].status" // +kubebuilder:printcolumn:name="Suspended",type="string",JSONPath=".spec.suspend",description="JobSet suspended" diff --git a/api/jobset/v1alpha2/openapi_generated.go b/api/jobset/v1alpha2/openapi_generated.go index 55cc4b6b..d1417408 100644 --- a/api/jobset/v1alpha2/openapi_generated.go +++ b/api/jobset/v1alpha2/openapi_generated.go @@ -358,6 +358,13 @@ func schema_jobset_api_jobset_v1alpha2_JobSetStatus(ref common.ReferenceCallback Format: "int32", }, }, + "phase": { + SchemaProps: spec.SchemaProps{ + Description: "Phase of the JobSet.", + Type: []string{"string"}, + Format: "", + }, + }, "replicatedJobsStatus": { VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ diff --git a/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go b/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go index f31e5d99..c504a1a1 100644 --- a/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go +++ b/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go @@ -24,6 +24,7 @@ type JobSetStatusApplyConfiguration struct { Conditions []v1.Condition `json:"conditions,omitempty"` Restarts *int32 `json:"restarts,omitempty"` RestartsCountTowardsMax *int32 `json:"restartsCountTowardsMax,omitempty"` + Phase *string `json:"phase,omitempty"` ReplicatedJobsStatus []ReplicatedJobStatusApplyConfiguration `json:"replicatedJobsStatus,omitempty"` } @@ -59,6 +60,14 @@ func (b *JobSetStatusApplyConfiguration) WithRestartsCountTowardsMax(value int32 return b } +// WithPhase sets the Phase field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Phase field is set to the value of the last call. +func (b *JobSetStatusApplyConfiguration) WithPhase(value string) *JobSetStatusApplyConfiguration { + b.Phase = &value + return b +} + // WithReplicatedJobsStatus adds the given value to the ReplicatedJobsStatus field in the declarative configuration // and returns the receiver, so that objects can be build by chaining "With" function invocations. // If called multiple times, values provided by each call will be appended to the ReplicatedJobsStatus field. diff --git a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml index ebd17bc1..d5ecaf35 100644 --- a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml +++ b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml @@ -15,6 +15,10 @@ spec: scope: Namespaced versions: - additionalPrinterColumns: + - description: Phase of the JobSet + jsonPath: .status.phase + name: Phase + type: string - description: Number of restarts jsonPath: .status.restarts name: Restarts @@ -8487,6 +8491,10 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + phase: + default: Running + description: Phase of the JobSet. + type: string replicatedJobsStatus: description: ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index 8df6b367..bbd8247b 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -179,6 +179,10 @@ ], "x-kubernetes-list-type": "map" }, + "phase": { + "description": "Phase of the JobSet.", + "type": "string" + }, "replicatedJobsStatus": { "description": "ReplicatedJobsStatus track the number of JobsReady for each replicatedJob.", "type": "array", diff --git a/pkg/controllers/failure_policy.go b/pkg/controllers/failure_policy.go index 2c6369da..5013bc0f 100644 --- a/pkg/controllers/failure_policy.go +++ b/pkg/controllers/failure_policy.go @@ -250,6 +250,7 @@ func makeFailedConditionOpts(reason, msg string) *conditionOpts { Reason: reason, Message: msg, }, + phase: string(jobset.JobSetFailed), eventType: corev1.EventTypeWarning, } } diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 8753c9ee..679e571e 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -630,7 +630,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js // executeSuccessPolicy checks the completed jobs against the jobset success policy // and updates the jobset status to completed if the success policy conditions are met. // Returns a boolean value indicating if the jobset was completed or not. -func executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool { +func executeSuccessPolicy(_ context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool { if numJobsMatchingSuccessPolicy(js, ownedJobs.successful) >= numJobsExpectedToSucceed(js) { setJobSetCompletedCondition(js, updateStatusOpts) return true @@ -867,14 +867,15 @@ func enqueueEvent(updateStatusOpts *statusUpdateOpts, event *eventParams) { // function parameters for setCondition type conditionOpts struct { eventType string + phase string condition *metav1.Condition } -// setCondition will add a new condition to the JobSet status (or update an existing one), +// setCondition will add a new condition and phase to the JobSet status (or update an existing one), // and enqueue an event for emission if the status update succeeds at the end of the reconcile. func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) { - // Return early if no status update is required for this condition. - if !updateCondition(js, condOpts) { + // Return early if no status update is required for this condition and phase. + if !updateConditionAndPhase(js, condOpts) { return } @@ -897,12 +898,12 @@ func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts * enqueueEvent(updateStatusOpts, event) } -// updateCondition accepts a given condition and does one of the following: +// updateConditionAndPhase accepts a given condition and does one of the following: // 1. If an identical condition already exists, do nothing and return false (indicating // no change was made). // 2. If a condition of the same type exists but with a different status, update // the condition in place and return true (indicating a condition change was made). -func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool { +func updateConditionAndPhase(js *jobset.JobSet, opts *conditionOpts) bool { if opts == nil || opts.condition == nil { return false } @@ -941,6 +942,13 @@ func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool { js.Status.Conditions = append(js.Status.Conditions, newCond) shouldUpdate = true } + + // Update the JobSet phase if necessary. + if opts.phase != "" && js.Status.Phase != opts.phase { + js.Status.Phase = opts.phase + shouldUpdate = true + } + return shouldUpdate } @@ -970,6 +978,7 @@ func makeCompletedConditionsOpts() *conditionOpts { Reason: constants.AllJobsCompletedReason, Message: constants.AllJobsCompletedMessage, }, + phase: string(jobset.JobSetCompleted), } } @@ -984,6 +993,7 @@ func makeSuspendedConditionOpts() *conditionOpts { Reason: constants.JobSetSuspendedReason, Message: constants.JobSetSuspendedMessage, }, + phase: string(jobset.JobSetSuspended), } } @@ -998,6 +1008,7 @@ func makeResumedConditionOpts() *conditionOpts { Reason: constants.JobSetResumedReason, Message: constants.JobSetResumedMessage, }, + phase: string(jobset.JobSetRunning), } } diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index a978b55c..1709768d 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -736,7 +736,7 @@ func TestUpdateConditions(t *testing.T) { ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()). + Obj()).Phase(string(jobset.JobSetCompleted)). Conditions([]metav1.Condition{ // JobSet is completed.. { @@ -752,7 +752,7 @@ func TestUpdateConditions(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - gotUpdate := updateCondition(tc.js, tc.opts) + gotUpdate := updateConditionAndPhase(tc.js, tc.opts) if gotUpdate != tc.expectedUpdate { t.Errorf("updateCondition return mismatch (want: %v, got %v)", tc.expectedUpdate, gotUpdate) } diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 211b1b29..ccdb9c34 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -159,6 +159,12 @@ func (j *JobSetWrapper) FailedCondition(failedAt metav1.Time) *JobSetWrapper { return j } +// Phase sets the value of JobSet.Status.Phase. +func (j *JobSetWrapper) Phase(phase string) *JobSetWrapper { + j.Status.Phase = phase + return j +} + func (j *JobSetWrapper) DeletionTimestamp(deletionTimestamp *metav1.Time) *JobSetWrapper { j.ObjectMeta.DeletionTimestamp = deletionTimestamp return j diff --git a/sdk/python/docs/JobsetV1alpha2JobSetStatus.md b/sdk/python/docs/JobsetV1alpha2JobSetStatus.md index 3a223368..ca1867a4 100644 --- a/sdk/python/docs/JobsetV1alpha2JobSetStatus.md +++ b/sdk/python/docs/JobsetV1alpha2JobSetStatus.md @@ -5,6 +5,7 @@ JobSetStatus defines the observed state of JobSet Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **conditions** | [**list[V1Condition]**](V1Condition.md) | | [optional] +**phase** | **str** | Phase of the JobSet. | [optional] **replicated_jobs_status** | [**list[JobsetV1alpha2ReplicatedJobStatus]**](JobsetV1alpha2ReplicatedJobStatus.md) | ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. | [optional] **restarts** | **int** | Restarts tracks the number of times the JobSet has restarted (i.e. recreated in case of RecreateAll policy). | [optional] **restarts_count_towards_max** | **int** | RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. | [optional] diff --git a/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py b/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py index e6486617..dcdb02c6 100644 --- a/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py +++ b/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py @@ -34,6 +34,7 @@ class JobsetV1alpha2JobSetStatus(object): """ openapi_types = { 'conditions': 'list[V1Condition]', + 'phase': 'str', 'replicated_jobs_status': 'list[JobsetV1alpha2ReplicatedJobStatus]', 'restarts': 'int', 'restarts_count_towards_max': 'int' @@ -41,18 +42,20 @@ class JobsetV1alpha2JobSetStatus(object): attribute_map = { 'conditions': 'conditions', + 'phase': 'phase', 'replicated_jobs_status': 'replicatedJobsStatus', 'restarts': 'restarts', 'restarts_count_towards_max': 'restartsCountTowardsMax' } - def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, conditions=None, phase=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501 """JobsetV1alpha2JobSetStatus - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() self.local_vars_configuration = local_vars_configuration self._conditions = None + self._phase = None self._replicated_jobs_status = None self._restarts = None self._restarts_count_towards_max = None @@ -60,6 +63,8 @@ def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, if conditions is not None: self.conditions = conditions + if phase is not None: + self.phase = phase if replicated_jobs_status is not None: self.replicated_jobs_status = replicated_jobs_status if restarts is not None: @@ -88,6 +93,29 @@ def conditions(self, conditions): self._conditions = conditions + @property + def phase(self): + """Gets the phase of this JobsetV1alpha2JobSetStatus. # noqa: E501 + + Phase of the JobSet. # noqa: E501 + + :return: The phase of this JobsetV1alpha2JobSetStatus. # noqa: E501 + :rtype: str + """ + return self._phase + + @phase.setter + def phase(self, phase): + """Sets the phase of this JobsetV1alpha2JobSetStatus. + + Phase of the JobSet. # noqa: E501 + + :param phase: The phase of this JobsetV1alpha2JobSetStatus. # noqa: E501 + :type: str + """ + + self._phase = phase + @property def replicated_jobs_status(self): """Gets the replicated_jobs_status of this JobsetV1alpha2JobSetStatus. # noqa: E501 diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set.py b/sdk/python/test/test_jobset_v1alpha2_job_set.py index 34f58a0e..07b6ca63 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set.py @@ -76,6 +76,7 @@ def make_instance(self, include_optional): conditions = [ None ], + phase = '0', replicated_jobs_status = [ jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus( active = 56, diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py index dad4a1d9..4caa5513 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py @@ -79,6 +79,7 @@ def make_instance(self, include_optional): conditions = [ None ], + phase = '0', replicated_jobs_status = [ jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus( active = 56, @@ -136,6 +137,7 @@ def make_instance(self, include_optional): conditions = [ None ], + phase = '0', replicated_jobs_status = [ jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus( active = 56, diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_status.py b/sdk/python/test/test_jobset_v1alpha2_job_set_status.py index d7457001..867c58dd 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_status.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_status.py @@ -41,6 +41,7 @@ def make_instance(self, include_optional): conditions = [ None ], + phase = '0', replicated_jobs_status = [ jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus( active = 56, diff --git a/test/integration/controller/jobset_controller_test.go b/test/integration/controller/jobset_controller_test.go index 46a598a4..e6367bc8 100644 --- a/test/integration/controller/jobset_controller_test.go +++ b/test/integration/controller/jobset_controller_test.go @@ -1612,6 +1612,7 @@ var _ = ginkgo.Describe("JobSet controller", func() { LastTransitionTime: metav1.Now(), }, }, + Phase: string(jobset.JobSetRunning), Restarts: 1, ReplicatedJobsStatus: []jobset.ReplicatedJobStatus{ { From 9e78074fab3026ecab004b053ac36e25dd664d60 Mon Sep 17 00:00:00 2001 From: googs1025 Date: Fri, 7 Jun 2024 08:46:03 +0800 Subject: [PATCH 2/6] add updateConditionAndPhase comment --- pkg/controllers/jobset_controller.go | 6 ++++-- pkg/controllers/jobset_controller_test.go | 8 ++++---- pkg/util/testing/wrappers.go | 4 ++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 679e571e..3df7c73a 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -898,11 +898,13 @@ func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts * enqueueEvent(updateStatusOpts, event) } -// updateConditionAndPhase accepts a given condition and does one of the following: +// updateConditionAndPhase accepts a condition and a phase, and does the following: // 1. If an identical condition already exists, do nothing and return false (indicating // no change was made). // 2. If a condition of the same type exists but with a different status, update // the condition in place and return true (indicating a condition change was made). +// 3. If the specified phase is different from the current phase of the JobSet, +// update the JobSet Status Phase func updateConditionAndPhase(js *jobset.JobSet, opts *conditionOpts) bool { if opts == nil || opts.condition == nil { return false @@ -943,7 +945,7 @@ func updateConditionAndPhase(js *jobset.JobSet, opts *conditionOpts) bool { shouldUpdate = true } - // Update the JobSet phase if necessary. + // Update the JobSet Status Phase if necessary. if opts.phase != "" && js.Status.Phase != opts.phase { js.Status.Phase = opts.phase shouldUpdate = true diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index 1709768d..9eaa7462 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -696,7 +696,7 @@ func TestUpdateConditions(t *testing.T) { ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()).Obj(), + Obj()).Phase(jobset.JobSetRunning).Obj(), opts: makeCompletedConditionsOpts(), expectedUpdate: true, }, @@ -706,7 +706,7 @@ func TestUpdateConditions(t *testing.T) { ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()).Obj(), + Obj()).Phase(jobset.JobSetRunning).Obj(), opts: makeSuspendedConditionOpts(), expectedUpdate: true, }, @@ -716,7 +716,7 @@ func TestUpdateConditions(t *testing.T) { ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()). + Obj()).Phase(jobset.JobSetRunning). Conditions([]metav1.Condition{ // JobSet is currrently suspended. { @@ -736,7 +736,7 @@ func TestUpdateConditions(t *testing.T) { ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()).Phase(string(jobset.JobSetCompleted)). + Obj()).Phase(jobset.JobSetCompleted). Conditions([]metav1.Condition{ // JobSet is completed.. { diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index ccdb9c34..b36f6d0b 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -160,8 +160,8 @@ func (j *JobSetWrapper) FailedCondition(failedAt metav1.Time) *JobSetWrapper { } // Phase sets the value of JobSet.Status.Phase. -func (j *JobSetWrapper) Phase(phase string) *JobSetWrapper { - j.Status.Phase = phase +func (j *JobSetWrapper) Phase(phase jobset.JobSetConditionType) *JobSetWrapper { + j.Status.Phase = string(phase) return j } From c45851b4c40eb02ccd9c6c1f6fed55c4711b229c Mon Sep 17 00:00:00 2001 From: googs1025 Date: Mon, 10 Jun 2024 15:15:27 +0800 Subject: [PATCH 3/6] remove unnecessary ctx --- pkg/controllers/jobset_controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 3df7c73a..428f6347 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -149,7 +149,7 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd // Calculate JobsReady and update statuses for each ReplicatedJob. rjobStatuses := r.calculateReplicatedJobStatuses(ctx, js, ownedJobs) - updateReplicatedJobsStatuses(ctx, js, rjobStatuses, updateStatusOpts) + updateReplicatedJobsStatuses(js, rjobStatuses, updateStatusOpts) // If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set. if jobSetFinished(js) { @@ -185,7 +185,7 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd // If any jobs have succeeded, execute the JobSet success policy. if len(ownedJobs.successful) > 0 { - if completed := executeSuccessPolicy(ctx, js, ownedJobs, updateStatusOpts); completed { + if completed := executeSuccessPolicy(js, ownedJobs, updateStatusOpts); completed { return ctrl.Result{}, nil } } @@ -304,7 +304,7 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet) } // updateReplicatedJobsStatuses updates the replicatedJob statuses if they have changed. -func updateReplicatedJobsStatuses(ctx context.Context, js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) { +func updateReplicatedJobsStatuses(js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) { // If replicated job statuses haven't changed, there's nothing to do here. if replicatedJobStatusesEqual(js.Status.ReplicatedJobsStatus, statuses) { return @@ -630,7 +630,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js // executeSuccessPolicy checks the completed jobs against the jobset success policy // and updates the jobset status to completed if the success policy conditions are met. // Returns a boolean value indicating if the jobset was completed or not. -func executeSuccessPolicy(_ context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool { +func executeSuccessPolicy(js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool { if numJobsMatchingSuccessPolicy(js, ownedJobs.successful) >= numJobsExpectedToSucceed(js) { setJobSetCompletedCondition(js, updateStatusOpts) return true From 4137a08b6008009ad2ed0bee330fca4ec346d970 Mon Sep 17 00:00:00 2001 From: googs1025 Date: Thu, 20 Jun 2024 21:07:52 +0800 Subject: [PATCH 4/6] feat: add jobset TerminalState field Signed-off-by: googs1025 --- api/jobset/v1alpha2/jobset_types.go | 10 ++- api/jobset/v1alpha2/openapi_generated.go | 4 +- .../jobset/v1alpha2/jobsetstatus.go | 10 +-- .../crd/bases/jobset.x-k8s.io_jobsets.yaml | 15 ++--- hack/python-sdk/swagger.json | 8 +-- pkg/controllers/failure_policy.go | 4 +- pkg/controllers/jobset_controller.go | 30 +++++---- pkg/controllers/jobset_controller_test.go | 10 +-- pkg/util/testing/wrappers.go | 6 +- sdk/python/docs/JobsetV1alpha2JobSetStatus.md | 2 +- .../models/jobset_v1alpha2_job_set_status.py | 62 +++++++++---------- .../test/test_jobset_v1alpha2_job_set.py | 4 +- .../test/test_jobset_v1alpha2_job_set_list.py | 8 +-- .../test_jobset_v1alpha2_job_set_status.py | 4 +- .../controller/jobset_controller_test.go | 1 - 15 files changed, 87 insertions(+), 91 deletions(-) diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index b6b41daa..e6d1736c 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -50,8 +50,6 @@ type JobSetConditionType string // These are built-in conditions of a JobSet. const ( - // JobSetRunning means the job is running. - JobSetRunning JobSetConditionType = "Running" // JobSetCompleted means the job has completed its execution. JobSetCompleted JobSetConditionType = "Completed" // JobSetFailed means the job has failed its execution. @@ -136,9 +134,9 @@ type JobSetStatus struct { // RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. RestartsCountTowardsMax int32 `json:"restartsCountTowardsMax,omitempty"` - // Phase of the JobSet. - // +kubebuilder:default="Running" - Phase string `json:"phase,omitempty"` + // TerminalState the state of the JobSet when it finishes execution. + // It can be either Complete or Failed. Otherwise, it is empty by default. + TerminalState string `json:"terminalState,omitempty"` // ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. // +optional @@ -175,7 +173,7 @@ type ReplicatedJobStatus struct { // +k8s:openapi-gen=true // +kubebuilder:object:root=true // +kubebuilder:subresource:status -// +kubebuilder:printcolumn:name="Phase",JSONPath=".status.phase",type=string,description="Phase of the JobSet" +// +kubebuilder:printcolumn:name="TerminalState",JSONPath=".status.terminalState",type=string,description="Final state of JobSet" // +kubebuilder:printcolumn:name="Restarts",JSONPath=".status.restarts",type=string,description="Number of restarts" // +kubebuilder:printcolumn:name="Completed",type="string",priority=0,JSONPath=".status.conditions[?(@.type==\"Completed\")].status" // +kubebuilder:printcolumn:name="Suspended",type="string",JSONPath=".spec.suspend",description="JobSet suspended" diff --git a/api/jobset/v1alpha2/openapi_generated.go b/api/jobset/v1alpha2/openapi_generated.go index d1417408..aa06e565 100644 --- a/api/jobset/v1alpha2/openapi_generated.go +++ b/api/jobset/v1alpha2/openapi_generated.go @@ -358,9 +358,9 @@ func schema_jobset_api_jobset_v1alpha2_JobSetStatus(ref common.ReferenceCallback Format: "int32", }, }, - "phase": { + "terminalState": { SchemaProps: spec.SchemaProps{ - Description: "Phase of the JobSet.", + Description: "TerminalState the state of the JobSet when it finishes execution. It can be either Complete or Failed. Otherwise, it is empty by default.", Type: []string{"string"}, Format: "", }, diff --git a/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go b/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go index c504a1a1..d1cbf2f2 100644 --- a/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go +++ b/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go @@ -24,7 +24,7 @@ type JobSetStatusApplyConfiguration struct { Conditions []v1.Condition `json:"conditions,omitempty"` Restarts *int32 `json:"restarts,omitempty"` RestartsCountTowardsMax *int32 `json:"restartsCountTowardsMax,omitempty"` - Phase *string `json:"phase,omitempty"` + TerminalState *string `json:"terminalState,omitempty"` ReplicatedJobsStatus []ReplicatedJobStatusApplyConfiguration `json:"replicatedJobsStatus,omitempty"` } @@ -60,11 +60,11 @@ func (b *JobSetStatusApplyConfiguration) WithRestartsCountTowardsMax(value int32 return b } -// WithPhase sets the Phase field in the declarative configuration to the given value +// WithTerminalState sets the TerminalState field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Phase field is set to the value of the last call. -func (b *JobSetStatusApplyConfiguration) WithPhase(value string) *JobSetStatusApplyConfiguration { - b.Phase = &value +// If called multiple times, the TerminalState field is set to the value of the last call. +func (b *JobSetStatusApplyConfiguration) WithTerminalState(value string) *JobSetStatusApplyConfiguration { + b.TerminalState = &value return b } diff --git a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml index d5ecaf35..52ddc2a5 100644 --- a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml +++ b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml @@ -15,9 +15,9 @@ spec: scope: Namespaced versions: - additionalPrinterColumns: - - description: Phase of the JobSet - jsonPath: .status.phase - name: Phase + - description: Final state of JobSet + jsonPath: .status.terminalState + name: TerminalState type: string - description: Number of restarts jsonPath: .status.restarts @@ -8491,10 +8491,6 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map - phase: - default: Running - description: Phase of the JobSet. - type: string replicatedJobsStatus: description: ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. @@ -8555,6 +8551,11 @@ spec: of restarts. format: int32 type: integer + terminalState: + description: |- + TerminalState the state of the JobSet when it finishes execution. + It can be either Complete or Failed. Otherwise, it is empty by default. + type: string type: object type: object served: true diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index bbd8247b..dad229c4 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -179,10 +179,6 @@ ], "x-kubernetes-list-type": "map" }, - "phase": { - "description": "Phase of the JobSet.", - "type": "string" - }, "replicatedJobsStatus": { "description": "ReplicatedJobsStatus track the number of JobsReady for each replicatedJob.", "type": "array", @@ -204,6 +200,10 @@ "description": "RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts.", "type": "integer", "format": "int32" + }, + "terminalState": { + "description": "TerminalState the state of the JobSet when it finishes execution. It can be either Complete or Failed. Otherwise, it is empty by default.", + "type": "string" } } }, diff --git a/pkg/controllers/failure_policy.go b/pkg/controllers/failure_policy.go index 5013bc0f..0dc0245d 100644 --- a/pkg/controllers/failure_policy.go +++ b/pkg/controllers/failure_policy.go @@ -250,8 +250,8 @@ func makeFailedConditionOpts(reason, msg string) *conditionOpts { Reason: reason, Message: msg, }, - phase: string(jobset.JobSetFailed), - eventType: corev1.EventTypeWarning, + terminalState: string(jobset.JobSetFailed), + eventType: corev1.EventTypeWarning, } } diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 428f6347..30cd6a12 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -866,16 +866,16 @@ func enqueueEvent(updateStatusOpts *statusUpdateOpts, event *eventParams) { // function parameters for setCondition type conditionOpts struct { - eventType string - phase string - condition *metav1.Condition + eventType string + terminalState string + condition *metav1.Condition } -// setCondition will add a new condition and phase to the JobSet status (or update an existing one), +// setCondition will add a new condition and terminalState to the JobSet status (or update an existing one), // and enqueue an event for emission if the status update succeeds at the end of the reconcile. func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) { - // Return early if no status update is required for this condition and phase. - if !updateConditionAndPhase(js, condOpts) { + // Return early if no status update is required for this condition and terminalState. + if !updateConditionAndTerminalState(js, condOpts) { return } @@ -898,14 +898,14 @@ func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts * enqueueEvent(updateStatusOpts, event) } -// updateConditionAndPhase accepts a condition and a phase, and does the following: +// updateConditionAndTerminalState accepts a condition and a terminalState, and does the following: // 1. If an identical condition already exists, do nothing and return false (indicating // no change was made). // 2. If a condition of the same type exists but with a different status, update // the condition in place and return true (indicating a condition change was made). -// 3. If the specified phase is different from the current phase of the JobSet, -// update the JobSet Status Phase -func updateConditionAndPhase(js *jobset.JobSet, opts *conditionOpts) bool { +// 3. If the specified terminalState is different from the current terminalState of JobSet, +// update the JobSet Status TerminalState +func updateConditionAndTerminalState(js *jobset.JobSet, opts *conditionOpts) bool { if opts == nil || opts.condition == nil { return false } @@ -945,9 +945,9 @@ func updateConditionAndPhase(js *jobset.JobSet, opts *conditionOpts) bool { shouldUpdate = true } - // Update the JobSet Status Phase if necessary. - if opts.phase != "" && js.Status.Phase != opts.phase { - js.Status.Phase = opts.phase + // If the jobset is in a terminal state, set the terminal state on the jobset. + if opts.terminalState != "" && js.Status.TerminalState != opts.terminalState { + js.Status.TerminalState = opts.terminalState shouldUpdate = true } @@ -980,7 +980,7 @@ func makeCompletedConditionsOpts() *conditionOpts { Reason: constants.AllJobsCompletedReason, Message: constants.AllJobsCompletedMessage, }, - phase: string(jobset.JobSetCompleted), + terminalState: string(jobset.JobSetCompleted), } } @@ -995,7 +995,6 @@ func makeSuspendedConditionOpts() *conditionOpts { Reason: constants.JobSetSuspendedReason, Message: constants.JobSetSuspendedMessage, }, - phase: string(jobset.JobSetSuspended), } } @@ -1010,7 +1009,6 @@ func makeResumedConditionOpts() *conditionOpts { Reason: constants.JobSetResumedReason, Message: constants.JobSetResumedMessage, }, - phase: string(jobset.JobSetRunning), } } diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index 9eaa7462..c9239340 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -696,7 +696,7 @@ func TestUpdateConditions(t *testing.T) { ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()).Phase(jobset.JobSetRunning).Obj(), + Obj()).Obj(), opts: makeCompletedConditionsOpts(), expectedUpdate: true, }, @@ -706,7 +706,7 @@ func TestUpdateConditions(t *testing.T) { ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()).Phase(jobset.JobSetRunning).Obj(), + Obj()).Obj(), opts: makeSuspendedConditionOpts(), expectedUpdate: true, }, @@ -716,7 +716,7 @@ func TestUpdateConditions(t *testing.T) { ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()).Phase(jobset.JobSetRunning). + Obj()). Conditions([]metav1.Condition{ // JobSet is currrently suspended. { @@ -736,7 +736,7 @@ func TestUpdateConditions(t *testing.T) { ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName). Job(testutils.MakeJobTemplate(jobName, ns).Obj()). Replicas(1). - Obj()).Phase(jobset.JobSetCompleted). + Obj()).TerminalState(jobset.JobSetCompleted). Conditions([]metav1.Condition{ // JobSet is completed.. { @@ -752,7 +752,7 @@ func TestUpdateConditions(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - gotUpdate := updateConditionAndPhase(tc.js, tc.opts) + gotUpdate := updateConditionAndTerminalState(tc.js, tc.opts) if gotUpdate != tc.expectedUpdate { t.Errorf("updateCondition return mismatch (want: %v, got %v)", tc.expectedUpdate, gotUpdate) } diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index b36f6d0b..a08eef46 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -159,9 +159,9 @@ func (j *JobSetWrapper) FailedCondition(failedAt metav1.Time) *JobSetWrapper { return j } -// Phase sets the value of JobSet.Status.Phase. -func (j *JobSetWrapper) Phase(phase jobset.JobSetConditionType) *JobSetWrapper { - j.Status.Phase = string(phase) +// TerminalState sets the value of JobSet.Status.TerminalState. +func (j *JobSetWrapper) TerminalState(terminalState jobset.JobSetConditionType) *JobSetWrapper { + j.Status.TerminalState = string(terminalState) return j } diff --git a/sdk/python/docs/JobsetV1alpha2JobSetStatus.md b/sdk/python/docs/JobsetV1alpha2JobSetStatus.md index ca1867a4..f85bde4e 100644 --- a/sdk/python/docs/JobsetV1alpha2JobSetStatus.md +++ b/sdk/python/docs/JobsetV1alpha2JobSetStatus.md @@ -5,10 +5,10 @@ JobSetStatus defines the observed state of JobSet Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **conditions** | [**list[V1Condition]**](V1Condition.md) | | [optional] -**phase** | **str** | Phase of the JobSet. | [optional] **replicated_jobs_status** | [**list[JobsetV1alpha2ReplicatedJobStatus]**](JobsetV1alpha2ReplicatedJobStatus.md) | ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. | [optional] **restarts** | **int** | Restarts tracks the number of times the JobSet has restarted (i.e. recreated in case of RecreateAll policy). | [optional] **restarts_count_towards_max** | **int** | RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. | [optional] +**terminal_state** | **str** | TerminalState the state of the JobSet when it finishes execution. It can be either Complete or Failed. Otherwise, it is empty by default. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py b/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py index dcdb02c6..d95cfa32 100644 --- a/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py +++ b/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py @@ -34,43 +34,43 @@ class JobsetV1alpha2JobSetStatus(object): """ openapi_types = { 'conditions': 'list[V1Condition]', - 'phase': 'str', 'replicated_jobs_status': 'list[JobsetV1alpha2ReplicatedJobStatus]', 'restarts': 'int', - 'restarts_count_towards_max': 'int' + 'restarts_count_towards_max': 'int', + 'terminal_state': 'str' } attribute_map = { 'conditions': 'conditions', - 'phase': 'phase', 'replicated_jobs_status': 'replicatedJobsStatus', 'restarts': 'restarts', - 'restarts_count_towards_max': 'restartsCountTowardsMax' + 'restarts_count_towards_max': 'restartsCountTowardsMax', + 'terminal_state': 'terminalState' } - def __init__(self, conditions=None, phase=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, terminal_state=None, local_vars_configuration=None): # noqa: E501 """JobsetV1alpha2JobSetStatus - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() self.local_vars_configuration = local_vars_configuration self._conditions = None - self._phase = None self._replicated_jobs_status = None self._restarts = None self._restarts_count_towards_max = None + self._terminal_state = None self.discriminator = None if conditions is not None: self.conditions = conditions - if phase is not None: - self.phase = phase if replicated_jobs_status is not None: self.replicated_jobs_status = replicated_jobs_status if restarts is not None: self.restarts = restarts if restarts_count_towards_max is not None: self.restarts_count_towards_max = restarts_count_towards_max + if terminal_state is not None: + self.terminal_state = terminal_state @property def conditions(self): @@ -93,29 +93,6 @@ def conditions(self, conditions): self._conditions = conditions - @property - def phase(self): - """Gets the phase of this JobsetV1alpha2JobSetStatus. # noqa: E501 - - Phase of the JobSet. # noqa: E501 - - :return: The phase of this JobsetV1alpha2JobSetStatus. # noqa: E501 - :rtype: str - """ - return self._phase - - @phase.setter - def phase(self, phase): - """Sets the phase of this JobsetV1alpha2JobSetStatus. - - Phase of the JobSet. # noqa: E501 - - :param phase: The phase of this JobsetV1alpha2JobSetStatus. # noqa: E501 - :type: str - """ - - self._phase = phase - @property def replicated_jobs_status(self): """Gets the replicated_jobs_status of this JobsetV1alpha2JobSetStatus. # noqa: E501 @@ -185,6 +162,29 @@ def restarts_count_towards_max(self, restarts_count_towards_max): self._restarts_count_towards_max = restarts_count_towards_max + @property + def terminal_state(self): + """Gets the terminal_state of this JobsetV1alpha2JobSetStatus. # noqa: E501 + + TerminalState the state of the JobSet when it finishes execution. It can be either Complete or Failed. Otherwise, it is empty by default. # noqa: E501 + + :return: The terminal_state of this JobsetV1alpha2JobSetStatus. # noqa: E501 + :rtype: str + """ + return self._terminal_state + + @terminal_state.setter + def terminal_state(self, terminal_state): + """Sets the terminal_state of this JobsetV1alpha2JobSetStatus. + + TerminalState the state of the JobSet when it finishes execution. It can be either Complete or Failed. Otherwise, it is empty by default. # noqa: E501 + + :param terminal_state: The terminal_state of this JobsetV1alpha2JobSetStatus. # noqa: E501 + :type: str + """ + + self._terminal_state = terminal_state + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set.py b/sdk/python/test/test_jobset_v1alpha2_job_set.py index 07b6ca63..a35e1ef4 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set.py @@ -76,7 +76,6 @@ def make_instance(self, include_optional): conditions = [ None ], - phase = '0', replicated_jobs_status = [ jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus( active = 56, @@ -87,7 +86,8 @@ def make_instance(self, include_optional): suspended = 56, ) ], restarts = 56, - restarts_count_towards_max = 56, ) + restarts_count_towards_max = 56, + terminal_state = '0', ) ) else : return JobsetV1alpha2JobSet( diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py index 4caa5513..583c4b73 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py @@ -79,7 +79,6 @@ def make_instance(self, include_optional): conditions = [ None ], - phase = '0', replicated_jobs_status = [ jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus( active = 56, @@ -90,7 +89,8 @@ def make_instance(self, include_optional): suspended = 56, ) ], restarts = 56, - restarts_count_towards_max = 56, ), ) + restarts_count_towards_max = 56, + terminal_state = '0', ), ) ], kind = '0', metadata = None @@ -137,7 +137,6 @@ def make_instance(self, include_optional): conditions = [ None ], - phase = '0', replicated_jobs_status = [ jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus( active = 56, @@ -148,7 +147,8 @@ def make_instance(self, include_optional): suspended = 56, ) ], restarts = 56, - restarts_count_towards_max = 56, ), ) + restarts_count_towards_max = 56, + terminal_state = '0', ), ) ], ) diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_status.py b/sdk/python/test/test_jobset_v1alpha2_job_set_status.py index 867c58dd..345a5b08 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_status.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_status.py @@ -41,7 +41,6 @@ def make_instance(self, include_optional): conditions = [ None ], - phase = '0', replicated_jobs_status = [ jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus( active = 56, @@ -52,7 +51,8 @@ def make_instance(self, include_optional): suspended = 56, ) ], restarts = 56, - restarts_count_towards_max = 56 + restarts_count_towards_max = 56, + terminal_state = '0' ) else : return JobsetV1alpha2JobSetStatus( diff --git a/test/integration/controller/jobset_controller_test.go b/test/integration/controller/jobset_controller_test.go index e6367bc8..46a598a4 100644 --- a/test/integration/controller/jobset_controller_test.go +++ b/test/integration/controller/jobset_controller_test.go @@ -1612,7 +1612,6 @@ var _ = ginkgo.Describe("JobSet controller", func() { LastTransitionTime: metav1.Now(), }, }, - Phase: string(jobset.JobSetRunning), Restarts: 1, ReplicatedJobsStatus: []jobset.ReplicatedJobStatus{ { From 674e80cb6354b860127d7554812a0fd7c071f3e7 Mon Sep 17 00:00:00 2001 From: googs1025 Date: Sun, 30 Jun 2024 08:20:24 +0800 Subject: [PATCH 5/6] set status terminalstate field in setJobSetCompletedCondition and setJobSetFailedCondition --- pkg/controllers/failure_policy.go | 14 ++++++------ pkg/controllers/jobset_controller.go | 28 ++++++++--------------- pkg/controllers/jobset_controller_test.go | 2 +- 3 files changed, 17 insertions(+), 27 deletions(-) diff --git a/pkg/controllers/failure_policy.go b/pkg/controllers/failure_policy.go index 0dc0245d..3b02fa40 100644 --- a/pkg/controllers/failure_policy.go +++ b/pkg/controllers/failure_policy.go @@ -51,7 +51,7 @@ func executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *chi // possible code paths here. firstFailedJob := findFirstFailedJob(ownedJobs.failed) msg := messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name) - setJobSetFailedCondition(ctx, js, constants.FailedJobsReason, msg, updateStatusOpts) + setJobSetFailedCondition(js, constants.FailedJobsReason, msg, updateStatusOpts) return nil } @@ -183,7 +183,7 @@ var failJobSetActionApplier failurePolicyActionApplier = func(ctx context.Contex failureMessage := messageWithFirstFailedJob(failureBaseMessage, matchingFailedJob.Name) failureReason := constants.FailJobSetActionReason - setJobSetFailedCondition(ctx, js, failureReason, failureMessage, updateStatusOpts) + setJobSetFailedCondition(js, failureReason, failureMessage, updateStatusOpts) return nil } @@ -194,7 +194,7 @@ var restartJobSetActionApplier failurePolicyActionApplier = func(ctx context.Con failureMessage := messageWithFirstFailedJob(failureBaseMessage, matchingFailedJob.Name) failureReason := constants.ReachedMaxRestartsReason - setJobSetFailedCondition(ctx, js, failureReason, failureMessage, updateStatusOpts) + setJobSetFailedCondition(js, failureReason, failureMessage, updateStatusOpts) return nil } @@ -250,14 +250,14 @@ func makeFailedConditionOpts(reason, msg string) *conditionOpts { Reason: reason, Message: msg, }, - terminalState: string(jobset.JobSetFailed), - eventType: corev1.EventTypeWarning, + eventType: corev1.EventTypeWarning, } } -// setJobSetFailedCondition sets a condition on the JobSet status indicating it has failed. -func setJobSetFailedCondition(ctx context.Context, js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) { +// setJobSetFailedCondition sets a condition and terminal state on the JobSet status indicating it has failed. +func setJobSetFailedCondition(js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) { setCondition(js, makeFailedConditionOpts(reason, msg), updateStatusOpts) + js.Status.TerminalState = string(jobset.JobSetFailed) } // findJobFailureTimeAndReason is a helper function which extracts the Job failure condition from a Job, diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 30cd6a12..86ce029a 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -866,16 +866,15 @@ func enqueueEvent(updateStatusOpts *statusUpdateOpts, event *eventParams) { // function parameters for setCondition type conditionOpts struct { - eventType string - terminalState string - condition *metav1.Condition + eventType string + condition *metav1.Condition } -// setCondition will add a new condition and terminalState to the JobSet status (or update an existing one), +// setCondition will add a new condition to the JobSet status (or update an existing one), // and enqueue an event for emission if the status update succeeds at the end of the reconcile. func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) { - // Return early if no status update is required for this condition and terminalState. - if !updateConditionAndTerminalState(js, condOpts) { + // Return early if no status update is required for this condition. + if !updateCondition(js, condOpts) { return } @@ -898,14 +897,12 @@ func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts * enqueueEvent(updateStatusOpts, event) } -// updateConditionAndTerminalState accepts a condition and a terminalState, and does the following: +// updateCondition accepts a given condition and does one of the following: // 1. If an identical condition already exists, do nothing and return false (indicating // no change was made). // 2. If a condition of the same type exists but with a different status, update // the condition in place and return true (indicating a condition change was made). -// 3. If the specified terminalState is different from the current terminalState of JobSet, -// update the JobSet Status TerminalState -func updateConditionAndTerminalState(js *jobset.JobSet, opts *conditionOpts) bool { +func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool { if opts == nil || opts.condition == nil { return false } @@ -944,19 +941,13 @@ func updateConditionAndTerminalState(js *jobset.JobSet, opts *conditionOpts) boo js.Status.Conditions = append(js.Status.Conditions, newCond) shouldUpdate = true } - - // If the jobset is in a terminal state, set the terminal state on the jobset. - if opts.terminalState != "" && js.Status.TerminalState != opts.terminalState { - js.Status.TerminalState = opts.terminalState - shouldUpdate = true - } - return shouldUpdate } -// setJobSetCompletedCondition sets a condition on the JobSet status indicating it has completed. +// setJobSetCompletedCondition sets a condition and terminal state on the JobSet status indicating it has completed. func setJobSetCompletedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { setCondition(js, makeCompletedConditionsOpts(), updateStatusOpts) + js.Status.TerminalState = string(jobset.JobSetCompleted) } // setJobSetSuspendedCondition sets a condition on the JobSet status indicating it is currently suspended. @@ -980,7 +971,6 @@ func makeCompletedConditionsOpts() *conditionOpts { Reason: constants.AllJobsCompletedReason, Message: constants.AllJobsCompletedMessage, }, - terminalState: string(jobset.JobSetCompleted), } } diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index c9239340..9c7058e7 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -752,7 +752,7 @@ func TestUpdateConditions(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - gotUpdate := updateConditionAndTerminalState(tc.js, tc.opts) + gotUpdate := updateCondition(tc.js, tc.opts) if gotUpdate != tc.expectedUpdate { t.Errorf("updateCondition return mismatch (want: %v, got %v)", tc.expectedUpdate, gotUpdate) } From a1c05dc6dd15c14975b19ec4a2bb4ee440b6b2ad Mon Sep 17 00:00:00 2001 From: googs1025 Date: Sun, 30 Jun 2024 19:06:26 +0800 Subject: [PATCH 6/6] verify the terminalState during integration tests --- test/util/util.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/test/util/util.go b/test/util/util.go index c1a53bd8..8cd5fc2a 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -60,7 +60,9 @@ func JobSetCompleted(ctx context.Context, k8sClient client.Client, js *jobset.Jo Status: metav1.ConditionTrue, }, } + terminalState := string(jobset.JobSetCompleted) gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true)) + gomega.Eventually(checkJobSetTerminalState, timeout, interval).WithArguments(ctx, k8sClient, js, terminalState).Should(gomega.Equal(true)) } func JobSetFailed(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) { @@ -71,7 +73,9 @@ func JobSetFailed(ctx context.Context, k8sClient client.Client, js *jobset.JobSe Status: metav1.ConditionTrue, }, } + terminalState := string(jobset.JobSetFailed) gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true)) + gomega.Eventually(checkJobSetTerminalState, timeout, interval).WithArguments(ctx, k8sClient, js, terminalState).Should(gomega.Equal(true)) } func JobSetSuspended(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) { @@ -142,6 +146,7 @@ func checkJobSetActive(ctx context.Context, k8sClient client.Client, js *jobset. return true, nil } +// checkJobSetStatus check if the JobSet status matches the expected conditions. func checkJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, conditions []metav1.Condition) (bool, error) { var fetchedJS jobset.JobSet if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: js.Namespace, Name: js.Name}, &fetchedJS); err != nil { @@ -158,6 +163,15 @@ func checkJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset. return found == len(conditions), nil } +// checkJobSetTerminalState check if the JobSet is in the expected terminal state. +func checkJobSetTerminalState(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, terminalState string) (bool, error) { + var fetchedJS jobset.JobSet + if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: js.Namespace, Name: js.Name}, &fetchedJS); err != nil { + return false, err + } + return fetchedJS.Status.TerminalState == terminalState, nil +} + // DeleteNamespace deletes all objects the tests typically create in the namespace. func DeleteNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error { if ns == nil {