Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add terminalState to jobset status #594

Merged
merged 6 commits into from
Jun 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type JobSetStatus struct {
// RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts.
RestartsCountTowardsMax int32 `json:"restartsCountTowardsMax,omitempty"`

// TerminalState the state of the JobSet when it finishes execution.
// It can be either Complete or Failed. Otherwise, it is empty by default.
TerminalState string `json:"terminalState,omitempty"`

// ReplicatedJobsStatus track the number of JobsReady for each replicatedJob.
// +optional
// +listType=map
Expand Down Expand Up @@ -169,6 +173,7 @@ type ReplicatedJobStatus struct {
// +k8s:openapi-gen=true
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="TerminalState",JSONPath=".status.terminalState",type=string,description="Final state of JobSet"
// +kubebuilder:printcolumn:name="Restarts",JSONPath=".status.restarts",type=string,description="Number of restarts"
// +kubebuilder:printcolumn:name="Completed",type="string",priority=0,JSONPath=".status.conditions[?(@.type==\"Completed\")].status"
// +kubebuilder:printcolumn:name="Suspended",type="string",JSONPath=".spec.suspend",description="JobSet suspended"
Expand Down
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Final state of JobSet
jsonPath: .status.terminalState
name: TerminalState
type: string
- description: Number of restarts
jsonPath: .status.restarts
name: Restarts
Expand Down Expand Up @@ -8547,6 +8551,11 @@ spec:
of restarts.
format: int32
type: integer
terminalState:
description: |-
TerminalState the state of the JobSet when it finishes execution.
It can be either Complete or Failed. Otherwise, it is empty by default.
type: string
type: object
type: object
served: true
Expand Down
4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@
"description": "RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts.",
"type": "integer",
"format": "int32"
},
"terminalState": {
"description": "TerminalState the state of the JobSet when it finishes execution. It can be either Complete or Failed. Otherwise, it is empty by default.",
"type": "string"
}
}
},
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/failure_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *chi
// possible code paths here.
firstFailedJob := findFirstFailedJob(ownedJobs.failed)
msg := messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name)
setJobSetFailedCondition(ctx, js, constants.FailedJobsReason, msg, updateStatusOpts)
setJobSetFailedCondition(js, constants.FailedJobsReason, msg, updateStatusOpts)
return nil
}

Expand Down Expand Up @@ -183,7 +183,7 @@ var failJobSetActionApplier failurePolicyActionApplier = func(ctx context.Contex
failureMessage := messageWithFirstFailedJob(failureBaseMessage, matchingFailedJob.Name)

failureReason := constants.FailJobSetActionReason
setJobSetFailedCondition(ctx, js, failureReason, failureMessage, updateStatusOpts)
setJobSetFailedCondition(js, failureReason, failureMessage, updateStatusOpts)
return nil
}

Expand All @@ -194,7 +194,7 @@ var restartJobSetActionApplier failurePolicyActionApplier = func(ctx context.Con
failureMessage := messageWithFirstFailedJob(failureBaseMessage, matchingFailedJob.Name)

failureReason := constants.ReachedMaxRestartsReason
setJobSetFailedCondition(ctx, js, failureReason, failureMessage, updateStatusOpts)
setJobSetFailedCondition(js, failureReason, failureMessage, updateStatusOpts)
return nil
}

Expand Down Expand Up @@ -254,9 +254,10 @@ func makeFailedConditionOpts(reason, msg string) *conditionOpts {
}
}

// setJobSetFailedCondition sets a condition on the JobSet status indicating it has failed.
func setJobSetFailedCondition(ctx context.Context, js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) {
// setJobSetFailedCondition sets a condition and terminal state on the JobSet status indicating it has failed.
func setJobSetFailedCondition(js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) {
setCondition(js, makeFailedConditionOpts(reason, msg), updateStatusOpts)
js.Status.TerminalState = string(jobset.JobSetFailed)
}

// findJobFailureTimeAndReason is a helper function which extracts the Job failure condition from a Job,
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd

// Calculate JobsReady and update statuses for each ReplicatedJob.
rjobStatuses := r.calculateReplicatedJobStatuses(ctx, js, ownedJobs)
updateReplicatedJobsStatuses(ctx, js, rjobStatuses, updateStatusOpts)
updateReplicatedJobsStatuses(js, rjobStatuses, updateStatusOpts)

// If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set.
if jobSetFinished(js) {
Expand Down Expand Up @@ -185,7 +185,7 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd

// If any jobs have succeeded, execute the JobSet success policy.
if len(ownedJobs.successful) > 0 {
if completed := executeSuccessPolicy(ctx, js, ownedJobs, updateStatusOpts); completed {
if completed := executeSuccessPolicy(js, ownedJobs, updateStatusOpts); completed {
return ctrl.Result{}, nil
}
}
Expand Down Expand Up @@ -304,7 +304,7 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)
}

// updateReplicatedJobsStatuses updates the replicatedJob statuses if they have changed.
func updateReplicatedJobsStatuses(ctx context.Context, js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) {
func updateReplicatedJobsStatuses(js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) {
// If replicated job statuses haven't changed, there's nothing to do here.
if replicatedJobStatusesEqual(js.Status.ReplicatedJobsStatus, statuses) {
return
Expand Down Expand Up @@ -630,7 +630,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js
// executeSuccessPolicy checks the completed jobs against the jobset success policy
// and updates the jobset status to completed if the success policy conditions are met.
// Returns a boolean value indicating if the jobset was completed or not.
func executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool {
func executeSuccessPolicy(js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool {
if numJobsMatchingSuccessPolicy(js, ownedJobs.successful) >= numJobsExpectedToSucceed(js) {
setJobSetCompletedCondition(js, updateStatusOpts)
return true
Expand Down Expand Up @@ -944,9 +944,10 @@ func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool {
return shouldUpdate
}

// setJobSetCompletedCondition sets a condition on the JobSet status indicating it has completed.
// setJobSetCompletedCondition sets a condition and terminal state on the JobSet status indicating it has completed.
func setJobSetCompletedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) {
setCondition(js, makeCompletedConditionsOpts(), updateStatusOpts)
js.Status.TerminalState = string(jobset.JobSetCompleted)
}

// setJobSetSuspendedCondition sets a condition on the JobSet status indicating it is currently suspended.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func TestUpdateConditions(t *testing.T) {
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).
Obj()).TerminalState(jobset.JobSetCompleted).
Conditions([]metav1.Condition{
// JobSet is completed..
{
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (j *JobSetWrapper) FailedCondition(failedAt metav1.Time) *JobSetWrapper {
return j
}

// TerminalState sets the value of JobSet.Status.TerminalState.
func (j *JobSetWrapper) TerminalState(terminalState jobset.JobSetConditionType) *JobSetWrapper {
j.Status.TerminalState = string(terminalState)
return j
}

func (j *JobSetWrapper) DeletionTimestamp(deletionTimestamp *metav1.Time) *JobSetWrapper {
j.ObjectMeta.DeletionTimestamp = deletionTimestamp
return j
Expand Down
1 change: 1 addition & 0 deletions sdk/python/docs/JobsetV1alpha2JobSetStatus.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Name | Type | Description | Notes
**replicated_jobs_status** | [**list[JobsetV1alpha2ReplicatedJobStatus]**](JobsetV1alpha2ReplicatedJobStatus.md) | ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. | [optional]
**restarts** | **int** | Restarts tracks the number of times the JobSet has restarted (i.e. recreated in case of RecreateAll policy). | [optional]
**restarts_count_towards_max** | **int** | RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. | [optional]
**terminal_state** | **str** | TerminalState the state of the JobSet when it finishes execution. It can be either Complete or Failed. Otherwise, it is empty by default. | [optional]

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

Expand Down
34 changes: 31 additions & 3 deletions sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@ class JobsetV1alpha2JobSetStatus(object):
'conditions': 'list[V1Condition]',
'replicated_jobs_status': 'list[JobsetV1alpha2ReplicatedJobStatus]',
'restarts': 'int',
'restarts_count_towards_max': 'int'
'restarts_count_towards_max': 'int',
'terminal_state': 'str'
}

attribute_map = {
'conditions': 'conditions',
'replicated_jobs_status': 'replicatedJobsStatus',
'restarts': 'restarts',
'restarts_count_towards_max': 'restartsCountTowardsMax'
'restarts_count_towards_max': 'restartsCountTowardsMax',
'terminal_state': 'terminalState'
}

def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501
def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, terminal_state=None, local_vars_configuration=None): # noqa: E501
"""JobsetV1alpha2JobSetStatus - a model defined in OpenAPI""" # noqa: E501
if local_vars_configuration is None:
local_vars_configuration = Configuration()
Expand All @@ -56,6 +58,7 @@ def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None,
self._replicated_jobs_status = None
self._restarts = None
self._restarts_count_towards_max = None
self._terminal_state = None
self.discriminator = None

if conditions is not None:
Expand All @@ -66,6 +69,8 @@ def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None,
self.restarts = restarts
if restarts_count_towards_max is not None:
self.restarts_count_towards_max = restarts_count_towards_max
if terminal_state is not None:
self.terminal_state = terminal_state

@property
def conditions(self):
Expand Down Expand Up @@ -157,6 +162,29 @@ def restarts_count_towards_max(self, restarts_count_towards_max):

self._restarts_count_towards_max = restarts_count_towards_max

@property
def terminal_state(self):
"""Gets the terminal_state of this JobsetV1alpha2JobSetStatus. # noqa: E501

TerminalState the state of the JobSet when it finishes execution. It can be either Complete or Failed. Otherwise, it is empty by default. # noqa: E501

:return: The terminal_state of this JobsetV1alpha2JobSetStatus. # noqa: E501
:rtype: str
"""
return self._terminal_state

@terminal_state.setter
def terminal_state(self, terminal_state):
"""Sets the terminal_state of this JobsetV1alpha2JobSetStatus.

TerminalState the state of the JobSet when it finishes execution. It can be either Complete or Failed. Otherwise, it is empty by default. # noqa: E501

:param terminal_state: The terminal_state of this JobsetV1alpha2JobSetStatus. # noqa: E501
:type: str
"""

self._terminal_state = terminal_state

def to_dict(self):
"""Returns the model properties as a dict"""
result = {}
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/test/test_jobset_v1alpha2_job_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ def make_instance(self, include_optional):
suspended = 56, )
],
restarts = 56,
restarts_count_towards_max = 56, )
restarts_count_towards_max = 56,
terminal_state = '0', )
)
else :
return JobsetV1alpha2JobSet(
Expand Down
6 changes: 4 additions & 2 deletions sdk/python/test/test_jobset_v1alpha2_job_set_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def make_instance(self, include_optional):
suspended = 56, )
],
restarts = 56,
restarts_count_towards_max = 56, ), )
restarts_count_towards_max = 56,
terminal_state = '0', ), )
],
kind = '0',
metadata = None
Expand Down Expand Up @@ -146,7 +147,8 @@ def make_instance(self, include_optional):
suspended = 56, )
],
restarts = 56,
restarts_count_towards_max = 56, ), )
restarts_count_towards_max = 56,
terminal_state = '0', ), )
],
)

Expand Down
3 changes: 2 additions & 1 deletion sdk/python/test/test_jobset_v1alpha2_job_set_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def make_instance(self, include_optional):
suspended = 56, )
],
restarts = 56,
restarts_count_towards_max = 56
restarts_count_towards_max = 56,
terminal_state = '0'
)
else :
return JobsetV1alpha2JobSetStatus(
Expand Down
14 changes: 14 additions & 0 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ func JobSetCompleted(ctx context.Context, k8sClient client.Client, js *jobset.Jo
Status: metav1.ConditionTrue,
},
}
terminalState := string(jobset.JobSetCompleted)
gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true))
gomega.Eventually(checkJobSetTerminalState, timeout, interval).WithArguments(ctx, k8sClient, js, terminalState).Should(gomega.Equal(true))
}

func JobSetFailed(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
Expand All @@ -71,7 +73,9 @@ func JobSetFailed(ctx context.Context, k8sClient client.Client, js *jobset.JobSe
Status: metav1.ConditionTrue,
},
}
terminalState := string(jobset.JobSetFailed)
gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true))
gomega.Eventually(checkJobSetTerminalState, timeout, interval).WithArguments(ctx, k8sClient, js, terminalState).Should(gomega.Equal(true))
}

func JobSetSuspended(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
Expand Down Expand Up @@ -142,6 +146,7 @@ func checkJobSetActive(ctx context.Context, k8sClient client.Client, js *jobset.
return true, nil
}

// checkJobSetStatus check if the JobSet status matches the expected conditions.
func checkJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, conditions []metav1.Condition) (bool, error) {
var fetchedJS jobset.JobSet
if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: js.Namespace, Name: js.Name}, &fetchedJS); err != nil {
Expand All @@ -158,6 +163,15 @@ func checkJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset.
return found == len(conditions), nil
}

// checkJobSetTerminalState check if the JobSet is in the expected terminal state.
func checkJobSetTerminalState(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, terminalState string) (bool, error) {
var fetchedJS jobset.JobSet
if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: js.Namespace, Name: js.Name}, &fetchedJS); err != nil {
return false, err
}
return fetchedJS.Status.TerminalState == terminalState, nil
}

// DeleteNamespace deletes all objects the tests typically create in the namespace.
func DeleteNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error {
if ns == nil {
Expand Down