Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[workload] Add maximum execution time. #3184

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ type WorkloadSpec struct {
// Defaults to true
// +kubebuilder:default=true
Active *bool `json:"active,omitempty"`

// maximumExecutionTimeSeconds if provided, determines the maximum time, in seconds,
// the workload can be admitted before it's automatically deactivated.
//
// If unspecified, no execution time limit is enforced on the Workload.
//
// +optional
// +kubebuilder:validation:Minimum=1
MaximumExecutionTimeSeconds *int32 `json:"maximumExecutionTimeSeconds,omitempty"`
}

// PodSetTopologyRequest defines the topology request for a PodSet.
Expand Down Expand Up @@ -520,6 +529,10 @@ const (
// WorkloadRequeuingLimitExceeded indicates that the workload exceeded max number
// of re-queuing retries.
WorkloadRequeuingLimitExceeded = "RequeuingLimitExceeded"

// WorkloadMaximumExecutionTimeExceeded indicates that the workload exceeded its
// maximum execution time.
WorkloadMaximumExecutionTimeExceeded = "MaximumExecutionTimeExceeded"
)

const (
Expand Down Expand Up @@ -552,6 +565,7 @@ const (
// +kubebuilder:validation:XValidation:rule="(has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c, c.type == 'QuotaReserved' && c.status == 'True')) ? (oldSelf.spec.priorityClassSource == self.spec.priorityClassSource) : true", message="field is immutable"
// +kubebuilder:validation:XValidation:rule="(has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c, c.type == 'QuotaReserved' && c.status == 'True') && has(oldSelf.spec.priorityClassName) && has(self.spec.priorityClassName)) ? (oldSelf.spec.priorityClassName == self.spec.priorityClassName) : true", message="field is immutable"
// +kubebuilder:validation:XValidation:rule="(has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c, c.type == 'QuotaReserved' && c.status == 'True')) && (has(self.status) && has(self.status.conditions) && self.status.conditions.exists(c, c.type == 'QuotaReserved' && c.status == 'True')) && has(oldSelf.spec.queueName) && has(self.spec.queueName) ? oldSelf.spec.queueName == self.spec.queueName : true", message="field is immutable"
// +kubebuilder:validation:XValidation:rule="((has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c, c.type == 'Admitted' && c.status == 'True')) && (has(self.status) && has(self.status.conditions) && self.status.conditions.exists(c, c.type == 'Admitted' && c.status == 'True')))?((has(oldSelf.spec.maximumExecutionTimeSeconds)?oldSelf.spec.maximumExecutionTimeSeconds:0) == (has(self.spec.maximumExecutionTimeSeconds)?self.spec.maximumExecutionTimeSeconds:0)):true", message="maximumExecutionTimeSeconds is immutable while admitted"
type Workload struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ spec:

Defaults to true
type: boolean
maximumExecutionTimeSeconds:
description: |-
maximumExecutionTimeSeconds if provided, determines the maximum time, in seconds,
the workload can be admitted before it's automatically deactivated.

If unspecified, no execution time limit is enforced on the Workload.
format: int32
minimum: 1
type: integer
podSets:
description: |-
podSets is a list of sets of homogeneous pods, each described by a Pod spec
Expand Down Expand Up @@ -8708,6 +8717,12 @@ spec:
== ''QuotaReserved'' && c.status == ''True'')) && has(oldSelf.spec.queueName)
&& has(self.spec.queueName) ? oldSelf.spec.queueName == self.spec.queueName
: true'
- message: maximumExecutionTimeSeconds is immutable while admitted
rule: ((has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c,
c.type == 'Admitted' && c.status == 'True')) && (has(self.status) && has(self.status.conditions)
&& self.status.conditions.exists(c, c.type == 'Admitted' && c.status ==
'True')))?((has(oldSelf.spec.maximumExecutionTimeSeconds)?oldSelf.spec.maximumExecutionTimeSeconds:0)
== (has(self.spec.maximumExecutionTimeSeconds)?self.spec.maximumExecutionTimeSeconds:0)):true
served: true
storage: true
subresources:
Expand Down
21 changes: 15 additions & 6 deletions client-go/applyconfiguration/kueue/v1beta1/workloadspec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ spec:

Defaults to true
type: boolean
maximumExecutionTimeSeconds:
description: |-
maximumExecutionTimeSeconds if provided, determines the maximum time, in seconds,
the workload can be admitted before it's automatically deactivated.

If unspecified, no execution time limit is enforced on the Workload.
format: int32
minimum: 1
type: integer
podSets:
description: |-
podSets is a list of sets of homogeneous pods, each described by a Pod spec
Expand Down Expand Up @@ -8693,6 +8702,12 @@ spec:
== ''QuotaReserved'' && c.status == ''True'')) && has(oldSelf.spec.queueName)
&& has(self.spec.queueName) ? oldSelf.spec.queueName == self.spec.queueName
: true'
- message: maximumExecutionTimeSeconds is immutable while admitted
rule: ((has(oldSelf.status) && has(oldSelf.status.conditions) && oldSelf.status.conditions.exists(c,
c.type == 'Admitted' && c.status == 'True')) && (has(self.status) && has(self.status.conditions)
&& self.status.conditions.exists(c, c.type == 'Admitted' && c.status ==
'True')))?((has(oldSelf.spec.maximumExecutionTimeSeconds)?oldSelf.spec.maximumExecutionTimeSeconds:0)
== (has(self.spec.maximumExecutionTimeSeconds)?self.spec.maximumExecutionTimeSeconds:0)):true
served: true
storage: true
subresources:
Expand Down
50 changes: 43 additions & 7 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,21 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

return r.reconcileNotReadyTimeout(ctx, req, &wl)
podsReadyRecheckAfter, err := r.reconcileNotReadyTimeout(ctx, req, &wl)
if err != nil {
return ctrl.Result{}, err
}
maxExecRecheckAfter, err := r.reconcileMaxExecutionTime(ctx, &wl)
if err != nil {
return ctrl.Result{}, err
}

// get the minimum non-zero value
recheckAfter := min(podsReadyRecheckAfter, maxExecRecheckAfter)
if recheckAfter == 0 {
recheckAfter = max(podsReadyRecheckAfter, maxExecRecheckAfter)
}
return ctrl.Result{RequeueAfter: recheckAfter}, nil
}

switch {
Expand Down Expand Up @@ -324,6 +338,28 @@ func isDisabledRequeuedByReason(w *kueue.Workload, reason string) bool {
return cond != nil && cond.Status == metav1.ConditionFalse && cond.Reason == reason
}

// reconcileMaxExecutionTime enforces spec.maximumExecutionTimeSeconds on an admitted workload.
//
// It returns (0, nil) when the workload has no Admitted=True condition or no
// limit is set. While the limit has not elapsed yet, measured from the last
// transition time of the Admitted condition, it returns the remaining time so
// the workload can be rechecked later. Once the limit is reached it marks the
// workload as a deactivation target (unless it already is one), applies the
// admission status and records a warning event; an error is returned only when
// applying the status fails.
func (r *WorkloadReconciler) reconcileMaxExecutionTime(ctx context.Context, wl *kueue.Workload) (time.Duration, error) {
	limitSeconds := wl.Spec.MaximumExecutionTimeSeconds
	if limitSeconds == nil {
		// No execution time limit requested.
		return 0, nil
	}

	admitted := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadAdmitted)
	if admitted == nil || admitted.Status != metav1.ConditionTrue {
		// The limit only counts while the workload is admitted.
		return 0, nil
	}

	allowed := time.Duration(*limitSeconds) * time.Second
	elapsed := r.clock.Since(admitted.LastTransitionTime.Time)
	if left := allowed - elapsed; left > 0 {
		return left, nil
	}

	// The limit is exceeded; nothing more to do if the deactivation was already requested.
	if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadDeactivationTarget) {
		return 0, nil
	}

	workload.SetDeactivationTarget(wl, kueue.WorkloadMaximumExecutionTimeExceeded, "exceeding the maximum execution time")
	err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
	if err != nil {
		return 0, err
	}
	r.recorder.Eventf(wl, corev1.EventTypeWarning, kueue.WorkloadMaximumExecutionTimeExceeded, "The maximum execution time (%ds) exceeded", *wl.Spec.MaximumExecutionTimeSeconds)
	return 0, nil
}

// reconcileCheckBasedEviction returns true if Workload has been deactivated or evicted
func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl *kueue.Workload) (bool, error) {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) || (!workload.HasRetryChecks(wl) && !workload.HasRejectedChecks(wl)) {
Expand Down Expand Up @@ -483,24 +519,24 @@ func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, admissionCh
return conds, shouldUpdate
}

func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req ctrl.Request, wl *kueue.Workload) (ctrl.Result, error) {
func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req ctrl.Request, wl *kueue.Workload) (time.Duration, error) {
log := ctrl.LoggerFrom(ctx)

if !workload.IsActive(wl) || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) {
// the workload has already been evicted by the PodsReadyTimeout or been deactivated.
return ctrl.Result{}, nil
return 0, nil
}
countingTowardsTimeout, recheckAfter := r.admittedNotReadyWorkload(wl)
if !countingTowardsTimeout {
return ctrl.Result{}, nil
return 0, nil
}
if recheckAfter > 0 {
log.V(4).Info("Workload not yet ready and did not exceed its timeout", "recheckAfter", recheckAfter)
return ctrl.Result{RequeueAfter: recheckAfter}, nil
return recheckAfter, nil
}
log.V(2).Info("Start the eviction of the workload due to exceeding the PodsReady timeout")
if deactivated, err := r.triggerDeactivationOrBackoffRequeue(ctx, wl); deactivated || err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
return 0, client.IgnoreNotFound(err)
}
message := fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String())
workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByPodsReadyTimeout, message)
Expand All @@ -509,7 +545,7 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c
cqName, _ := r.queues.ClusterQueueForWorkload(wl)
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByPodsReadyTimeout, message)
}
return ctrl.Result{}, client.IgnoreNotFound(err)
return 0, client.IgnoreNotFound(err)
}

// triggerDeactivationOrBackoffRequeue trigger deactivation of workload
Expand Down
57 changes: 55 additions & 2 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ var (
)

func TestReconcile(t *testing.T) {
testStartTime := time.Now()
// the clock is primarily used with second rounded times
// use the current time trimmed.
testStartTime := time.Now().Truncate(time.Second)
fakeClock := testingclock.NewFakeClock(testStartTime)

cases := map[string]struct {
Expand All @@ -327,6 +329,7 @@ func TestReconcile(t *testing.T) {
wantWorkload *kueue.Workload
wantError error
wantEvents []utiltesting.EventRecord
wantResult reconcile.Result
reconcilerOpts []Option
}{
"assign Admission Checks from ClusterQueue.spec.AdmissionCheckStrategy": {
Expand Down Expand Up @@ -745,6 +748,7 @@ func TestReconcile(t *testing.T) {
}).
RequeueState(ptr.To[int32](1), ptr.To(metav1.NewTime(testStartTime.Add(60*time.Second).Truncate(time.Second)))).
Obj(),
wantResult: reconcile.Result{RequeueAfter: time.Minute},
},
"should set the WorkloadRequeued condition when backoff expires": {
workload: utiltesting.MakeWorkload("wl", "ns").
Expand Down Expand Up @@ -1314,6 +1318,51 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},

"admitted workload with max execution time": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
MaximumExecutionTimeSeconds(120).
AdmittedAt(true, testStartTime.Add(-time.Minute)).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
MaximumExecutionTimeSeconds(120).
AdmittedAt(true, testStartTime.Add(-time.Minute)).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
Obj(),
wantResult: reconcile.Result{RequeueAfter: time.Minute},
},

"admitted workload with max execution time - expired": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
MaximumExecutionTimeSeconds(60).
AdmittedAt(true, testStartTime.Add(-2*time.Minute)).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
MaximumExecutionTimeSeconds(60).
AdmittedAt(true, testStartTime.Add(-2*time.Minute)).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
Condition(metav1.Condition{
Type: kueue.WorkloadDeactivationTarget,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadMaximumExecutionTimeExceeded,
Message: "exceeding the maximum execution time",
}).
Obj(),
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
EventType: "Warning",
Reason: "MaximumExecutionTimeExceeded",
Message: "The maximum execution time (60s) exceeded",
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -1350,12 +1399,16 @@ func TestReconcile(t *testing.T) {
}
}

_, gotError := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(tc.workload)})
gotResult, gotError := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(tc.workload)})

if diff := cmp.Diff(tc.wantError, gotError); diff != "" {
t.Errorf("unexpected reconcile error (-want/+got):\n%s", diff)
}

if diff := cmp.Diff(tc.wantResult, gotResult); diff != "" {
t.Errorf("unexpected reconcile result (-want/+got):\n%s", diff)
}

gotWorkload := &kueue.Workload{}
if err := cl.Get(ctx, client.ObjectKeyFromObject(tc.workload), gotWorkload); err != nil {
if tc.wantWorkload != nil && !errors.IsNotFound(err) {
Expand Down
11 changes: 10 additions & 1 deletion pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,14 @@ func (w *WorkloadWrapper) QuotaReservedTime(t time.Time) *WorkloadWrapper {
}

// Admitted delegates to AdmittedAt, using the current wall-clock time as the
// timestamp argument.
func (w *WorkloadWrapper) Admitted(a bool) *WorkloadWrapper {
	now := time.Now()
	return w.AdmittedAt(a, now)
}

func (w *WorkloadWrapper) AdmittedAt(a bool, t time.Time) *WorkloadWrapper {
cond := metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
LastTransitionTime: metav1.NewTime(t),
Reason: "ByTest",
Message: fmt.Sprintf("Admitted by ClusterQueue %s", w.Status.Admission.ClusterQueue),
}
Expand Down Expand Up @@ -350,6 +354,11 @@ func (w *WorkloadWrapper) ResourceVersion(v string) *WorkloadWrapper {
return w
}

// MaximumExecutionTimeSeconds sets spec.maximumExecutionTimeSeconds of the
// wrapped workload to v and returns the wrapper to allow call chaining.
func (w *WorkloadWrapper) MaximumExecutionTimeSeconds(v int32) *WorkloadWrapper {
	seconds := v
	w.Spec.MaximumExecutionTimeSeconds = &seconds
	return w
}

// PodSetWrapper embeds kueue.PodSet so that helper methods can be attached to it.
type PodSetWrapper struct {
	kueue.PodSet
}

func MakePodSet(name string, count int) *PodSetWrapper {
Expand Down
9 changes: 9 additions & 0 deletions site/content/en/docs/reference/kueue.v1beta1.md
Original file line number Diff line number Diff line change
Expand Up @@ -2355,6 +2355,15 @@ Possible values are:</p>
<p>Defaults to true</p>
</td>
</tr>
<tr><td><code>maximumExecutionTimeSeconds</code><br/>
<code>int32</code>
</td>
<td>
<p>maximumExecutionTimeSeconds if provided, determines the maximum time, in seconds,
the workload can be admitted before it's automatically deactivated.</p>
<p>If unspecified, no execution time limit is enforced on the Workload.</p>
</td>
</tr>
</tbody>
</table>

Expand Down
Loading