From a60e5f782f889f0436a5e8d718a42e31aa2e153a Mon Sep 17 00:00:00 2001
From: k8s-infra-cherrypick-robot <90416843+k8s-infra-cherrypick-robot@users.noreply.github.com>
Date: Wed, 7 Feb 2024 09:10:10 -0800
Subject: [PATCH] Don't assign finished workloads to newly add CQs. (#1699)

Co-authored-by: Traian Schiau
---
 pkg/cache/cache.go           |  2 +-
 pkg/cache/cache_test.go      | 82 +++++++++++++++++++++++++++++++++++-
 pkg/util/testing/wrappers.go | 11 +++++
 pkg/workload/workload.go     |  5 +++
 4 files changed, 97 insertions(+), 3 deletions(-)

diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index a9ceec2588..73aef4c690 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -317,7 +317,7 @@ func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) err
 		return fmt.Errorf("listing workloads that match the queue: %w", err)
 	}
 	for i, w := range workloads.Items {
-		if !workload.HasQuotaReservation(&w) {
+		if !workload.HasQuotaReservation(&w) || workload.IsFinished(&w) {
 			continue
 		}
 		c.addOrUpdateWorkload(&workloads.Items[i])
diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
index 77d6676c59..80a2f54b52 100644
--- a/pkg/cache/cache_test.go
+++ b/pkg/cache/cache_test.go
@@ -91,6 +91,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
 	cases := []struct {
 		name              string
 		operation         func(*Cache) error
+		clientObjects     []client.Object
 		wantClusterQueues map[string]*ClusterQueue
 		wantCohorts       map[string]sets.Set[string]
 	}{
@@ -958,16 +959,93 @@
 			},
 			wantCohorts: map[string]sets.Set[string]{},
 		},
+		{
+			name: "add cluster queue after finished workloads",
+			clientObjects: []client.Object{
+				utiltesting.MakeLocalQueue("lq1", "ns").ClusterQueue("cq1").Obj(),
+				utiltesting.MakeWorkload("pending", "ns").Obj(),
+				utiltesting.MakeWorkload("reserving", "ns").ReserveQuota(
+					utiltesting.MakeAdmission("cq1").Assignment(corev1.ResourceCPU, "f1", "1").Obj(),
+				).Obj(),
+				utiltesting.MakeWorkload("admitted", "ns").ReserveQuota(
+					utiltesting.MakeAdmission("cq1").Assignment(corev1.ResourceCPU, "f1", "1").Obj(),
+				).Admitted(true).Obj(),
+				utiltesting.MakeWorkload("finished", "ns").ReserveQuota(
+					utiltesting.MakeAdmission("cq1").Assignment(corev1.ResourceCPU, "f1", "1").Obj(),
+				).Admitted(true).Finished().Obj(),
+			},
+			operation: func(cache *Cache) error {
+				cache.AddOrUpdateResourceFlavor(utiltesting.MakeResourceFlavor("f1").Obj())
+				err := cache.AddClusterQueue(context.Background(),
+					utiltesting.MakeClusterQueue("cq1").
+						ResourceGroup(kueue.FlavorQuotas{
+							Name: "f1",
+							Resources: []kueue.ResourceQuota{
+								{
+									Name:         corev1.ResourceCPU,
+									NominalQuota: resource.MustParse("10"),
+								},
+							},
+						}).
+						Obj())
+				if err != nil {
+					return fmt.Errorf("Adding ClusterQueue: %w", err)
+				}
+				return nil
+			},
+			wantClusterQueues: map[string]*ClusterQueue{
+				"cq1": {
+					Name:                          "cq1",
+					NamespaceSelector:             labels.Everything(),
+					Status:                        active,
+					Preemption:                    defaultPreemption,
+					AllocatableResourceGeneration: 1,
+					FlavorFungibility:             defaultFlavorFungibility,
+					Usage:                         FlavorResourceQuantities{"f1": {corev1.ResourceCPU: 2000}},
+					AdmittedUsage:                 FlavorResourceQuantities{"f1": {corev1.ResourceCPU: 1000}},
+					Workloads: map[string]*workload.Info{
+						"ns/reserving": {
+							ClusterQueue: "cq1",
+							TotalRequests: []workload.PodSetResources{
+								{
+									Name:     "main",
+									Requests: workload.Requests{corev1.ResourceCPU: 1000},
+									Count:    1,
+									Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+										corev1.ResourceCPU: "f1",
+									},
+								},
+							},
+						},
+						"ns/admitted": {
+							ClusterQueue: "cq1",
+							TotalRequests: []workload.PodSetResources{
+								{
+									Name:     "main",
+									Requests: workload.Requests{corev1.ResourceCPU: 1000},
+									Count:    1,
+									Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+										corev1.ResourceCPU: "f1",
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			wantCohorts: map[string]sets.Set[string]{},
+		},
 	}
 
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			cache := New(utiltesting.NewFakeClient())
+			cache := New(utiltesting.NewFakeClient(tc.clientObjects...))
 			if err := tc.operation(cache); err != nil {
 				t.Errorf("Unexpected error during test operation: %s", err)
 			}
 			if diff := cmp.Diff(tc.wantClusterQueues, cache.clusterQueues,
-				cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "Workloads", "RGByResource"),
+				cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "RGByResource", "ResourceGroups"),
+				cmpopts.IgnoreFields(workload.Info{}, "Obj", "LastAssignment"),
 				cmpopts.IgnoreUnexported(ClusterQueue{}),
 				cmpopts.EquateEmpty()); diff != "" {
 				t.Errorf("Unexpected clusterQueues (-want,+got):\n%s", diff)
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index b1881b6df5..62888a607c 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -129,6 +129,17 @@ func (w *WorkloadWrapper) Admitted(a bool) *WorkloadWrapper {
 	apimeta.SetStatusCondition(&w.Status.Conditions, cond)
 	return w
 }
+func (w *WorkloadWrapper) Finished() *WorkloadWrapper {
+	cond := metav1.Condition{
+		Type:               kueue.WorkloadFinished,
+		Status:             metav1.ConditionTrue,
+		LastTransitionTime: metav1.Now(),
+		Reason:             "ByTest",
+		Message:            "Finished by test",
+	}
+	apimeta.SetStatusCondition(&w.Status.Conditions, cond)
+	return w
+}
 
 func (w *WorkloadWrapper) Creation(t time.Time) *WorkloadWrapper {
 	w.CreationTimestamp = metav1.NewTime(t)
diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go
index 4fd4d4dd8e..57f2477a5a 100644
--- a/pkg/workload/workload.go
+++ b/pkg/workload/workload.go
@@ -436,6 +436,11 @@ func IsAdmitted(w *kueue.Workload) bool {
 	return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadAdmitted)
 }
 
+// IsFinished returns true if the workload is finished.
+func IsFinished(w *kueue.Workload) bool {
+	return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished)
+}
+
 func RemoveFinalizer(ctx context.Context, c client.Client, wl *kueue.Workload) error {
 	if controllerutil.RemoveFinalizer(wl, kueue.ResourceInUseFinalizerName) {
 		return c.Update(ctx, wl)
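
For reference only, and not part of the patch above: a minimal standalone sketch of the behavior this change introduces. It mirrors the new workload.IsFinished helper and the guard added to Cache.AddClusterQueue, assuming the kueue v1beta1 API and the apimachinery condition helpers already used in the patch; the local names isFinished and seedCache are illustrative and do not exist in the Kueue codebase.

package main

import (
	"fmt"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// isFinished mirrors workload.IsFinished from the patch: a workload is
// finished once its Finished condition is true.
func isFinished(w *kueue.Workload) bool {
	return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished)
}

// seedCache is an illustrative stand-in for the loop in Cache.AddClusterQueue
// (the quota-reservation check is omitted here): finished workloads are
// skipped so they never count against a newly added ClusterQueue's usage.
func seedCache(workloads []kueue.Workload) []string {
	var added []string
	for i := range workloads {
		w := &workloads[i]
		if isFinished(w) {
			continue // the fix: don't assign finished workloads to the new CQ
		}
		added = append(added, w.Name)
	}
	return added
}

func main() {
	running := kueue.Workload{ObjectMeta: metav1.ObjectMeta{Name: "running"}}

	finished := kueue.Workload{ObjectMeta: metav1.ObjectMeta{Name: "finished"}}
	apimeta.SetStatusCondition(&finished.Status.Conditions, metav1.Condition{
		Type:               kueue.WorkloadFinished,
		Status:             metav1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
		Reason:             "Example",
		Message:            "finished by example",
	})

	// Only the running workload is seeded into the cache: prints [running]
	fmt.Println(seedCache([]kueue.Workload{running, finished}))
}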