Skip to content

Commit

Permalink
Realign with KEP
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Oct 23, 2024
1 parent c509e04 commit ab6e7ee
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 97 deletions.
16 changes: 13 additions & 3 deletions apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,14 @@ type WorkloadSpec struct {
// +kubebuilder:default=true
Active *bool `json:"active,omitempty"`

// MaximumExecutionTime if provided, determines the maximum time the workload can be admitted
// before it's automatically deactivated.
MaximumExecutionTime *metav1.Duration `json:"maximumExecutionTime,omitempty"`
// MaximumExecutionTimeSeconds if provided, determines the maximum time, in seconds,
// the workload can be admitted before it's automatically deactivated.
//
// If unspecified, no execution time limit is enforced on the Workload.
//
// +optional
// +kubebuilder:validation:Minimum=1
MaximumExecutionTimeSeconds *int32 `json:"maximumExecutionTimeSeconds,omitempty"`
}

// PodSetTopologyRequest defines the topology request for a PodSet.
Expand Down Expand Up @@ -524,6 +529,10 @@ const (
// WorkloadRequeuingLimitExceeded indicates that the workload exceeded max number
// of re-queuing retries.
WorkloadRequeuingLimitExceeded = "RequeuingLimitExceeded"

// WorkloadMaximumExecutionTimeExceeded indicates that the workload exceeded its
// maximum execution time.
WorkloadMaximumExecutionTimeExceeded = "MaximumExecutionTimeExceeded"
)

const (
Expand Down Expand Up @@ -556,6 +565,7 @@ const (
// +kubebuilder:validation:XValidation:rule="(has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c, c.type == 'QuotaReserved' && c.status == 'True')) ? (oldSelf.spec.priorityClassSource == self.spec.priorityClassSource) : true", message="field is immutable"
// +kubebuilder:validation:XValidation:rule="(has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c, c.type == 'QuotaReserved' && c.status == 'True') && has(oldSelf.spec.priorityClassName) && has(self.spec.priorityClassName)) ? (oldSelf.spec.priorityClassName == self.spec.priorityClassName) : true", message="field is immutable"
// +kubebuilder:validation:XValidation:rule="(has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c, c.type == 'QuotaReserved' && c.status == 'True')) && (has(self.status) && has(self.status.conditions) && self.status.conditions.exists(c, c.type == 'QuotaReserved' && c.status == 'True')) && has(oldSelf.spec.queueName) && has(self.spec.queueName) ? oldSelf.spec.queueName == self.spec.queueName : true", message="field is immutable"
// +kubebuilder:validation:XValidation:rule="((has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c, c.type == 'Admitted' && c.status == 'True')) && (has(self.status) && has(self.status.conditions) && self.status.conditions.exists(c, c.type == 'Admitted' && c.status == 'True')))?((has(oldSelf.spec.maximumExecutionTimeSeconds)?oldSelf.spec.maximumExecutionTimeSeconds:0) == (has(self.spec.maximumExecutionTimeSeconds)?self.spec.maximumExecutionTimeSeconds:0)):true", message="maximumExecutionTimeSeconds is immutable while admitted"
type Workload struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down
6 changes: 3 additions & 3 deletions apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 14 additions & 4 deletions charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,15 @@ spec:

Defaults to true
type: boolean
maximumExecutionTime:
maximumExecutionTimeSeconds:
description: |-
MaximumExecutionTime if provided, determines the maximum time the workload can be admitted
before it's automatically deactivated.
type: string
MaximumExecutionTimeSeconds if provided, determines the maximum time, in seconds,
the workload can be admitted before it's automatically deactivated.

If unspecified, no execution time limit is enforced on the Workload.
format: int32
minimum: 1
type: integer
podSets:
description: |-
podSets is a list of sets of homogeneous pods, each described by a Pod spec
Expand Down Expand Up @@ -8713,6 +8717,12 @@ spec:
== ''QuotaReserved'' && c.status == ''True'')) && has(oldSelf.spec.queueName)
&& has(self.spec.queueName) ? oldSelf.spec.queueName == self.spec.queueName
: true'
- message: maximumExecutionTimeSeconds is immutable while admitted
rule: ((has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c,
c.type == 'Admitted' && c.status == 'True')) && (has(self.status) && has(self.status.conditions)
&& self.status.conditions.exists(c, c.type == 'Admitted' && c.status ==
'True')))?((has(oldSelf.spec.maximumExecutionTimeSeconds)?oldSelf.spec.maximumExecutionTimeSeconds:0)
== (has(self.spec.maximumExecutionTimeSeconds)?self.spec.maximumExecutionTimeSeconds:0)):true
served: true
storage: true
subresources:
Expand Down
26 changes: 11 additions & 15 deletions client-go/applyconfiguration/kueue/v1beta1/workloadspec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 14 additions & 4 deletions config/components/crd/bases/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ spec:

Defaults to true
type: boolean
maximumExecutionTime:
maximumExecutionTimeSeconds:
description: |-
MaximumExecutionTime if provided, determines the maximum time the workload can be admitted
before it's automatically deactivated.
type: string
MaximumExecutionTimeSeconds if provided, determines the maximum time, in seconds,
the workload can be admitted before it's automatically deactivated.

If unspecified, no execution time limit is enforced on the Workload.
format: int32
minimum: 1
type: integer
podSets:
description: |-
podSets is a list of sets of homogeneous pods, each described by a Pod spec
Expand Down Expand Up @@ -8698,6 +8702,12 @@ spec:
== ''QuotaReserved'' && c.status == ''True'')) && has(oldSelf.spec.queueName)
&& has(self.spec.queueName) ? oldSelf.spec.queueName == self.spec.queueName
: true'
- message: maximumExecutionTimeSeconds is immutable while admitted
rule: ((has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c,
c.type == 'Admitted' && c.status == 'True')) && (has(self.status) && has(self.status.conditions)
&& self.status.conditions.exists(c, c.type == 'Admitted' && c.status ==
'True')))?((has(oldSelf.spec.maximumExecutionTimeSeconds)?oldSelf.spec.maximumExecutionTimeSeconds:0)
== (has(self.spec.maximumExecutionTimeSeconds)?self.spec.maximumExecutionTimeSeconds:0)):true
served: true
storage: true
subresources:
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,21 +341,22 @@ func isDisabledRequeuedByReason(w *kueue.Workload, reason string) bool {
// reconcileMaxExecutionTime deactivates the workload if its MaximumExecutionTimeSeconds is exceeded, or returns a retry-after value.
func (r *WorkloadReconciler) reconcileMaxExecutionTime(ctx context.Context, wl *kueue.Workload) (time.Duration, error) {
admittedCondition := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadAdmitted)
if admittedCondition == nil || admittedCondition.Status != metav1.ConditionTrue || wl.Spec.MaximumExecutionTime == nil {
if admittedCondition == nil || admittedCondition.Status != metav1.ConditionTrue || wl.Spec.MaximumExecutionTimeSeconds == nil {
return 0, nil
}

remainingTime := wl.Spec.MaximumExecutionTime.Duration - r.clock.Since(admittedCondition.LastTransitionTime.Time)
remainingTime := time.Duration(*wl.Spec.MaximumExecutionTimeSeconds)*time.Second - r.clock.Since(admittedCondition.LastTransitionTime.Time)
if remainingTime > 0 {
return remainingTime, nil
}

if ptr.Deref(wl.Spec.Active, true) {
wl.Spec.Active = ptr.To(false)
if err := r.client.Update(ctx, wl); err != nil {
workload.SetDeactivationTarget(wl, kueue.WorkloadMaximumExecutionTimeExceeded, "exceeding the maximum execution time")
if err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true); err != nil {
return 0, err
}
r.recorder.Eventf(wl, corev1.EventTypeWarning, "MaximumExecutionTimeExceeded", "The maximum execution time (%v) exceeded", wl.Spec.MaximumExecutionTime.Duration)
r.recorder.Eventf(wl, corev1.EventTypeWarning, kueue.WorkloadMaximumExecutionTimeExceeded, "The maximum execution time (%ds) exceeded", *wl.Spec.MaximumExecutionTimeSeconds)
}
return 0, nil
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,13 +1322,13 @@ func TestReconcile(t *testing.T) {
"admitted workload with max execution time": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
MaximumExecutionTime(2*time.Minute).
MaximumExecutionTimeSeconds(120).
AdmittedAt(true, testStartTime.Add(-time.Minute)).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
MaximumExecutionTime(2*time.Minute).
MaximumExecutionTimeSeconds(120).
AdmittedAt(true, testStartTime.Add(-time.Minute)).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
Obj(),
Expand All @@ -1338,23 +1338,28 @@ func TestReconcile(t *testing.T) {
"admitted workload with max execution time - expired": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
MaximumExecutionTime(time.Minute).
MaximumExecutionTimeSeconds(60).
AdmittedAt(true, testStartTime.Add(-2*time.Minute)).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Active(false).
MaximumExecutionTime(time.Minute).
MaximumExecutionTimeSeconds(60).
AdmittedAt(true, testStartTime.Add(-2*time.Minute)).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
Condition(metav1.Condition{
Type: kueue.WorkloadDeactivationTarget,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadMaximumExecutionTimeExceeded,
Message: "exceeding the maximum execution time",
}).
Obj(),
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
EventType: "Warning",
Reason: "MaximumExecutionTimeExceeded",
Message: "The maximum execution time (1m0s) exceeded",
Message: "The maximum execution time (60s) exceeded",
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ func (w *WorkloadWrapper) ResourceVersion(v string) *WorkloadWrapper {
return w
}

func (w *WorkloadWrapper) MaximumExecutionTime(v time.Duration) *WorkloadWrapper {
w.Spec.MaximumExecutionTime = ptr.To(metav1.Duration{Duration: v})
func (w *WorkloadWrapper) MaximumExecutionTimeSeconds(v int32) *WorkloadWrapper {
w.Spec.MaximumExecutionTimeSeconds = &v
return w
}

Expand Down
20 changes: 0 additions & 20 deletions pkg/webhooks/workload_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

corev1 "k8s.io/api/core/v1"
apivalidation "k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -112,7 +111,6 @@ func ValidateWorkload(obj *kueue.Workload) field.ErrorList {
if variableCountPodSets > 1 {
allErrs = append(allErrs, field.Invalid(specPath.Child("podSets"), variableCountPodSets, "at most one podSet can use minCount"))
}
allErrs = append(allErrs, validateMaximumExecutionTime(obj.Spec.MaximumExecutionTime, specPath.Child("maximumExecutionTime"))...)

statusPath := field.NewPath("status")
if workload.HasQuotaReservation(obj) {
Expand Down Expand Up @@ -185,13 +183,6 @@ func validatePodSetUpdates(acs *kueue.AdmissionCheckState, obj *kueue.Workload,
return allErrs
}

func validateMaximumExecutionTime(met *metav1.Duration, path *field.Path) field.ErrorList {
if met != nil && met.Duration <= 0 {
return field.ErrorList{field.Invalid(path, *met, "should be grater then 0 if specified")}
}
return nil
}

func validateImmutablePodSetUpdates(newObj, oldObj *kueue.Workload, basePath *field.Path) field.ErrorList {
var allErrs field.ErrorList
newAcs := slices.ToRefMap(newObj.Status.AdmissionChecks, func(f *kueue.AdmissionCheckState) string { return f.Name })
Expand Down Expand Up @@ -284,8 +275,6 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
if workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
}
allErrs = append(allErrs, validateMaximumExecutionTimeUpdate(newObj, oldObj, specPath.Child("maximumExecutionTime"))...)

if workload.HasQuotaReservation(newObj) && workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, validateReclaimablePodsUpdate(newObj, oldObj, field.NewPath("status", "reclaimablePods"))...)
}
Expand Down Expand Up @@ -342,12 +331,3 @@ func validateReclaimablePodsUpdate(newObj, oldObj *kueue.Workload, basePath *fie
}
return ret
}

func validateMaximumExecutionTimeUpdate(newWl, oldWl *kueue.Workload, path *field.Path) field.ErrorList {
admitted := workload.IsAdmitted(oldWl) || workload.IsAdmitted(newWl)
changed := ptr.Deref(oldWl.Spec.MaximumExecutionTime, metav1.Duration{}) != ptr.Deref(newWl.Spec.MaximumExecutionTime, metav1.Duration{})
if admitted && changed {
return apivalidation.ValidateImmutableField(newWl.Spec.MaximumExecutionTime, oldWl.Spec.MaximumExecutionTime, path)
}
return nil
}
31 changes: 0 additions & 31 deletions pkg/webhooks/workload_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,15 +237,6 @@ func TestValidateWorkload(t *testing.T) {
field.Invalid(podSetsPath, nil, ""),
},
},

"valid maximum execution time": {
workload: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).MaximumExecutionTime(time.Microsecond).Obj(),
},

"invalid maximum execution time": {
workload: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).MaximumExecutionTime(0).Obj(),
wantErr: field.ErrorList{field.Invalid(specPath.Child("maximumExecutionTime"), 0, "")},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -401,28 +392,6 @@ func TestValidateWorkloadUpdate(t *testing.T) {
State: kueue.CheckStateReady,
}).Obj(),
},
"can add maximum execution time when not admitted": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).MaximumExecutionTime(time.Second).Obj(),
},
"can update maximum execution time when not admitted": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).MaximumExecutionTime(2 * time.Second).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).MaximumExecutionTime(time.Second).Obj(),
},
"cannot add maximum execution time when admitted": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
ReserveQuota(testingutil.MakeAdmission("cq").Obj()).Admitted(true).
MaximumExecutionTime(time.Second).Obj(),
wantErr: field.ErrorList{field.Invalid(field.NewPath("spec", "maximumExecutionTime"), metav1.Duration{Duration: time.Second}, "")},
},
"cannot update maximum execution time when admitted": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).MaximumExecutionTime(2 * time.Second).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
ReserveQuota(testingutil.MakeAdmission("cq").Obj()).Admitted(true).
MaximumExecutionTime(time.Second).Obj(),
wantErr: field.ErrorList{field.Invalid(field.NewPath("spec", "maximumExecutionTime"), metav1.Duration{Duration: time.Second}, "")},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions site/content/en/docs/reference/kueue.v1beta1.md
Original file line number Diff line number Diff line change
Expand Up @@ -2355,12 +2355,13 @@ Possible values are:</p>
<p>Defaults to true</p>
</td>
</tr>
<tr><td><code>maximumExecutionTime</code> <B>[Required]</B><br/>
<a href="https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#duration-v1-meta"><code>k8s.io/apimachinery/pkg/apis/meta/v1.Duration</code></a>
<tr><td><code>maximumExecutionTimeSeconds</code><br/>
<code>int32</code>
</td>
<td>
<p>MaximumExecutionTime if provided, determines the maximum time the workload can be admitted
before it's automatically deactivated.</p>
<p>MaximumExecutionTimeSeconds if provided, determines the maximum time, in seconds,
the workload can be admitted before it's automatically deactivated.</p>
<p>If unspecified, no execution time limit is enforced on the Workload.</p>
</td>
</tr>
</tbody>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ var _ = ginkgo.Describe("Workload controller", ginkgo.Ordered, ginkgo.ContinueOn
maxExecTime := 2 * time.Second
wl := testing.MakeWorkload("wl", ns.Name).
Queue("lq").
MaximumExecutionTime(maxExecTime).
MaximumExecutionTimeSeconds(int32(maxExecTime.Seconds())).
Obj()
key := client.ObjectKeyFromObject(wl)
ginkgo.By("creating the workload and reserving its quota", func() {
Expand Down
Loading

0 comments on commit ab6e7ee

Please sign in to comment.