From 0283adfc126efe06151a323aea17c72f72953fab Mon Sep 17 00:00:00 2001 From: Justin Edwins Date: Fri, 19 Apr 2024 22:32:33 +0000 Subject: [PATCH] Implement configurable failure policy. --- api/jobset/v1alpha2/jobset_types.go | 54 ++- api/jobset/v1alpha2/openapi_generated.go | 81 ++++ api/jobset/v1alpha2/zz_generated.deepcopy.go | 34 +- .../jobset/v1alpha2/failurepolicy.go | 16 +- .../jobset/v1alpha2/failurepolicyrule.go | 61 +++ .../jobset/v1alpha2/jobsetstatus.go | 15 +- client-go/applyconfiguration/utils.go | 2 + .../crd/bases/jobset.x-k8s.io_jobsets.yaml | 47 +++ hack/python-sdk/swagger.json | 45 +++ pkg/controllers/failure_policy.go | 243 ++++++++++++ pkg/controllers/failure_policy_test.go | 278 ++++++++++++++ pkg/controllers/jobset_controller.go | 61 +-- pkg/controllers/jobset_controller_test.go | 34 +- pkg/webhooks/jobset_webhook.go | 30 ++ pkg/webhooks/jobset_webhook_test.go | 68 ++++ sdk/python/README.md | 1 + .../docs/JobsetV1alpha2FailurePolicy.md | 1 + .../docs/JobsetV1alpha2FailurePolicyRule.md | 13 + sdk/python/docs/JobsetV1alpha2JobSetStatus.md | 1 + sdk/python/jobset/__init__.py | 1 + sdk/python/jobset/models/__init__.py | 1 + .../models/jobset_v1alpha2_failure_policy.py | 34 +- .../jobset_v1alpha2_failure_policy_rule.py | 180 +++++++++ .../models/jobset_v1alpha2_job_set_status.py | 34 +- .../test_jobset_v1alpha2_failure_policy.py | 12 +- ...est_jobset_v1alpha2_failure_policy_rule.py | 64 ++++ .../test/test_jobset_v1alpha2_job_set.py | 20 +- .../test/test_jobset_v1alpha2_job_set_list.py | 40 +- .../test/test_jobset_v1alpha2_job_set_spec.py | 12 +- .../test_jobset_v1alpha2_job_set_status.py | 3 +- .../controller/jobset_controller_test.go | 362 +++++++++++++++++- 31 files changed, 1765 insertions(+), 83 deletions(-) create mode 100644 client-go/applyconfiguration/jobset/v1alpha2/failurepolicyrule.go create mode 100644 pkg/controllers/failure_policy.go create mode 100644 pkg/controllers/failure_policy_test.go create mode 100644 
sdk/python/docs/JobsetV1alpha2FailurePolicyRule.md create mode 100644 sdk/python/jobset/models/jobset_v1alpha2_failure_policy_rule.py create mode 100644 sdk/python/test/test_jobset_v1alpha2_failure_policy_rule.py diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index e8efd9a0..53e562cb 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -22,10 +22,11 @@ import ( const ( JobSetNameKey string = "jobset.sigs.k8s.io/jobset-name" ReplicatedJobReplicas string = "jobset.sigs.k8s.io/replicatedjob-replicas" - ReplicatedJobNameKey string = "jobset.sigs.k8s.io/replicatedjob-name" - JobIndexKey string = "jobset.sigs.k8s.io/job-index" - JobKey string = "jobset.sigs.k8s.io/job-key" - JobNameKey string = "job-name" // TODO(#26): Migrate to the fully qualified label name. + // ReplicatedJobNameKey is used to index into a Jobs labels and retrieve the name of the parent ReplicatedJob + ReplicatedJobNameKey string = "jobset.sigs.k8s.io/replicatedjob-name" + JobIndexKey string = "jobset.sigs.k8s.io/job-index" + JobKey string = "jobset.sigs.k8s.io/job-key" + JobNameKey string = "job-name" // TODO(#26): Migrate to the fully qualified label name. // ExclusiveKey is an annotation that can be set on the JobSet or on a ReplicatedJob template. // If set at the JobSet level, all child jobs from all ReplicatedJobs will be scheduled using exclusive // job placement per topology group (defined as the label value). @@ -119,6 +120,9 @@ type JobSetStatus struct { // Restarts tracks the number of times the JobSet has restarted (i.e. recreated in case of RecreateAll policy). Restarts int32 `json:"restarts,omitempty"` + // RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. + RestartsCountTowardsMax int32 `json:"restartsCountTowardsMax,omitempty"` + // ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. 
// +optional // +listType=map @@ -218,10 +222,52 @@ const ( OperatorAny Operator = "Any" ) +// FailurePolicyAction defines the action the JobSet controller will take for +// a given FailurePolicyRule. +type FailurePolicyAction string + +const ( + // Fail the JobSet immediately, regardless of maxRestarts. + FailJobSet FailurePolicyAction = "FailJobSet" + + // Restart the JobSet if the number of restart attempts is less than MaxRestarts. + // Otherwise, fail the JobSet. + RestartJobSet FailurePolicyAction = "RestartJobSet" + + // Do not count the failure against maxRestarts. + RestartJobSetAndIgnoreMaxRestarts FailurePolicyAction = "RestartJobSetAndIgnoreMaxRestarts" +) + +// FailurePolicyRule defines a FailurePolicyAction to be executed if a child job +// fails due to a reason listed in OnJobFailureReasons. +type FailurePolicyRule struct { + // The action to take if the rule is matched. + // +kubebuilder:validation:Enum:=FailJobSet;RestartJobSet;RestartJobSetAndIgnoreMaxRestarts + Action FailurePolicyAction `json:"action"` + // The requirement on the job failure reasons. The requirement + // is satisfied if at least one reason matches the list. + // The rules are evaluated in order, and the first matching + // rule is executed. + // An empty list applies the rule to any job failure reason. + // +kubebuilder:validation:UniqueItems:true + OnJobFailureReasons []string `json:"onJobFailureReasons"` + // TargetReplicatedJobs are the names of the replicated jobs the operator applies to. + // An empty list will apply to all replicatedJobs. + // +optional + // +listType=atomic + TargetReplicatedJobs []string `json:"targetReplicatedJobs,omitempty"` +} + type FailurePolicy struct { // MaxRestarts defines the limit on the number of JobSet restarts. // A restart is achieved by recreating all active child jobs. MaxRestarts int32 `json:"maxRestarts,omitempty"` + + // List of failure policy rules for this JobSet. 
+ // For a given Job failure, the rules will be evaluated in order, + // and only the first matching rule will be executed. + // If no matching rule is found, the RestartJobSet action is applied. + Rules []FailurePolicyRule `json:"rules,omitempty"` } type SuccessPolicy struct { diff --git a/api/jobset/v1alpha2/openapi_generated.go b/api/jobset/v1alpha2/openapi_generated.go index 58c809d2..a6ee583e 100644 --- a/api/jobset/v1alpha2/openapi_generated.go +++ b/api/jobset/v1alpha2/openapi_generated.go @@ -25,6 +25,7 @@ import ( func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { return map[string]common.OpenAPIDefinition{ "sigs.k8s.io/jobset/api/jobset/v1alpha2.FailurePolicy": schema_jobset_api_jobset_v1alpha2_FailurePolicy(ref), + "sigs.k8s.io/jobset/api/jobset/v1alpha2.FailurePolicyRule": schema_jobset_api_jobset_v1alpha2_FailurePolicyRule(ref), "sigs.k8s.io/jobset/api/jobset/v1alpha2.JobSet": schema_jobset_api_jobset_v1alpha2_JobSet(ref), "sigs.k8s.io/jobset/api/jobset/v1alpha2.JobSetList": schema_jobset_api_jobset_v1alpha2_JobSetList(ref), "sigs.k8s.io/jobset/api/jobset/v1alpha2.JobSetSpec": schema_jobset_api_jobset_v1alpha2_JobSetSpec(ref), @@ -50,7 +51,80 @@ func schema_jobset_api_jobset_v1alpha2_FailurePolicy(ref common.ReferenceCallbac Format: "int32", }, }, + "rules": { + SchemaProps: spec.SchemaProps{ + Description: "List of failure policy rules for this JobSet. For a given Job failure, the rules will be evaluated in order, and only the first matching rule will be executed. 
If no matching rule is found, the RestartJobSet action is applied.", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("sigs.k8s.io/jobset/api/jobset/v1alpha2.FailurePolicyRule"), + }, + }, + }, + }, + }, + }, + }, + }, + Dependencies: []string{ + "sigs.k8s.io/jobset/api/jobset/v1alpha2.FailurePolicyRule"}, + } +} + +func schema_jobset_api_jobset_v1alpha2_FailurePolicyRule(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "FailurePolicyRule defines a FailurePolicyAction to be executed if a child job fails due to a reason listed in OnJobFailureReasons.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "action": { + SchemaProps: spec.SchemaProps{ + Description: "The action to take if the rule is matched.", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "onJobFailureReasons": { + SchemaProps: spec.SchemaProps{ + Description: "The requirement on the job failure reasons. The requirement is satisfied if at least one reason matches the list. The rules are evaluated in order, and the first matching rule is executed. An empty list applies the rule to any job failure reason.", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + "targetReplicatedJobs": { + VendorExtensible: spec.VendorExtensible{ + Extensions: spec.Extensions{ + "x-kubernetes-list-type": "atomic", + }, + }, + SchemaProps: spec.SchemaProps{ + Description: "TargetReplicatedJobs are the names of the replicated jobs the operator applies to. 
An empty list will apply to all replicatedJobs.", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, }, + Required: []string{"action", "onJobFailureReasons"}, }, }, } @@ -269,6 +343,13 @@ func schema_jobset_api_jobset_v1alpha2_JobSetStatus(ref common.ReferenceCallback Format: "int32", }, }, + "restartsCountTowardsMax": { + SchemaProps: spec.SchemaProps{ + Description: "RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts.", + Type: []string{"integer"}, + Format: "int32", + }, + }, "replicatedJobsStatus": { VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ diff --git a/api/jobset/v1alpha2/zz_generated.deepcopy.go b/api/jobset/v1alpha2/zz_generated.deepcopy.go index 7bc6f25b..3262f2a9 100644 --- a/api/jobset/v1alpha2/zz_generated.deepcopy.go +++ b/api/jobset/v1alpha2/zz_generated.deepcopy.go @@ -25,6 +25,13 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FailurePolicy) DeepCopyInto(out *FailurePolicy) { *out = *in + if in.Rules != nil { + in, out := &in.Rules, &out.Rules + *out = make([]FailurePolicyRule, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FailurePolicy. @@ -37,6 +44,31 @@ func (in *FailurePolicy) DeepCopy() *FailurePolicy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *FailurePolicyRule) DeepCopyInto(out *FailurePolicyRule) { + *out = *in + if in.OnJobFailureReasons != nil { + in, out := &in.OnJobFailureReasons, &out.OnJobFailureReasons + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.TargetReplicatedJobs != nil { + in, out := &in.TargetReplicatedJobs, &out.TargetReplicatedJobs + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FailurePolicyRule. +func (in *FailurePolicyRule) DeepCopy() *FailurePolicyRule { + if in == nil { + return nil + } + out := new(FailurePolicyRule) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JobSet) DeepCopyInto(out *JobSet) { *out = *in @@ -119,7 +151,7 @@ func (in *JobSetSpec) DeepCopyInto(out *JobSetSpec) { if in.FailurePolicy != nil { in, out := &in.FailurePolicy, &out.FailurePolicy *out = new(FailurePolicy) - **out = **in + (*in).DeepCopyInto(*out) } if in.StartupPolicy != nil { in, out := &in.StartupPolicy, &out.StartupPolicy diff --git a/client-go/applyconfiguration/jobset/v1alpha2/failurepolicy.go b/client-go/applyconfiguration/jobset/v1alpha2/failurepolicy.go index 9d80d448..82b6756e 100644 --- a/client-go/applyconfiguration/jobset/v1alpha2/failurepolicy.go +++ b/client-go/applyconfiguration/jobset/v1alpha2/failurepolicy.go @@ -17,7 +17,8 @@ package v1alpha2 // FailurePolicyApplyConfiguration represents an declarative configuration of the FailurePolicy type for use // with apply. 
type FailurePolicyApplyConfiguration struct { - MaxRestarts *int32 `json:"maxRestarts,omitempty"` + MaxRestarts *int32 `json:"maxRestarts,omitempty"` + Rules []FailurePolicyRuleApplyConfiguration `json:"rules,omitempty"` } // FailurePolicyApplyConfiguration constructs an declarative configuration of the FailurePolicy type for use with @@ -33,3 +34,16 @@ func (b *FailurePolicyApplyConfiguration) WithMaxRestarts(value int32) *FailureP b.MaxRestarts = &value return b } + +// WithRules adds the given value to the Rules field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Rules field. +func (b *FailurePolicyApplyConfiguration) WithRules(values ...*FailurePolicyRuleApplyConfiguration) *FailurePolicyApplyConfiguration { + for i := range values { + if values[i] == nil { + panic("nil value passed to WithRules") + } + b.Rules = append(b.Rules, *values[i]) + } + return b +} diff --git a/client-go/applyconfiguration/jobset/v1alpha2/failurepolicyrule.go b/client-go/applyconfiguration/jobset/v1alpha2/failurepolicyrule.go new file mode 100644 index 00000000..9bec63bf --- /dev/null +++ b/client-go/applyconfiguration/jobset/v1alpha2/failurepolicyrule.go @@ -0,0 +1,61 @@ +/* +Copyright 2023 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. 
+ +package v1alpha2 + +import ( + v1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2" +) + +// FailurePolicyRuleApplyConfiguration represents an declarative configuration of the FailurePolicyRule type for use +// with apply. +type FailurePolicyRuleApplyConfiguration struct { + Action *v1alpha2.FailurePolicyAction `json:"action,omitempty"` + OnJobFailureReasons []string `json:"onJobFailureReasons,omitempty"` + TargetReplicatedJobs []string `json:"targetReplicatedJobs,omitempty"` +} + +// FailurePolicyRuleApplyConfiguration constructs an declarative configuration of the FailurePolicyRule type for use with +// apply. +func FailurePolicyRule() *FailurePolicyRuleApplyConfiguration { + return &FailurePolicyRuleApplyConfiguration{} +} + +// WithAction sets the Action field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Action field is set to the value of the last call. +func (b *FailurePolicyRuleApplyConfiguration) WithAction(value v1alpha2.FailurePolicyAction) *FailurePolicyRuleApplyConfiguration { + b.Action = &value + return b +} + +// WithOnJobFailureReasons adds the given value to the OnJobFailureReasons field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the OnJobFailureReasons field. +func (b *FailurePolicyRuleApplyConfiguration) WithOnJobFailureReasons(values ...string) *FailurePolicyRuleApplyConfiguration { + for i := range values { + b.OnJobFailureReasons = append(b.OnJobFailureReasons, values[i]) + } + return b +} + +// WithTargetReplicatedJobs adds the given value to the TargetReplicatedJobs field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. 
+// If called multiple times, values provided by each call will be appended to the TargetReplicatedJobs field. +func (b *FailurePolicyRuleApplyConfiguration) WithTargetReplicatedJobs(values ...string) *FailurePolicyRuleApplyConfiguration { + for i := range values { + b.TargetReplicatedJobs = append(b.TargetReplicatedJobs, values[i]) + } + return b +} diff --git a/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go b/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go index 3b18fb01..f31e5d99 100644 --- a/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go +++ b/client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go @@ -21,9 +21,10 @@ import ( // JobSetStatusApplyConfiguration represents an declarative configuration of the JobSetStatus type for use // with apply. type JobSetStatusApplyConfiguration struct { - Conditions []v1.Condition `json:"conditions,omitempty"` - Restarts *int32 `json:"restarts,omitempty"` - ReplicatedJobsStatus []ReplicatedJobStatusApplyConfiguration `json:"replicatedJobsStatus,omitempty"` + Conditions []v1.Condition `json:"conditions,omitempty"` + Restarts *int32 `json:"restarts,omitempty"` + RestartsCountTowardsMax *int32 `json:"restartsCountTowardsMax,omitempty"` + ReplicatedJobsStatus []ReplicatedJobStatusApplyConfiguration `json:"replicatedJobsStatus,omitempty"` } // JobSetStatusApplyConfiguration constructs an declarative configuration of the JobSetStatus type for use with @@ -50,6 +51,14 @@ func (b *JobSetStatusApplyConfiguration) WithRestarts(value int32) *JobSetStatus return b } +// WithRestartsCountTowardsMax sets the RestartsCountTowardsMax field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the RestartsCountTowardsMax field is set to the value of the last call. 
+func (b *JobSetStatusApplyConfiguration) WithRestartsCountTowardsMax(value int32) *JobSetStatusApplyConfiguration { + b.RestartsCountTowardsMax = &value + return b +} + // WithReplicatedJobsStatus adds the given value to the ReplicatedJobsStatus field in the declarative configuration // and returns the receiver, so that objects can be build by chaining "With" function invocations. // If called multiple times, values provided by each call will be appended to the ReplicatedJobsStatus field. diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go index c79d195f..99de9197 100644 --- a/client-go/applyconfiguration/utils.go +++ b/client-go/applyconfiguration/utils.go @@ -27,6 +27,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { // Group=jobset.x-k8s.io, Version=v1alpha2 case v1alpha2.SchemeGroupVersion.WithKind("FailurePolicy"): return &jobsetv1alpha2.FailurePolicyApplyConfiguration{} + case v1alpha2.SchemeGroupVersion.WithKind("FailurePolicyRule"): + return &jobsetv1alpha2.FailurePolicyRuleApplyConfiguration{} case v1alpha2.SchemeGroupVersion.WithKind("JobSet"): return &jobsetv1alpha2.JobSetApplyConfiguration{} case v1alpha2.SchemeGroupVersion.WithKind("JobSetSpec"): diff --git a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml index e0f3fc6a..e4d796ad 100644 --- a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml +++ b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml @@ -68,6 +68,47 @@ spec: A restart is achieved by recreating all active child jobs. format: int32 type: integer + rules: + description: |- + List of failure policy rules for this JobSet. + For a given Job failure, the rules will be evaluated in order, + and only the first matching rule will be executed. + If no matching rule is found, the RestartJobSet action is applied. 
+ items: + description: |- + FailurePolicyRule defines a FailurePolicyAction to be executed if a child job + fails due to a reason listed in OnJobFailureReasons. + properties: + action: + description: The action to take if the rule is matched. + enum: + - FailJobSet + - RestartJobSet + - RestartJobSetAndIgnoreMaxRestarts + type: string + onJobFailureReasons: + description: |- + The requirement on the job failure reasons. The requirement + is satisfied if at least one reason matches the list. + The rules are evaluated in order, and the first matching + rule is executed. + An empty list applies the rule to any job failure reason. + items: + type: string + type: array + targetReplicatedJobs: + description: |- + TargetReplicatedJobs are the names of the replicated jobs the operator applies to. + An empty list will apply to all replicatedJobs. + items: + type: string + type: array + x-kubernetes-list-type: atomic + required: + - action + - onJobFailureReasons + type: object + type: array type: object x-kubernetes-validations: - message: Value is immutable @@ -8482,6 +8523,12 @@ spec: (i.e. recreated in case of RecreateAll policy). format: int32 type: integer + restartsCountTowardsMax: + description: RestartsCountTowardsMax tracks the number of times the + JobSet has restarted that counts towards the maximum allowed number + of restarts. + format: int32 + type: integer type: object type: object served: true diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index 0bcad8ec..d6f7bedf 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -14,6 +14,46 @@ "description": "MaxRestarts defines the limit on the number of JobSet restarts. A restart is achieved by recreating all active child jobs.", "type": "integer", "format": "int32" + }, + "rules": { + "description": "List of failure policy rules for this JobSet. For a given Job failure, the rules will be evaluated in order, and only the first matching rule will be executed. 
If no matching rule is found, the RestartJobSet action is applied.", + "type": "array", + "items": { + "default": {}, + "$ref": "#/definitions/jobset.v1alpha2.FailurePolicyRule" + } + } + } + }, + "jobset.v1alpha2.FailurePolicyRule": { + "description": "FailurePolicyRule defines a FailurePolicyAction to be executed if a child job fails due to a reason listed in OnJobFailureReasons.", + "type": "object", + "required": [ + "action", + "onJobFailureReasons" + ], + "properties": { + "action": { + "description": "The action to take if the rule is matched.", + "type": "string", + "default": "" + }, + "onJobFailureReasons": { + "description": "The requirement on the job failure reasons. The requirement is satisfied if at least one reason matches the list. The rules are evaluated in order, and the first matching rule is executed. An empty list applies the rule to any job failure reason.", + "type": "array", + "items": { + "type": "string", + "default": "" + } + }, + "targetReplicatedJobs": { + "description": "TargetReplicatedJobs are the names of the replicated jobs the operator applies to. An empty list will apply to all replicatedJobs.", + "type": "array", + "items": { + "type": "string", + "default": "" + }, + "x-kubernetes-list-type": "atomic" } } }, @@ -149,6 +189,11 @@ "description": "Restarts tracks the number of times the JobSet has restarted (i.e. recreated in case of RecreateAll policy).", "type": "integer", "format": "int32" + }, + "restartsCountTowardsMax": { + "description": "RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts.", + "type": "integer", + "format": "int32" } } }, diff --git a/pkg/controllers/failure_policy.go b/pkg/controllers/failure_policy.go new file mode 100644 index 00000000..bf555365 --- /dev/null +++ b/pkg/controllers/failure_policy.go @@ -0,0 +1,243 @@ +/* +Copyright 2023 The Kubernetes Authors. 
+Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "slices" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" + "sigs.k8s.io/jobset/pkg/constants" +) + +// actionFunctionMap relates jobset failure policy action names to the appropriate behavior during jobset reconciliation. +var actionFunctionMap = map[jobset.FailurePolicyAction]failurePolicyActionApplier{ + jobset.FailJobSet: failJobSetActionApplier, + jobset.RestartJobSet: restartJobSetActionApplier, + jobset.RestartJobSetAndIgnoreMaxRestarts: restartJobSetAndIgnoreMaxRestartsActionApplier, +} + +// If the failure policy rule is not nil, then the functions returns: +// - true if the rule is applicable to the jobFailureReason; +// - false other. +// If the rule is nil, we return true. +func ruleIsApplicable(rule *jobset.FailurePolicyRule, failedJob *batchv1.Job, jobFailureReason string) bool { + if rule == nil { + return true + } + + ruleAppliesToJobFailureReason := len(rule.OnJobFailureReasons) == 0 || slices.Contains(rule.OnJobFailureReasons, jobFailureReason) + if !ruleAppliesToJobFailureReason { + return false + } + + parentReplicatedJob, exists := parentReplicatedJobName(failedJob) + if !exists { + // If we cannot find the parent ReplicatedJob, we assume the rule does not apply. 
+ // TODO: Add a log statement that the failedJob does not appear to have a parent replicated job. + // This error should not happen, but was a pain to debug when adding unit tests. + return false + } + + ruleAppliesToParentReplicatedJob := len(rule.TargetReplicatedJobs) == 0 || slices.Contains(rule.TargetReplicatedJobs, parentReplicatedJob) + return ruleAppliesToParentReplicatedJob +} + +// The function findFirstFailedPolicyRuleAndJob returns the first failure policy rule matching a failed child job. +// The function also returns the first child job matching the failure policy rule returned. +// If there does not exist a matching failure policy rule, the function returns nil for the failure policy rule +// accompanied with the first failing job. This function assumes that the jobset has a non nil failure policy. +func findFirstFailedPolicyRuleAndJob(js *jobset.JobSet, failedOwnedJobs []*batchv1.Job) (*jobset.FailurePolicyRule, *batchv1.Job) { + rulesExist := len(js.Spec.FailurePolicy.Rules) > 0 + if !rulesExist { + firstFailedJob := findFirstFailedJob(failedOwnedJobs) + return nil, firstFailedJob + } + + // These variables are only to make the ensuing lines of code shorter. + rules := js.Spec.FailurePolicy.Rules + numRules := len(js.Spec.FailurePolicy.Rules) + + // bucket[i] corresponds to js.Spec.FailurePolicy.Rules[i] + type bucket struct { + firstFailedJob *batchv1.Job + firstFailureTime *metav1.Time + } + + // We make a bucket for each rule and then an extra bucket to represent the default rule + numBuckets := numRules + 1 + defaultRuleIndex := numRules + var buckets = make([]bucket, numRules+1) + + for _, failedJob := range failedOwnedJobs { + jobFailureCondition := findJobFailureCondition(failedJob) + // This means that the Job has not failed. 
+ if jobFailureCondition == nil { + continue + } + + jobFailureTime, jobFailureReason := ptr.To(jobFailureCondition.LastTransitionTime), jobFailureCondition.Reason + for i := 0; i < numBuckets; i++ { + // We use nil to represent the default rule that + // applies to all job failure reasons. + var rule *jobset.FailurePolicyRule + if i < numRules { + rule = ptr.To(rules[i]) + } + if ruleIsApplicable(rule, failedJob, jobFailureReason) && (buckets[i].firstFailureTime == nil || jobFailureTime.Before(buckets[i].firstFailureTime)) { + buckets[i].firstFailedJob = failedJob + buckets[i].firstFailureTime = jobFailureTime + } + } + } + + // Checking if any failure policy rules were matched + // and returning the rule along with the first + // failed job to match it. + for i := 0; i < numRules; i++ { + if buckets[i].firstFailedJob != nil { + return &rules[i], buckets[i].firstFailedJob + } + } + + // We get here when no rule matched any of the failure policy rules + fmt.Println("never found a matching rule and returning nil to represent the default rule.") + return nil, buckets[defaultRuleIndex].firstFailedJob +} + +// recreateAll triggers a JobSet restart for the next reconcillation loop. +func failurePolicyRecreateAll(ctx context.Context, js *jobset.JobSet, shouldCountTowardsMax bool, updateStatusOpts *statusUpdateOpts) { + log := ctrl.LoggerFrom(ctx) + + // Increment JobSet restarts. This will trigger reconciliation and result in deletions + // of old jobs not part of the current jobSet run. + js.Status.Restarts += 1 + + if shouldCountTowardsMax { + js.Status.RestartsCountTowardsMax += 1 + } + + updateStatusOpts.shouldUpdate = true + + // Emit event for each JobSet restarts for observability and debugability. 
+ enqueueEvent(updateStatusOpts, &eventParams{ + object: js, + eventType: corev1.EventTypeWarning, + eventReason: fmt.Sprintf("restarting jobset, attempt %d", js.Status.Restarts), + }) + log.V(2).Info("attempting restart", "restart attempt", js.Status.Restarts) +} + +// parentReplicatedJobName returns the name of the parent +// ReplicatedJob and true if it is able to retrieve the parent. +// The empty string and false are returned otherwise. +func parentReplicatedJobName(job *batchv1.Job) (string, bool) { + if job == nil { + return "", false + } + + replicatedJobName, ok := job.Labels[jobset.ReplicatedJobNameKey] + replicatedJobNameIsUnset := !ok || replicatedJobName == "" + return replicatedJobName, !replicatedJobNameIsUnset +} + +// The type failurePolicyActionApplier applies a FailurePolicyAction and returns nil if the FailurePolicyAction was successfully applied. +// The function returns an error otherwise. +type failurePolicyActionApplier = func(ctx context.Context, js *jobset.JobSet, matchingFailedJob *batchv1.Job, updateStatusOpts *statusUpdateOpts) error + +// failJobSetActionApplier applies the FailJobSet FailurePolicyAction +var failJobSetActionApplier failurePolicyActionApplier = func(ctx context.Context, js *jobset.JobSet, matchingFailedJob *batchv1.Job, updateStatusOpts *statusUpdateOpts) error { + failureMessage := messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, matchingFailedJob.Name) + setJobSetFailedCondition(ctx, js, constants.ReachedMaxRestartsReason, failureMessage, updateStatusOpts) + return nil +} + +// restartJobSetActionApplier applies the RestartJobSet FailurePolicyAction +var restartJobSetActionApplier failurePolicyActionApplier = func(ctx context.Context, js *jobset.JobSet, matchingFailedJob *batchv1.Job, updateStatusOpts *statusUpdateOpts) error { + if js.Status.RestartsCountTowardsMax >= js.Spec.FailurePolicy.MaxRestarts { + failureMessage := messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, 
matchingFailedJob.Name) + setJobSetFailedCondition(ctx, js, constants.ReachedMaxRestartsReason, failureMessage, updateStatusOpts) + return nil + } + + shouldCountTowardsMax := true + failurePolicyRecreateAll(ctx, js, shouldCountTowardsMax, updateStatusOpts) + return nil +} + +// restartJobSetAndIgnoreMaxRestartsActionApplier applies the RestartJobSetAndIgnoreMaxRestarts FailurePolicyAction +var restartJobSetAndIgnoreMaxRestartsActionApplier failurePolicyActionApplier = func(ctx context.Context, js *jobset.JobSet, matchingFailedJob *batchv1.Job, updateStatusOpts *statusUpdateOpts) error { + shouldCountTowardsMax := false + failurePolicyRecreateAll(ctx, js, shouldCountTowardsMax, updateStatusOpts) + return nil +} + +// applyFailurePolicyRuleAction applies the supplied FailurePolicyRuleAction. +func applyFailurePolicyRuleAction(ctx context.Context, js *jobset.JobSet, matchingFailedJob *batchv1.Job, updateStatusOps *statusUpdateOpts, failurePolicyRuleAction jobset.FailurePolicyAction) error { + log := ctrl.LoggerFrom(ctx) + + applier, ok := actionFunctionMap[failurePolicyRuleAction] + if !ok { + err := fmt.Errorf("failed to find a corresponding action for the FailurePolicyRuleAction %v", failurePolicyRuleAction) + log.Error(err, "retrieving information for FailurePolicyRuleAction") + return err + } + + err := applier(ctx, js, matchingFailedJob, updateStatusOps) + if err != nil { + log.Error(err, "error applying the FailurePolicyRuleAction: %v", failurePolicyRuleAction) + return err + } + + return nil +} + +// executeFailurePolicy applies the Failure Policy of a JobSet when a failed child Job is found. +// This function is run only when a failed child job has already been found. +func executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) error { + log := ctrl.LoggerFrom(ctx) + + // If no failure policy is defined, mark the JobSet as failed. 
+ if js.Spec.FailurePolicy == nil { + // firstFailedJob is only computed if necessary since it is expensive to compute + // for JobSets with many child jobs. This is why we don't unconditionally compute + // it once at the beginning of the function and share the results between the different + // possible code paths here. + firstFailedJob := findFirstFailedJob(ownedJobs.failed) + setJobSetFailedCondition(ctx, js, constants.FailedJobsReason, messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name), updateStatusOpts) + return nil + } + + // Check for a matching Failure Policy Rule; if no rule matches, the RestartJobSet action is applied by default. + failurePolicyRule, matchingFailedJob := findFirstFailedPolicyRuleAndJob(js, ownedJobs.failed) + + failurePolicyRuleAction := jobset.RestartJobSet + if failurePolicyRule != nil { + failurePolicyRuleAction = failurePolicyRule.Action + } + + if err := applyFailurePolicyRuleAction(ctx, js, matchingFailedJob, updateStatusOpts, failurePolicyRuleAction); err != nil { + log.Error(err, "applying FailurePolicyRuleAction", "action", failurePolicyRuleAction) + return err + } + + return nil +} diff --git a/pkg/controllers/failure_policy_test.go b/pkg/controllers/failure_policy_test.go new file mode 100644 index 00000000..83824a35 --- /dev/null +++ b/pkg/controllers/failure_policy_test.go @@ -0,0 +1,278 @@ +/* +Copyright 2023 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controllers + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/utils/ptr" + + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" + testutils "sigs.k8s.io/jobset/pkg/util/testing" +) + +func TestFailurePolicyRuleIsApplicable(t *testing.T) { + var ( + replicatedJobName1 = "test-replicated-job-1" + replicatedJobName2 = "test-replicated-job-2" + jobName = "test-job" + ns = "default" + ) + + tests := []struct { + name string + rule *jobset.FailurePolicyRule + failedJob *batchv1.Job + jobFailureReason string + expected bool + }{ + { + name: "failure policy rule is nil", + expected: true, + }, + { + name: "failure policy rule matches on job failure reason", + rule: &jobset.FailurePolicyRule{ + OnJobFailureReasons: []string{batchv1.JobReasonBackoffLimitExceeded}, + TargetReplicatedJobs: []string{replicatedJobName1}, + }, + failedJob: testutils.MakeJob(jobName, ns).JobLabels( + map[string]string{jobset.ReplicatedJobNameKey: replicatedJobName1}, + ).Obj(), + jobFailureReason: batchv1.JobReasonBackoffLimitExceeded, + expected: true, + }, + { + name: "failure policy rule matches all on job failure reason", + rule: &jobset.FailurePolicyRule{ + TargetReplicatedJobs: []string{replicatedJobName1}, + }, + failedJob: testutils.MakeJob(jobName, ns).JobLabels( + map[string]string{jobset.ReplicatedJobNameKey: replicatedJobName1}, + ).Obj(), + jobFailureReason: batchv1.JobReasonMaxFailedIndexesExceeded, + expected: true, + }, + { + name: "failure policy rule does not match on job failure reason", + rule: &jobset.FailurePolicyRule{ + OnJobFailureReasons: []string{batchv1.JobReasonBackoffLimitExceeded}, + TargetReplicatedJobs: []string{replicatedJobName1}, + }, + failedJob: testutils.MakeJob(jobName, ns).JobLabels( + map[string]string{jobset.ReplicatedJobNameKey: replicatedJobName1}, + ).Obj(), + jobFailureReason: batchv1.JobReasonDeadlineExceeded, + expected: false, + }, + { + name: "failure policy rule is not 
applicable to parent replicatedJob of failed job", + rule: &jobset.FailurePolicyRule{ + OnJobFailureReasons: []string{batchv1.JobReasonBackoffLimitExceeded}, + TargetReplicatedJobs: []string{replicatedJobName1}, + }, + jobFailureReason: batchv1.JobReasonBackoffLimitExceeded, + failedJob: testutils.MakeJob(jobName, ns).JobLabels( + map[string]string{jobset.ReplicatedJobNameKey: replicatedJobName2}, + ).Obj(), + expected: false, + }, + { + name: "failure policy rule is applicable to all replicatedjobs when targetedReplicatedJobs is omitted", + rule: &jobset.FailurePolicyRule{ + OnJobFailureReasons: []string{batchv1.JobReasonBackoffLimitExceeded}, + }, + failedJob: testutils.MakeJob(jobName, ns).JobLabels( + map[string]string{jobset.ReplicatedJobNameKey: replicatedJobName1}, + ).Obj(), + jobFailureReason: batchv1.JobReasonBackoffLimitExceeded, + expected: true, + }, + { + name: "failure policy rule is applicable to parent replicatedJob when targetedReplicatedJobs is specified", + rule: &jobset.FailurePolicyRule{ + OnJobFailureReasons: []string{batchv1.JobReasonBackoffLimitExceeded}, + TargetReplicatedJobs: []string{replicatedJobName1}, + }, + failedJob: testutils.MakeJob(jobName, ns).JobLabels( + map[string]string{jobset.ReplicatedJobNameKey: replicatedJobName1}, + ).Obj(), + jobFailureReason: batchv1.JobReasonBackoffLimitExceeded, + expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actual := ruleIsApplicable(tc.rule, tc.failedJob, tc.jobFailureReason) + if diff := cmp.Diff(tc.expected, actual); diff != "" { + t.Errorf("unexpected finished value (+got/-want): %s", diff) + } + }) + } +} + +func TestFindFirstFailedPolicyRuleAndJob(t *testing.T) { + var ( + replicatedJobName = "test-replicatedJob" + jobSetName = "test-jobset" + ns = "default" + + failedJobNoReason1 = jobWithFailedCondition("job1", time.Now().Add(-6*time.Hour)) + failedJobNoReason2 = jobWithFailedCondition("job2", time.Now().Add(-3*time.Hour)) + 
+ failedJobNoReason3 = jobWithFailedCondition("job3", time.Now().Add(-1*time.Hour)) + + failedJob1 = jobWithFailedConditionAndOpts("job1", time.Now().Add(-6*time.Hour), + &failJobOptions{ + reason: ptr.To(batchv1.JobReasonBackoffLimitExceeded), + parentReplicatedJobName: ptr.To(replicatedJobName), + }, + ) + failedJob2 = jobWithFailedConditionAndOpts("job2", time.Now().Add(-3*time.Hour), + &failJobOptions{ + reason: ptr.To(batchv1.JobReasonDeadlineExceeded), + parentReplicatedJobName: ptr.To(replicatedJobName), + }, + ) + failedJob3 = jobWithFailedConditionAndOpts("job3", time.Now().Add(-1*time.Hour), + &failJobOptions{ + reason: ptr.To(batchv1.JobReasonFailedIndexes), + parentReplicatedJobName: ptr.To(replicatedJobName), + }, + ) + + // ruleN matches failedJobN on its job failure reason + rule1 = jobset.FailurePolicyRule{ + Action: jobset.RestartJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonBackoffLimitExceeded}, + } + rule2 = jobset.FailurePolicyRule{ + Action: jobset.RestartJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonDeadlineExceeded}, + } + + unmatchedRule = jobset.FailurePolicyRule{ + Action: jobset.RestartJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonMaxFailedIndexesExceeded, batchv1.JobReasonPodFailurePolicy}, + } + + extraFailedJob = jobWithFailedConditionAndOpts("extra-job1", time.Now().Add(3*time.Hour), + &failJobOptions{ + reason: ptr.To(batchv1.JobReasonDeadlineExceeded), + parentReplicatedJobName: ptr.To(replicatedJobName), + }, + ) + ) + tests := []struct { + name string + js *jobset.JobSet + failedOwnedJobs []*batchv1.Job + + expectedFailurePolicyRule *jobset.FailurePolicyRule + expectedJob *batchv1.Job + }{ + { + name: "failure policy rules are empty with no failed jobs", + js: testutils.MakeJobSet(jobSetName, ns).FailurePolicy(&jobset.FailurePolicy{}).Obj(), + failedOwnedJobs: []*batchv1.Job{}, + + expectedFailurePolicyRule: nil, + expectedJob: nil, + }, + { + name: "failure policy rules are empty with one failed job", + js: 
testutils.MakeJobSet(jobSetName, ns).FailurePolicy(&jobset.FailurePolicy{}).Obj(), + failedOwnedJobs: []*batchv1.Job{ + failedJobNoReason1, + }, + + expectedFailurePolicyRule: nil, + expectedJob: failedJobNoReason1, + }, + { + name: "failure policy rules are empty with multiple failed jobs", + js: testutils.MakeJobSet(jobSetName, ns).FailurePolicy(&jobset.FailurePolicy{}).Obj(), + failedOwnedJobs: []*batchv1.Job{failedJobNoReason3, failedJobNoReason1, failedJobNoReason2}, + + expectedFailurePolicyRule: nil, + expectedJob: failedJobNoReason1, + }, + { + name: "failure policy rule does not match on job failure reasons", + js: testutils.MakeJobSet(jobSetName, ns).FailurePolicy(&jobset.FailurePolicy{ + Rules: []jobset.FailurePolicyRule{unmatchedRule}, + }).Obj(), + failedOwnedJobs: []*batchv1.Job{failedJob3, failedJob1, failedJob2}, + + expectedFailurePolicyRule: nil, + expectedJob: failedJob1, + }, + { + name: "failure policy rule matches first job to fail out of all jobs", + js: testutils.MakeJobSet(jobSetName, ns).FailurePolicy(&jobset.FailurePolicy{ + Rules: []jobset.FailurePolicyRule{rule1}, + }).Obj(), + failedOwnedJobs: []*batchv1.Job{failedJob3, failedJob1, failedJob2}, + + expectedFailurePolicyRule: &rule1, + expectedJob: failedJob1, + }, + { + name: "failure policy rule matches second job to fail out of all jobs", + js: testutils.MakeJobSet(jobSetName, ns).FailurePolicy(&jobset.FailurePolicy{ + Rules: []jobset.FailurePolicyRule{rule2}, + }).Obj(), + failedOwnedJobs: []*batchv1.Job{failedJob3, failedJob1, failedJob2}, + + expectedFailurePolicyRule: &rule2, + expectedJob: failedJob2, + }, + { + name: "failure policy rule matches multiple jobs and first failed job is the last one", + js: testutils.MakeJobSet(jobSetName, ns).FailurePolicy(&jobset.FailurePolicy{ + Rules: []jobset.FailurePolicyRule{rule2}, + }).Obj(), + failedOwnedJobs: []*batchv1.Job{extraFailedJob, failedJob3, failedJob1, failedJob2}, + + expectedFailurePolicyRule: &rule2, + expectedJob: 
failedJob2, + }, + { + name: "first failed job that matches a failure policy rule is different from the first job to fail that matches the first matched failure policy rule", + js: testutils.MakeJobSet(jobSetName, ns).FailurePolicy(&jobset.FailurePolicy{ + Rules: []jobset.FailurePolicyRule{rule2, rule1}, + }).Obj(), + // failedJob1 is the first job to fail but does not match rule2, which is the first failure policy rule to be matched + failedOwnedJobs: []*batchv1.Job{extraFailedJob, failedJob3, failedJob1, failedJob2}, + + expectedFailurePolicyRule: &rule2, + expectedJob: failedJob2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actualRule, actualJob := findFirstFailedPolicyRuleAndJob(tc.js, tc.failedOwnedJobs) + if diff := cmp.Diff(tc.expectedJob, actualJob); diff != "" { + t.Errorf("unexpected finished value (+got/-want): %s", diff) + } + if diff := cmp.Diff(tc.expectedFailurePolicyRule, actualRule); diff != "" { + t.Errorf("unexpected finished value (+got/-want): %s", diff) + } + }) + } +} diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 7bcacaa3..6f5bc251 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -167,7 +167,10 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd // If any jobs have failed, execute the JobSet failure policy (if any). 
if len(ownedJobs.failed) > 0 { - executeFailurePolicy(ctx, js, ownedJobs, updateStatusOpts) + if err := executeFailurePolicy(ctx, js, ownedJobs, updateStatusOpts); err != nil { + log.Error(err, "executing failure policy") + return ctrl.Result{}, err + } return ctrl.Result{}, nil } @@ -603,46 +606,6 @@ func executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *chi return false } -func executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) { - // If no failure policy is defined, mark the JobSet as failed. - if js.Spec.FailurePolicy == nil { - // firstFailedJob is only computed if necessary since it is expensive to compute - // for JobSets with many child jobs. This is why we don't unconditionally compute - // it once at the beginning of the function and share the results between the different - // possible code paths here. - firstFailedJob := findFirstFailedJob(ownedJobs.failed) - setJobSetFailedCondition(ctx, js, constants.FailedJobsReason, messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name), updateStatusOpts) - return - } - - // If JobSet has reached max restarts, fail the JobSet. - if js.Status.Restarts >= js.Spec.FailurePolicy.MaxRestarts { - firstFailedJob := findFirstFailedJob(ownedJobs.failed) - setJobSetFailedCondition(ctx, js, constants.ReachedMaxRestartsReason, messageWithFirstFailedJob(constants.ReachedMaxRestartsMessage, firstFailedJob.Name), updateStatusOpts) - return - } - - // To reach this point a job must have failed. - failurePolicyRecreateAll(ctx, js, updateStatusOpts) -} - -func failurePolicyRecreateAll(ctx context.Context, js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) { - log := ctrl.LoggerFrom(ctx) - - // Increment JobSet restarts. This will trigger reconciliation and result in deletions - // of old jobs not part of the current jobSet run. 
- js.Status.Restarts += 1 - updateStatusOpts.shouldUpdate = true - - // Emit event for each JobSet restarts for observability and debugability. - enqueueEvent(updateStatusOpts, &eventParams{ - object: js, - eventType: corev1.EventTypeWarning, - eventReason: fmt.Sprintf("restarting jobset, attempt %d", js.Status.Restarts), - }) - log.V(2).Info("attempting restart", "restart attempt", js.Status.Restarts) -} - func constructJobsFromTemplate(js *jobset.JobSet, rjob *jobset.ReplicatedJob, ownedJobs *childJobs) ([]*batchv1.Job, error) { var jobs []*batchv1.Job for jobIdx := 0; jobIdx < int(rjob.Replicas); jobIdx++ { @@ -874,21 +837,31 @@ func findFirstFailedJob(failedJobs []*batchv1.Job) *batchv1.Job { return firstFailedJob } -// findJobFailureTime is a helper function which extracts the Job failure time from a Job, +// findJobFailureCondition is a helper function which extracts the Job failure condition from a Job, // if the JobFailed condition exists and is true. -func findJobFailureTime(job *batchv1.Job) *metav1.Time { +func findJobFailureCondition(job *batchv1.Job) *batchv1.JobCondition { if job == nil { return nil } for _, c := range job.Status.Conditions { // If this Job failed before the oldest known Job failiure, update the first failed job. if c.Type == batchv1.JobFailed && c.Status == corev1.ConditionTrue { - return &c.LastTransitionTime + return &c } } return nil } +// findJobFailureTime is a helper function which extracts the Job failure time from a Job, +// if the JobFailed condition exists and is true. +func findJobFailureTime(job *batchv1.Job) *metav1.Time { + failureCondition := findJobFailureCondition(job) + if failureCondition == nil { + return nil + } + return &failureCondition.LastTransitionTime +} + // managedByExternalController returns a pointer to the name of the external controller managing // the JobSet, if one exists. Otherwise, it returns nil. 
func managedByExternalController(js *jobset.JobSet) *string { diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index 7d00e35a..c4bc3f5c 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -1158,18 +1158,48 @@ func TestFindFirstFailedJob(t *testing.T) { // Helper function to create a job object with a failed condition func jobWithFailedCondition(name string, failureTime time.Time) *batchv1.Job { - return &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{Name: name}, + return jobWithFailedConditionAndOpts(name, failureTime, nil) +} + +type failJobOptions struct { + reason *string + parentReplicatedJobName *string +} + +// Helper function to create a job object with a failed condition +func jobWithFailedConditionAndOpts(name string, failureTime time.Time, opts *failJobOptions) *batchv1.Job { + var reason string + labels := make(map[string]string) + applyOpts := func() { + if opts == nil { + return + } + + if opts.reason != nil { + reason = *opts.reason + } + + if opts.parentReplicatedJobName != nil { + labels[jobset.ReplicatedJobNameKey] = *opts.parentReplicatedJobName + } + } + applyOpts() + + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: name, Labels: labels}, Status: batchv1.JobStatus{ Conditions: []batchv1.JobCondition{ { Type: batchv1.JobFailed, Status: corev1.ConditionTrue, LastTransitionTime: metav1.NewTime(failureTime), + Reason: reason, }, }, }, } + + return job } type makeJobArgs struct { diff --git a/pkg/webhooks/jobset_webhook.go b/pkg/webhooks/jobset_webhook.go index e26cd114..edef50df 100644 --- a/pkg/webhooks/jobset_webhook.go +++ b/pkg/webhooks/jobset_webhook.go @@ -61,6 +61,17 @@ const ( subdomainTooLongErrMsg = ".spec.network.subdomain is too long, must be less than 63 characters" ) +// validOnJobFailureReasons stores supported values of the reason field of the condition of +// a failed job. 
See https://github.com/kubernetes/api/blob/2676848ed8201866119a94759a2d525ffc7396c0/batch/v1/types.go#L632 +// for more details. +var validOnJobFailureReasons = []string{ + batchv1.JobReasonBackoffLimitExceeded, + batchv1.JobReasonDeadlineExceeded, + batchv1.JobReasonFailedIndexes, + batchv1.JobReasonMaxFailedIndexesExceeded, + batchv1.JobReasonPodFailurePolicy, +} + //+kubebuilder:webhook:path=/mutate-jobset-x-k8s-io-v1alpha2-jobset,mutating=true,failurePolicy=fail,sideEffects=None,groups=jobset.x-k8s.io,resources=jobsets,verbs=create;update,versions=v1alpha2,name=mjobset.kb.io,admissionReviewVersions=v1 // jobSetWebhook for defaulting and admission. @@ -207,6 +218,25 @@ func (j *jobSetWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) allErrs = append(allErrs, fmt.Errorf("invalid replicatedJob name '%s' does not appear in .spec.ReplicatedJobs", rjobName)) } } + + // Validate failure policy + if js.Spec.FailurePolicy != nil { + for _, rule := range js.Spec.FailurePolicy.Rules { + // Validate the rules target replicated jobs are valid + for _, rjobName := range rule.TargetReplicatedJobs { + if !collections.Contains(validReplicatedJobs, rjobName) { + allErrs = append(allErrs, fmt.Errorf("invalid replicatedJob name '%s' in failure policy does not appear in .spec.ReplicatedJobs", rjobName)) + } + } + + // Validate the rules on job failure reasons are valid + for _, failureReason := range rule.OnJobFailureReasons { + if !collections.Contains(validOnJobFailureReasons, failureReason) { + allErrs = append(allErrs, fmt.Errorf("invalid job failure reason '%s' in failure policy is not a recognized job failure reason", failureReason)) + } + } + } + } return nil, errors.Join(allErrs...) 
} diff --git a/pkg/webhooks/jobset_webhook_test.go b/pkg/webhooks/jobset_webhook_test.go index 52cbcd80..9068bf90 100644 --- a/pkg/webhooks/jobset_webhook_test.go +++ b/pkg/webhooks/jobset_webhook_test.go @@ -969,6 +969,74 @@ func TestValidateCreate(t *testing.T) { }, want: errors.Join(), }, + { + name: "jobset failure policy has an invalid on job failure reason", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + FailurePolicy: &jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{"fakeReason"}, + }, + }, + }, + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "rj", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Completions: ptr.To(int32(1)), + Parallelism: ptr.To(int32(1)), + }, + }, + }, + }, + SuccessPolicy: &jobset.SuccessPolicy{}, + }, + }, + want: errors.Join( + fmt.Errorf("invalid job failure reason '%s' in failure policy is not a recognized job failure reason", "fakeReason"), + ), + }, + { + name: "jobset failure policy has an invalid replicated job", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + FailurePolicy: &jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.FailJobSet, + TargetReplicatedJobs: []string{"fakeReplicatedJob"}, + }, + }, + }, + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "rj", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Completions: ptr.To(int32(1)), + Parallelism: ptr.To(int32(1)), + }, + }, + }, + }, + SuccessPolicy: &jobset.SuccessPolicy{}, + }, + }, + want: errors.Join( + fmt.Errorf("invalid replicatedJob name '%s' in failure policy does not appear in .spec.ReplicatedJobs", "fakeReplicatedJob"), + ), + }, } fakeClient := fake.NewFakeClient() webhook, err 
:= NewJobSetWebhook(fakeClient) diff --git a/sdk/python/README.md b/sdk/python/README.md index affd3472..9d6faf71 100644 --- a/sdk/python/README.md +++ b/sdk/python/README.md @@ -65,6 +65,7 @@ Class | Method | HTTP request | Description ## Documentation For Models - [JobsetV1alpha2FailurePolicy](docs/JobsetV1alpha2FailurePolicy.md) + - [JobsetV1alpha2FailurePolicyRule](docs/JobsetV1alpha2FailurePolicyRule.md) - [JobsetV1alpha2JobSet](docs/JobsetV1alpha2JobSet.md) - [JobsetV1alpha2JobSetList](docs/JobsetV1alpha2JobSetList.md) - [JobsetV1alpha2JobSetSpec](docs/JobsetV1alpha2JobSetSpec.md) diff --git a/sdk/python/docs/JobsetV1alpha2FailurePolicy.md b/sdk/python/docs/JobsetV1alpha2FailurePolicy.md index 4df59ee1..e7b5a2d6 100644 --- a/sdk/python/docs/JobsetV1alpha2FailurePolicy.md +++ b/sdk/python/docs/JobsetV1alpha2FailurePolicy.md @@ -4,6 +4,7 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **max_restarts** | **int** | MaxRestarts defines the limit on the number of JobSet restarts. A restart is achieved by recreating all active child jobs. | [optional] +**rules** | [**list[JobsetV1alpha2FailurePolicyRule]**](JobsetV1alpha2FailurePolicyRule.md) | List of failure policy rules for this JobSet. For a given Job failure, the rules will be evaluated in order, and only the first matching rule will be executed. If no matching rule is found, the RestartJobSet action is applied. 
| [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/docs/JobsetV1alpha2FailurePolicyRule.md b/sdk/python/docs/JobsetV1alpha2FailurePolicyRule.md new file mode 100644 index 00000000..1b98164a --- /dev/null +++ b/sdk/python/docs/JobsetV1alpha2FailurePolicyRule.md @@ -0,0 +1,13 @@ +# JobsetV1alpha2FailurePolicyRule + +FailurePolicyRule defines a FailurePolicyAction to be executed if a child job fails due to a reason listed in OnJobFailureReasons. +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**action** | **str** | The action to take if the rule is matched. | [default to ''] +**on_job_failure_reasons** | **list[str]** | The requirement on the job failure reasons. The requirement is satisfied if at least one reason matches the list. The rules are evaluated in order, and the first matching rule is executed. An empty list applies the rule to any job failure reason. | +**target_replicated_jobs** | **list[str]** | TargetReplicatedJobs are the names of the replicated jobs the operator applies to. An empty list will apply to all replicatedJobs. 
| [optional] + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/sdk/python/docs/JobsetV1alpha2JobSetStatus.md b/sdk/python/docs/JobsetV1alpha2JobSetStatus.md index b5262dab..3a223368 100644 --- a/sdk/python/docs/JobsetV1alpha2JobSetStatus.md +++ b/sdk/python/docs/JobsetV1alpha2JobSetStatus.md @@ -7,6 +7,7 @@ Name | Type | Description | Notes **conditions** | [**list[V1Condition]**](V1Condition.md) | | [optional] **replicated_jobs_status** | [**list[JobsetV1alpha2ReplicatedJobStatus]**](JobsetV1alpha2ReplicatedJobStatus.md) | ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. | [optional] **restarts** | **int** | Restarts tracks the number of times the JobSet has restarted (i.e. recreated in case of RecreateAll policy). | [optional] +**restarts_count_towards_max** | **int** | RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. 
| [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/jobset/__init__.py b/sdk/python/jobset/__init__.py index 591721ac..4f0e78d6 100644 --- a/sdk/python/jobset/__init__.py +++ b/sdk/python/jobset/__init__.py @@ -28,6 +28,7 @@ from jobset.exceptions import ApiException # import models into sdk package from jobset.models.jobset_v1alpha2_failure_policy import JobsetV1alpha2FailurePolicy +from jobset.models.jobset_v1alpha2_failure_policy_rule import JobsetV1alpha2FailurePolicyRule from jobset.models.jobset_v1alpha2_job_set import JobsetV1alpha2JobSet from jobset.models.jobset_v1alpha2_job_set_list import JobsetV1alpha2JobSetList from jobset.models.jobset_v1alpha2_job_set_spec import JobsetV1alpha2JobSetSpec diff --git a/sdk/python/jobset/models/__init__.py b/sdk/python/jobset/models/__init__.py index c3632357..26cb70c1 100644 --- a/sdk/python/jobset/models/__init__.py +++ b/sdk/python/jobset/models/__init__.py @@ -18,6 +18,7 @@ # import models into model package from jobset.models.jobset_v1alpha2_failure_policy import JobsetV1alpha2FailurePolicy +from jobset.models.jobset_v1alpha2_failure_policy_rule import JobsetV1alpha2FailurePolicyRule from jobset.models.jobset_v1alpha2_job_set import JobsetV1alpha2JobSet from jobset.models.jobset_v1alpha2_job_set_list import JobsetV1alpha2JobSetList from jobset.models.jobset_v1alpha2_job_set_spec import JobsetV1alpha2JobSetSpec diff --git a/sdk/python/jobset/models/jobset_v1alpha2_failure_policy.py b/sdk/python/jobset/models/jobset_v1alpha2_failure_policy.py index 5b57e6ca..0e768064 100644 --- a/sdk/python/jobset/models/jobset_v1alpha2_failure_policy.py +++ b/sdk/python/jobset/models/jobset_v1alpha2_failure_policy.py @@ -33,24 +33,29 @@ class JobsetV1alpha2FailurePolicy(object): and the value is json key in definition. 
""" openapi_types = { - 'max_restarts': 'int' + 'max_restarts': 'int', + 'rules': 'list[JobsetV1alpha2FailurePolicyRule]' } attribute_map = { - 'max_restarts': 'maxRestarts' + 'max_restarts': 'maxRestarts', + 'rules': 'rules' } - def __init__(self, max_restarts=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, max_restarts=None, rules=None, local_vars_configuration=None): # noqa: E501 """JobsetV1alpha2FailurePolicy - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() self.local_vars_configuration = local_vars_configuration self._max_restarts = None + self._rules = None self.discriminator = None if max_restarts is not None: self.max_restarts = max_restarts + if rules is not None: + self.rules = rules @property def max_restarts(self): @@ -75,6 +80,29 @@ def max_restarts(self, max_restarts): self._max_restarts = max_restarts + @property + def rules(self): + """Gets the rules of this JobsetV1alpha2FailurePolicy. # noqa: E501 + + List of failure policy rules for this JobSet. For a given Job failure, the rules will be evaluated in order, and only the first matching rule will be executed. If no matching rule is found, the RestartJobSet action is applied. # noqa: E501 + + :return: The rules of this JobsetV1alpha2FailurePolicy. # noqa: E501 + :rtype: list[JobsetV1alpha2FailurePolicyRule] + """ + return self._rules + + @rules.setter + def rules(self, rules): + """Sets the rules of this JobsetV1alpha2FailurePolicy. + + List of failure policy rules for this JobSet. For a given Job failure, the rules will be evaluated in order, and only the first matching rule will be executed. If no matching rule is found, the RestartJobSet action is applied. # noqa: E501 + + :param rules: The rules of this JobsetV1alpha2FailurePolicy. 
# noqa: E501 + :type: list[JobsetV1alpha2FailurePolicyRule] + """ + + self._rules = rules + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/sdk/python/jobset/models/jobset_v1alpha2_failure_policy_rule.py b/sdk/python/jobset/models/jobset_v1alpha2_failure_policy_rule.py new file mode 100644 index 00000000..3d5871d4 --- /dev/null +++ b/sdk/python/jobset/models/jobset_v1alpha2_failure_policy_rule.py @@ -0,0 +1,180 @@ +# coding: utf-8 + +""" + JobSet SDK + + Python SDK for the JobSet API # noqa: E501 + + The version of the OpenAPI document: v0.1.4 + Generated by: https://openapi-generator.tech +""" + + +import pprint +import re # noqa: F401 + +import six + +from jobset.configuration import Configuration + + +class JobsetV1alpha2FailurePolicyRule(object): + """NOTE: This class is auto generated by OpenAPI Generator. + Ref: https://openapi-generator.tech + + Do not edit the class manually. + """ + + """ + Attributes: + openapi_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. 
+ """ + openapi_types = { + 'action': 'str', + 'on_job_failure_reasons': 'list[str]', + 'target_replicated_jobs': 'list[str]' + } + + attribute_map = { + 'action': 'action', + 'on_job_failure_reasons': 'onJobFailureReasons', + 'target_replicated_jobs': 'targetReplicatedJobs' + } + + def __init__(self, action='', on_job_failure_reasons=None, target_replicated_jobs=None, local_vars_configuration=None): # noqa: E501 + """JobsetV1alpha2FailurePolicyRule - a model defined in OpenAPI""" # noqa: E501 + if local_vars_configuration is None: + local_vars_configuration = Configuration() + self.local_vars_configuration = local_vars_configuration + + self._action = None + self._on_job_failure_reasons = None + self._target_replicated_jobs = None + self.discriminator = None + + self.action = action + self.on_job_failure_reasons = on_job_failure_reasons + if target_replicated_jobs is not None: + self.target_replicated_jobs = target_replicated_jobs + + @property + def action(self): + """Gets the action of this JobsetV1alpha2FailurePolicyRule. # noqa: E501 + + The action to take if the rule is matched. # noqa: E501 + + :return: The action of this JobsetV1alpha2FailurePolicyRule. # noqa: E501 + :rtype: str + """ + return self._action + + @action.setter + def action(self, action): + """Sets the action of this JobsetV1alpha2FailurePolicyRule. + + The action to take if the rule is matched. # noqa: E501 + + :param action: The action of this JobsetV1alpha2FailurePolicyRule. # noqa: E501 + :type: str + """ + if self.local_vars_configuration.client_side_validation and action is None: # noqa: E501 + raise ValueError("Invalid value for `action`, must not be `None`") # noqa: E501 + + self._action = action + + @property + def on_job_failure_reasons(self): + """Gets the on_job_failure_reasons of this JobsetV1alpha2FailurePolicyRule. # noqa: E501 + + The requirement on the job failure reasons. The requirement is satisfied if at least one reason matches the list. 
The rules are evaluated in order, and the first matching rule is executed. An empty list applies the rule to any job failure reason. # noqa: E501 + + :return: The on_job_failure_reasons of this JobsetV1alpha2FailurePolicyRule. # noqa: E501 + :rtype: list[str] + """ + return self._on_job_failure_reasons + + @on_job_failure_reasons.setter + def on_job_failure_reasons(self, on_job_failure_reasons): + """Sets the on_job_failure_reasons of this JobsetV1alpha2FailurePolicyRule. + + The requirement on the job failure reasons. The requirement is satisfied if at least one reason matches the list. The rules are evaluated in order, and the first matching rule is executed. An empty list applies the rule to any job failure reason. # noqa: E501 + + :param on_job_failure_reasons: The on_job_failure_reasons of this JobsetV1alpha2FailurePolicyRule. # noqa: E501 + :type: list[str] + """ + if self.local_vars_configuration.client_side_validation and on_job_failure_reasons is None: # noqa: E501 + raise ValueError("Invalid value for `on_job_failure_reasons`, must not be `None`") # noqa: E501 + + self._on_job_failure_reasons = on_job_failure_reasons + + @property + def target_replicated_jobs(self): + """Gets the target_replicated_jobs of this JobsetV1alpha2FailurePolicyRule. # noqa: E501 + + TargetReplicatedJobs are the names of the replicated jobs the operator applies to. An empty list will apply to all replicatedJobs. # noqa: E501 + + :return: The target_replicated_jobs of this JobsetV1alpha2FailurePolicyRule. # noqa: E501 + :rtype: list[str] + """ + return self._target_replicated_jobs + + @target_replicated_jobs.setter + def target_replicated_jobs(self, target_replicated_jobs): + """Sets the target_replicated_jobs of this JobsetV1alpha2FailurePolicyRule. + + TargetReplicatedJobs are the names of the replicated jobs the operator applies to. An empty list will apply to all replicatedJobs. 
# noqa: E501 + + :param target_replicated_jobs: The target_replicated_jobs of this JobsetV1alpha2FailurePolicyRule. # noqa: E501 + :type: list[str] + """ + + self._target_replicated_jobs = target_replicated_jobs + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list(map( + lambda x: x.to_dict() if hasattr(x, "to_dict") else x, + value + )) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items() + )) + else: + result[attr] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, JobsetV1alpha2FailurePolicyRule): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, JobsetV1alpha2FailurePolicyRule): + return True + + return self.to_dict() != other.to_dict() diff --git a/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py b/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py index 1b50496e..e6486617 100644 --- a/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py +++ b/sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py @@ -35,16 +35,18 @@ class JobsetV1alpha2JobSetStatus(object): openapi_types = { 'conditions': 'list[V1Condition]', 'replicated_jobs_status': 'list[JobsetV1alpha2ReplicatedJobStatus]', - 'restarts': 'int' + 'restarts': 'int', + 'restarts_count_towards_max': 'int' } attribute_map = { 'conditions': 'conditions', 
'replicated_jobs_status': 'replicatedJobsStatus', - 'restarts': 'restarts' + 'restarts': 'restarts', + 'restarts_count_towards_max': 'restartsCountTowardsMax' } - def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501 """JobsetV1alpha2JobSetStatus - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -53,6 +55,7 @@ def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, self._conditions = None self._replicated_jobs_status = None self._restarts = None + self._restarts_count_towards_max = None self.discriminator = None if conditions is not None: @@ -61,6 +64,8 @@ def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, self.replicated_jobs_status = replicated_jobs_status if restarts is not None: self.restarts = restarts + if restarts_count_towards_max is not None: + self.restarts_count_towards_max = restarts_count_towards_max @property def conditions(self): @@ -129,6 +134,29 @@ def restarts(self, restarts): self._restarts = restarts + @property + def restarts_count_towards_max(self): + """Gets the restarts_count_towards_max of this JobsetV1alpha2JobSetStatus. # noqa: E501 + + RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. # noqa: E501 + + :return: The restarts_count_towards_max of this JobsetV1alpha2JobSetStatus. # noqa: E501 + :rtype: int + """ + return self._restarts_count_towards_max + + @restarts_count_towards_max.setter + def restarts_count_towards_max(self, restarts_count_towards_max): + """Sets the restarts_count_towards_max of this JobsetV1alpha2JobSetStatus. 
+ + RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. # noqa: E501 + + :param restarts_count_towards_max: The restarts_count_towards_max of this JobsetV1alpha2JobSetStatus. # noqa: E501 + :type: int + """ + + self._restarts_count_towards_max = restarts_count_towards_max + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/sdk/python/test/test_jobset_v1alpha2_failure_policy.py b/sdk/python/test/test_jobset_v1alpha2_failure_policy.py index f0b6d68c..b0498b70 100644 --- a/sdk/python/test/test_jobset_v1alpha2_failure_policy.py +++ b/sdk/python/test/test_jobset_v1alpha2_failure_policy.py @@ -38,7 +38,17 @@ def make_instance(self, include_optional): # model = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy() # noqa: E501 if include_optional : return JobsetV1alpha2FailurePolicy( - max_restarts = 56 + max_restarts = 56, + rules = [ + jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule( + action = '0', + on_job_failure_reasons = [ + '0' + ], + target_replicated_jobs = [ + '0' + ], ) + ] ) else : return JobsetV1alpha2FailurePolicy( diff --git a/sdk/python/test/test_jobset_v1alpha2_failure_policy_rule.py b/sdk/python/test/test_jobset_v1alpha2_failure_policy_rule.py new file mode 100644 index 00000000..f5740fc7 --- /dev/null +++ b/sdk/python/test/test_jobset_v1alpha2_failure_policy_rule.py @@ -0,0 +1,64 @@ +# coding: utf-8 + +""" + JobSet SDK + + Python SDK for the JobSet API # noqa: E501 + + The version of the OpenAPI document: v0.1.4 + Generated by: https://openapi-generator.tech +""" + + +from __future__ import absolute_import + +# Kubernetes imports +from kubernetes.client.models.v1_job_template_spec import V1JobTemplateSpec +import unittest +import datetime + +import jobset +from jobset.models.jobset_v1alpha2_failure_policy_rule import JobsetV1alpha2FailurePolicyRule # noqa: E501 +from 
jobset.rest import ApiException + +class TestJobsetV1alpha2FailurePolicyRule(unittest.TestCase): + """JobsetV1alpha2FailurePolicyRule unit test stubs""" + + def setUp(self): + pass + + def tearDown(self): + pass + + def make_instance(self, include_optional): + """Test JobsetV1alpha2FailurePolicyRule + include_option is a boolean, when False only required + params are included, when True both required and + optional params are included """ + # model = jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule() # noqa: E501 + if include_optional : + return JobsetV1alpha2FailurePolicyRule( + action = '0', + on_job_failure_reasons = [ + '0' + ], + target_replicated_jobs = [ + '0' + ] + ) + else : + return JobsetV1alpha2FailurePolicyRule( + action = '0', + on_job_failure_reasons = [ + '0' + ], + ) + + def testJobsetV1alpha2FailurePolicyRule(self): + """Test JobsetV1alpha2FailurePolicyRule""" + inst_req_only = self.make_instance(include_optional=False) + inst_req_and_optional = self.make_instance(include_optional=True) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set.py b/sdk/python/test/test_jobset_v1alpha2_job_set.py index c16e2b37..80bb1478 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set.py @@ -43,7 +43,17 @@ def make_instance(self, include_optional): metadata = None, spec = jobset.models.jobset_v1alpha2_job_set_spec.JobsetV1alpha2JobSetSpec( failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy( - max_restarts = 56, ), + max_restarts = 56, + rules = [ + jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule( + action = '0', + on_job_failure_reasons = [ + '0' + ], + target_replicated_jobs = [ + '0' + ], ) + ], ), managed_by = '0', network = jobset.models.jobset_v1alpha2_network.JobsetV1alpha2Network( enable_dns_hostnames = True, @@ -58,10 +68,7 @@ def 
make_instance(self, include_optional): startup_policy = jobset.models.jobset_v1alpha2_startup_policy.JobsetV1alpha2StartupPolicy( startup_policy_order = '0', ), success_policy = jobset.models.jobset_v1alpha2_success_policy.JobsetV1alpha2SuccessPolicy( - operator = '0', - target_replicated_jobs = [ - '0' - ], ), + operator = '0', ), suspend = True, ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( @@ -77,7 +84,8 @@ def make_instance(self, include_optional): succeeded = 56, suspended = 56, ) ], - restarts = 56, ) + restarts = 56, + restarts_count_towards_max = 56, ) ) else : return JobsetV1alpha2JobSet( diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py index 662f81d9..d1be43fe 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py @@ -46,7 +46,17 @@ def make_instance(self, include_optional): metadata = None, spec = jobset.models.jobset_v1alpha2_job_set_spec.JobsetV1alpha2JobSetSpec( failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy( - max_restarts = 56, ), + max_restarts = 56, + rules = [ + jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule( + action = '0', + on_job_failure_reasons = [ + '0' + ], + target_replicated_jobs = [ + '0' + ], ) + ], ), managed_by = '0', network = jobset.models.jobset_v1alpha2_network.JobsetV1alpha2Network( enable_dns_hostnames = True, @@ -61,10 +71,7 @@ def make_instance(self, include_optional): startup_policy = jobset.models.jobset_v1alpha2_startup_policy.JobsetV1alpha2StartupPolicy( startup_policy_order = '0', ), success_policy = jobset.models.jobset_v1alpha2_success_policy.JobsetV1alpha2SuccessPolicy( - operator = '0', - target_replicated_jobs = [ - '0' - ], ), + operator = '0', ), suspend = True, ttl_seconds_after_finished = 56, ), status = 
jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( @@ -80,7 +87,8 @@ def make_instance(self, include_optional): succeeded = 56, suspended = 56, ) ], - restarts = 56, ), ) + restarts = 56, + restarts_count_towards_max = 56, ), ) ], kind = '0', metadata = None @@ -94,7 +102,17 @@ def make_instance(self, include_optional): metadata = None, spec = jobset.models.jobset_v1alpha2_job_set_spec.JobsetV1alpha2JobSetSpec( failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy( - max_restarts = 56, ), + max_restarts = 56, + rules = [ + jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule( + action = '0', + on_job_failure_reasons = [ + '0' + ], + target_replicated_jobs = [ + '0' + ], ) + ], ), managed_by = '0', network = jobset.models.jobset_v1alpha2_network.JobsetV1alpha2Network( enable_dns_hostnames = True, @@ -109,10 +127,7 @@ def make_instance(self, include_optional): startup_policy = jobset.models.jobset_v1alpha2_startup_policy.JobsetV1alpha2StartupPolicy( startup_policy_order = '0', ), success_policy = jobset.models.jobset_v1alpha2_success_policy.JobsetV1alpha2SuccessPolicy( - operator = '0', - target_replicated_jobs = [ - '0' - ], ), + operator = '0', ), suspend = True, ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( @@ -128,7 +143,8 @@ def make_instance(self, include_optional): succeeded = 56, suspended = 56, ) ], - restarts = 56, ), ) + restarts = 56, + restarts_count_towards_max = 56, ), ) ], ) diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py b/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py index 6671320c..4010e5fd 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py @@ -39,7 +39,17 @@ def make_instance(self, include_optional): if include_optional : return JobsetV1alpha2JobSetSpec( failure_policy = 
jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy( - max_restarts = 56, ), + max_restarts = 56, + rules = [ + jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule( + action = '0', + on_job_failure_reasons = [ + '0' + ], + target_replicated_jobs = [ + '0' + ], ) + ], ), managed_by = '0', network = jobset.models.jobset_v1alpha2_network.JobsetV1alpha2Network( enable_dns_hostnames = True, diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_status.py b/sdk/python/test/test_jobset_v1alpha2_job_set_status.py index 293a2ee8..d7457001 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_status.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_status.py @@ -50,7 +50,8 @@ def make_instance(self, include_optional): succeeded = 56, suspended = 56, ) ], - restarts = 56 + restarts = 56, + restarts_count_towards_max = 56 ) else : return JobsetV1alpha2JobSetStatus( diff --git a/test/integration/controller/jobset_controller_test.go b/test/integration/controller/jobset_controller_test.go index 5afa01df..ddf73701 100644 --- a/test/integration/controller/jobset_controller_test.go +++ b/test/integration/controller/jobset_controller_test.go @@ -443,6 +443,328 @@ var _ = ginkgo.Describe("JobSet controller", func() { }, }, }), + ginkgo.Entry("jobset fails immediately with FailJobSet failure policy action.", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns). 
+ FailurePolicy(&jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{}, + }, + }, + }) + }, + updates: []*update{ + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failJobWithOptions(&jobList.Items[0], &failJobOptions{reason: ptr.To(batchv1.JobReasonPodFailurePolicy)}) + }, + checkJobSetCondition: testutil.JobSetFailed, + }, + }, + }), + ginkgo.Entry("jobset does not fail immediately with FailJobSet failure policy action.", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns). + FailurePolicy(&jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonBackoffLimitExceeded}, + }, + }, + }) + }, + updates: []*update{ + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failJobWithOptions(&jobList.Items[0], &failJobOptions{reason: ptr.To(batchv1.JobReasonPodFailurePolicy)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + // For a restart, all jobs will be deleted and recreated, so we expect a + // foreground deletion finalizer for every job. + removeForegroundDeletionFinalizers(js, testutil.NumExpectedJobs(js)) + }, + }, + }, + }), + ginkgo.Entry("jobset restarts with RestartJobSet Failure Policy Action.", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns). 
+ FailurePolicy(&jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.RestartJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonPodFailurePolicy}, + }, + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{}, + }, + }, + }) + }, + updates: []*update{ + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failJobWithOptions(&jobList.Items[0], &failJobOptions{reason: ptr.To(batchv1.JobReasonPodFailurePolicy)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + }, + }), + ginkgo.Entry("jobset restarts with RestartJobSetAndIgnoremaxRestarts Failure Policy Action.", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns). + FailurePolicy(&jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.RestartJobSetAndIgnoreMaxRestarts, + OnJobFailureReasons: []string{batchv1.JobReasonPodFailurePolicy}, + }, + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{}, + }, + }, + }) + }, + updates: []*update{ + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failJobWithOptions(&jobList.Items[0], &failJobOptions{reason: ptr.To(batchv1.JobReasonPodFailurePolicy)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + // For a restart, all jobs will be deleted and recreated, so we expect a + // foreground deletion finalizer for every job. + removeForegroundDeletionFinalizers(js, testutil.NumExpectedJobs(js)) + }, + }, + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failJobWithOptions(&jobList.Items[0], &failJobOptions{reason: ptr.To(batchv1.JobReasonPodFailurePolicy)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + // For a restart, all jobs will be deleted and recreated, so we expect a + // foreground deletion finalizer for every job. 
+ removeForegroundDeletionFinalizers(js, testutil.NumExpectedJobs(js)) + }, + }, + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failJobWithOptions(&jobList.Items[0], &failJobOptions{reason: ptr.To(batchv1.JobReasonPodFailurePolicy)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + // For a restart, all jobs will be deleted and recreated, so we expect a + // foreground deletion finalizer for every job. + removeForegroundDeletionFinalizers(js, testutil.NumExpectedJobs(js)) + }, + }, + }, + }), + ginkgo.Entry("job fails and the parent replicated job is contained in TargetReplicatedJobs.", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns). + FailurePolicy(&jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonFailedIndexes}, + TargetReplicatedJobs: []string{"replicated-job-b"}, + }, + }, + }) + }, + updates: []*update{ + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failFirstMatchingJobWithOptions(jobList, "replicated-job-b", &failJobOptions{reason: ptr.To(batchv1.JobReasonFailedIndexes)}) + }, + checkJobSetCondition: testutil.JobSetFailed, + }, + }, + }), + ginkgo.Entry("job fails and the parent replicated job is not contained in TargetReplicatedJobs.", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns). 
+ FailurePolicy(&jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonBackoffLimitExceeded}, + TargetReplicatedJobs: []string{"replicated-job-a"}, + }, + }, + }) + }, + updates: []*update{ + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failFirstMatchingJobWithOptions(jobList, "replicated-job-b", &failJobOptions{reason: ptr.To(batchv1.JobReasonBackoffLimitExceeded)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + // For a restart, all jobs will be deleted and recreated, so we expect a + // foreground deletion finalizer for every job. + removeForegroundDeletionFinalizers(js, testutil.NumExpectedJobs(js)) + }, + }, + { + checkJobSetCondition: testutil.JobSetActive, + }, + }, + }), + ginkgo.Entry("failure policy rules order verification test 1", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns). + FailurePolicy(&jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonMaxFailedIndexesExceeded}, + TargetReplicatedJobs: []string{"replicated-job-a"}, + }, + { + Action: jobset.RestartJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonMaxFailedIndexesExceeded}, + TargetReplicatedJobs: []string{"replicated-job-a"}, + }, + }, + }) + }, + updates: []*update{ + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failFirstMatchingJobWithOptions(jobList, "replicated-job-a", &failJobOptions{reason: ptr.To(batchv1.JobReasonMaxFailedIndexesExceeded)}) + }, + checkJobSetCondition: testutil.JobSetFailed, + }, + }, + }), + ginkgo.Entry("failure policy rules order verification test 2", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns). 
+ FailurePolicy(&jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.RestartJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonMaxFailedIndexesExceeded}, + TargetReplicatedJobs: []string{"replicated-job-a"}, + }, + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{batchv1.JobReasonMaxFailedIndexesExceeded}, + TargetReplicatedJobs: []string{"replicated-job-a"}, + }, + }, + }) + }, + updates: []*update{ + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failFirstMatchingJobWithOptions(jobList, "replicated-job-a", &failJobOptions{reason: ptr.To(batchv1.JobReasonMaxFailedIndexesExceeded)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + // For a restart, all jobs will be deleted and recreated, so we expect a + // foreground deletion finalizer for every job. + removeForegroundDeletionFinalizers(js, testutil.NumExpectedJobs(js)) + }, + }, + }, + }), + ginkgo.Entry("failure policy rules order verification test 3", &testCase{ + makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { + return testJobSet(ns). 
+ FailurePolicy(&jobset.FailurePolicy{ + MaxRestarts: 1, + Rules: []jobset.FailurePolicyRule{ + { + Action: jobset.RestartJobSetAndIgnoreMaxRestarts, + OnJobFailureReasons: []string{batchv1.JobReasonMaxFailedIndexesExceeded}, + TargetReplicatedJobs: []string{"replicated-job-a"}, + }, + { + Action: jobset.FailJobSet, + OnJobFailureReasons: []string{}, + TargetReplicatedJobs: []string{}, + }, + }, + }) + }, + updates: []*update{ + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failFirstMatchingJobWithOptions(jobList, "replicated-job-a", &failJobOptions{reason: ptr.To(batchv1.JobReasonMaxFailedIndexesExceeded)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + // For a restart, all jobs will be deleted and recreated, so we expect a + // foreground deletion finalizer for every job. + removeForegroundDeletionFinalizers(js, testutil.NumExpectedJobs(js)) + }, + }, + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failFirstMatchingJobWithOptions(jobList, "replicated-job-a", &failJobOptions{reason: ptr.To(batchv1.JobReasonMaxFailedIndexesExceeded)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + // For a restart, all jobs will be deleted and recreated, so we expect a + // foreground deletion finalizer for every job. + removeForegroundDeletionFinalizers(js, testutil.NumExpectedJobs(js)) + }, + }, + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failFirstMatchingJobWithOptions(jobList, "replicated-job-a", &failJobOptions{reason: ptr.To(batchv1.JobReasonMaxFailedIndexesExceeded)}) + }, + checkJobSetCondition: testutil.JobSetActive, + }, + { + jobSetUpdateFn: func(js *jobset.JobSet) { + // For a restart, all jobs will be deleted and recreated, so we expect a + // foreground deletion finalizer for every job. 
+ removeForegroundDeletionFinalizers(js, testutil.NumExpectedJobs(js)) + }, + }, + { + jobUpdateFn: func(jobList *batchv1.JobList) { + failFirstMatchingJob(jobList, "replicated-job-b") + }, + checkJobSetCondition: testutil.JobSetFailed, + }, + }, + }), ginkgo.Entry("job succeeds after one failure", &testCase{ makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper { return testJobSet(ns). @@ -1442,17 +1764,55 @@ func updateJobStatus(job *batchv1.Job, status batchv1.JobStatus) { jobGet.Status = status return k8sClient.Status().Update(ctx, &jobGet) }, timeout, interval).Should(gomega.Succeed()) + } -func failJob(job *batchv1.Job) { +type failJobOptions struct { + reason *string +} + +func failJobWithOptions(job *batchv1.Job, failJobOpts *failJobOptions) { + if failJobOpts == nil { + failJobOpts = &failJobOptions{} + } updateJobStatus(job, batchv1.JobStatus{ Conditions: append(job.Status.Conditions, batchv1.JobCondition{ Type: batchv1.JobFailed, Status: corev1.ConditionTrue, + Reason: ptr.Deref(failJobOpts.reason, ""), }), }) } +func failJob(job *batchv1.Job) { + failJobWithOptions(job, nil) +} + +// failFirstMatchingJobWithOptions fails the first matching job (in terms of index in jobList) that is a child of +// replicatedJobName with extra options. No job is failed if a matching job does not exist. +func failFirstMatchingJobWithOptions(jobList *batchv1.JobList, replicatedJobName string, failJobOpts *failJobOptions) { + if jobList == nil { + return + } + if failJobOpts == nil { + failJobOpts = &failJobOptions{} + } + + for _, job := range jobList.Items { + parentReplicatedJob := job.Labels[jobset.ReplicatedJobNameKey] + if parentReplicatedJob == replicatedJobName { + failJobWithOptions(&job, failJobOpts) + return + } + } +} + +// failFirstMatchingJob fails the first matching job (in terms of index in jobList) that is a child of +// replicatedJobName. No job is failed if a matching job does not exist. 
+func failFirstMatchingJob(jobList *batchv1.JobList, replicatedJobName string) { + failFirstMatchingJobWithOptions(jobList, replicatedJobName, nil) +} + func suspendJobSet(js *jobset.JobSet, suspend bool) { gomega.Eventually(func() error { var jsGet jobset.JobSet