Skip to content

Commit

Permalink
Reset all Retry and Ready AdmissionChecks on Eviction as Pending (#3323)
Browse files Browse the repository at this point in the history
* Make AdmissionChecks Pending after one is in Retry state

* Reduce number of patch requests

* Improve message, fix e2e test

* Improve naming and message on retry

* Reset check on different evictions
  • Loading branch information
PBundyra authored Oct 28, 2024
1 parent 42c7b47 commit 07352e4
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 20 deletions.
4 changes: 4 additions & 0 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl
// at this point we know a Workload has at least one Retry AdmissionCheck
message := "At least one admission check is false"
workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByAdmissionCheck, message)
workload.ResetChecksOnEviction(wl)
if err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true); err != nil {
return false, client.IgnoreNotFound(err)
}
Expand Down Expand Up @@ -416,6 +417,7 @@ func (r *WorkloadReconciler) reconcileOnLocalQueueActiveState(ctx context.Contex
}
log.V(3).Info("Workload is evicted because the LocalQueue is stopped", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName))
workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByLocalQueueStopped, "The LocalQueue is stopped")
workload.ResetChecksOnEviction(wl)
err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
if err == nil {
cqName := string(lq.Spec.ClusterQueue)
Expand Down Expand Up @@ -463,6 +465,7 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont
log.V(3).Info("Workload is evicted because the ClusterQueue is stopped", "clusterQueue", klog.KRef("", cqName))
message := "The ClusterQueue is stopped"
workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByClusterQueueStopped, message)
workload.ResetChecksOnEviction(wl)
err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
if err == nil {
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByClusterQueueStopped, message)
Expand Down Expand Up @@ -541,6 +544,7 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c
}
message := fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String())
workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByPodsReadyTimeout, message)
workload.ResetChecksOnEviction(wl)
err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
if err == nil {
cqName, _ := r.queues.ClusterQueueForWorkload(wl)
Expand Down
32 changes: 20 additions & 12 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,23 +525,29 @@ func TestReconcile(t *testing.T) {
},
},
},
"admitted workload with retry checks": {
"workload with retry checks should be evicted and checks should be pending": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
AdmissionChecks(kueue.AdmissionCheckState{
Name: "check-1",
State: kueue.CheckStateRetry,
}, kueue.AdmissionCheckState{
Name: "check-2",
State: kueue.CheckStateReady,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRetry,
AdmissionChecks(kueue.AdmissionCheckState{
Name: "check-1",
State: kueue.CheckStatePending,
Message: "Reset to Pending after eviction. Previously: Retry",
}, kueue.AdmissionCheckState{
Name: "check-2",
State: kueue.CheckStatePending,
Message: "Reset to Pending after eviction. Previously: Ready",
}).
Condition(metav1.Condition{
Type: "Evicted",
Expand Down Expand Up @@ -590,8 +596,9 @@ func TestReconcile(t *testing.T) {
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
Name: "check",
State: kueue.CheckStatePending,
Message: "Reset to Pending after eviction. Previously: Ready",
}).
Generation(1).
Condition(metav1.Condition{
Expand Down Expand Up @@ -684,8 +691,9 @@ func TestReconcile(t *testing.T) {
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
Name: "check",
State: kueue.CheckStatePending,
Message: "Reset to Pending after eviction. Previously: Ready",
}).
Generation(1).
Condition(metav1.Condition{
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In
// applyPreemptionWithSSA records a preemption on a deep copy of w, leaving the
// caller's object untouched. On the copy it sets the Evicted condition (reason
// WorkloadEvictedByPreemption), resets the admission checks, and sets the
// Preempted condition with the given reason and message. The copy's admission
// status is then applied in strict mode, and the resulting error is returned.
func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workload, reason, message string) error {
	evicted := w.DeepCopy()

	workload.SetEvictedCondition(evicted, kueue.WorkloadEvictedByPreemption, message)
	workload.ResetChecksOnEviction(evicted)
	workload.SetPreemptedCondition(evicted, reason, message)

	return workload.ApplyAdmissionStatus(ctx, p.client, evicted, true)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,8 @@ func (s *Scheduler) requeueAndUpdate(ctx context.Context, e entry) {
log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added, "status", e.status)

if e.status == notNominated || e.status == skipped {
patch := workload.AdmissionStatusPatch(e.Obj, true)
patch := workload.BaseSSAWorkload(e.Obj)
workload.AdmissionStatusPatch(e.Obj, patch, true)
reservationIsChanged := workload.UnsetQuotaReservationWithCondition(patch, "Pending", e.inadmissibleMsg, time.Now())
resourceRequestsIsChanged := workload.PropagateResourceRequests(patch, &e.Info)
if reservationIsChanged || resourceRequestsIsChanged {
Expand Down
13 changes: 13 additions & 0 deletions pkg/workload/admissionchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ func FindAdmissionCheck(checks []kueue.AdmissionCheckState, checkName string) *k
return nil
}

// ResetChecksOnEviction moves every AdmissionCheck of w that is not already
// Pending back to the Pending state. A reset entry keeps its Name, gets a new
// LastTransitionTime and a Message recording the state it had before the
// eviction; all of its other fields are left at their zero values.
//
// Checks that are already Pending are left untouched, so their
// LastTransitionTime and Message are not overwritten when no state transition
// actually takes place.
func ResetChecksOnEviction(w *kueue.Workload) {
	checks := w.Status.AdmissionChecks
	// Take the timestamp once so that all checks reset by the same eviction
	// carry an identical transition time.
	now := metav1.NewTime(time.Now())
	for i := range checks {
		if checks[i].State == kueue.CheckStatePending {
			continue
		}
		checks[i] = kueue.AdmissionCheckState{
			Name:               checks[i].Name,
			State:              kueue.CheckStatePending,
			LastTransitionTime: now,
			Message:            "Reset to Pending after eviction. Previously: " + string(checks[i].State),
		}
	}
}

// SetAdmissionCheckState - adds or updates newCheck in the provided checks list.
func SetAdmissionCheckState(checks *[]kueue.AdmissionCheckState, newCheck kueue.AdmissionCheckState) {
if checks == nil {
Expand Down
19 changes: 14 additions & 5 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,7 @@ func PropagateResourceRequests(w *kueue.Workload, info *Info) bool {
// AdmissionStatusPatch populates wlCopy, based on the input workload, with
// the admission and related conditions. wlCopy can then be used in Server-Side-Apply.
// If strict is true, resourceVersion will be part of the patch.
func AdmissionStatusPatch(w *kueue.Workload, strict bool) *kueue.Workload {
wlCopy := BaseSSAWorkload(w)
func AdmissionStatusPatch(w *kueue.Workload, wlCopy *kueue.Workload, strict bool) {
wlCopy.Status.Admission = w.Status.Admission.DeepCopy()
wlCopy.Status.RequeueState = w.Status.RequeueState.DeepCopy()
if wlCopy.Status.Admission != nil {
Expand All @@ -634,15 +633,25 @@ func AdmissionStatusPatch(w *kueue.Workload, strict bool) *kueue.Workload {
wlCopy.ResourceVersion = w.ResourceVersion
}
wlCopy.Status.AccumulatedPastExexcutionTimeSeconds = w.Status.AccumulatedPastExexcutionTimeSeconds
return wlCopy
}

// AdmissionChecksStatusPatch copies the AdmissionCheck states of w into wlCopy.
// Each state is added or updated through SetAdmissionCheckState. When w has a
// non-nil list and wlCopy has none yet, wlCopy's list is first initialized to
// an empty, non-nil slice.
func AdmissionChecksStatusPatch(w *kueue.Workload, wlCopy *kueue.Workload) {
	source := w.Status.AdmissionChecks
	if source != nil && wlCopy.Status.AdmissionChecks == nil {
		wlCopy.Status.AdmissionChecks = []kueue.AdmissionCheckState{}
	}
	for i := range source {
		SetAdmissionCheckState(&wlCopy.Status.AdmissionChecks, source[i])
	}
}

// ApplyAdmissionStatus updates all the admission related status fields of a workload with SSA.
// If strict is true, resourceVersion will be part of the patch, making this call fail if the Workload
// was changed.
func ApplyAdmissionStatus(ctx context.Context, c client.Client, w *kueue.Workload, strict bool) error {
patch := AdmissionStatusPatch(w, strict)
return ApplyAdmissionStatusPatch(ctx, c, patch)
wlCopy := BaseSSAWorkload(w)
AdmissionStatusPatch(w, wlCopy, strict)
AdmissionChecksStatusPatch(w, wlCopy)
return ApplyAdmissionStatusPatch(ctx, c, wlCopy)
}

// ApplyAdmissionStatusPatch applies the patch of admission related status fields of a workload with SSA.
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/singlecluster/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,13 @@ var _ = ginkgo.Describe("Kueue", func() {
"instance-type": "on-demand",
})

ginkgo.By("setting the check as failed (Retry)", func() {
ginkgo.By("setting the check as Rejected", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed())
patch := workload.BaseSSAWorkload(createdWorkload)
workload.SetAdmissionCheckState(&patch.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: "check1",
State: kueue.CheckStateRetry,
State: kueue.CheckStateRejected,
})
g.Expect(k8sClient.Status().Patch(ctx, patch, client.Apply,
client.FieldOwner("test-admission-check-controller"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ var _ = ginkgo.Describe("Provisioning", ginkgo.Ordered, ginkgo.ContinueOnFailure

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed())
fmt.Printf("Workload %+v\n", updatedWl)

g.Expect(workload.IsEvictedByDeactivation(&updatedWl)).To(gomega.BeTrue())
util.ExpectEvictedWorkloadsTotalMetric(cq.Name, kueue.WorkloadEvictedByDeactivation, 1)
Expand Down

0 comments on commit 07352e4

Please sign in to comment.