Skip to content

Commit

Permalink
Don't assign finished workloads to newly add CQs. (#1699)
Browse files Browse the repository at this point in the history
Co-authored-by: Traian Schiau <[email protected]>
  • Loading branch information
k8s-infra-cherrypick-robot and trasc authored Feb 7, 2024
1 parent 5a6a389 commit a60e5f7
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) err
return fmt.Errorf("listing workloads that match the queue: %w", err)
}
for i, w := range workloads.Items {
if !workload.HasQuotaReservation(&w) {
if !workload.HasQuotaReservation(&w) || workload.IsFinished(&w) {
continue
}
c.addOrUpdateWorkload(&workloads.Items[i])
Expand Down
82 changes: 80 additions & 2 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
cases := []struct {
name string
operation func(*Cache) error
clientObbjects []client.Object
wantClusterQueues map[string]*ClusterQueue
wantCohorts map[string]sets.Set[string]
}{
Expand Down Expand Up @@ -958,16 +959,93 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
wantCohorts: map[string]sets.Set[string]{},
},
{
name: "add cluster queue after finished workloads",
clientObbjects: []client.Object{
utiltesting.MakeLocalQueue("lq1", "ns").ClusterQueue("cq1").Obj(),
utiltesting.MakeWorkload("pending", "ns").Obj(),
utiltesting.MakeWorkload("reserving", "ns").ReserveQuota(
utiltesting.MakeAdmission("cq1").Assignment(corev1.ResourceCPU, "f1", "1").Obj(),
).Obj(),
utiltesting.MakeWorkload("admitted", "ns").ReserveQuota(
utiltesting.MakeAdmission("cq1").Assignment(corev1.ResourceCPU, "f1", "1").Obj(),
).Admitted(true).Obj(),
utiltesting.MakeWorkload("finished", "ns").ReserveQuota(
utiltesting.MakeAdmission("cq1").Assignment(corev1.ResourceCPU, "f1", "1").Obj(),
).Admitted(true).Finished().Obj(),
},
operation: func(cache *Cache) error {
cache.AddOrUpdateResourceFlavor(utiltesting.MakeResourceFlavor("f1").Obj())
err := cache.AddClusterQueue(context.Background(),
utiltesting.MakeClusterQueue("cq1").
ResourceGroup(kueue.FlavorQuotas{
Name: "f1",
Resources: []kueue.ResourceQuota{
{
Name: corev1.ResourceCPU,
NominalQuota: resource.MustParse("10"),
},
},
}).
Obj())
if err != nil {
return fmt.Errorf("Adding ClusterQueue: %w", err)
}
return nil
},
wantClusterQueues: map[string]*ClusterQueue{
"cq1": {
Name: "cq1",
NamespaceSelector: labels.Everything(),
Status: active,
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
Usage: FlavorResourceQuantities{"f1": {corev1.ResourceCPU: 2000}},
AdmittedUsage: FlavorResourceQuantities{"f1": {corev1.ResourceCPU: 1000}},
Workloads: map[string]*workload.Info{
"ns/reserving": {
ClusterQueue: "cq1",
TotalRequests: []workload.PodSetResources{
{
Name: "main",
Requests: workload.Requests{corev1.ResourceCPU: 1000},
Count: 1,
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "f1",
},
},
},
},
"ns/admitted": {
ClusterQueue: "cq1",
TotalRequests: []workload.PodSetResources{
{
Name: "main",
Requests: workload.Requests{corev1.ResourceCPU: 1000},
Count: 1,
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "f1",
},
},
},
},
},
},
},
wantCohorts: map[string]sets.Set[string]{},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cache := New(utiltesting.NewFakeClient())
cache := New(utiltesting.NewFakeClient(tc.clientObbjects...))
if err := tc.operation(cache); err != nil {
t.Errorf("Unexpected error during test operation: %s", err)
}
if diff := cmp.Diff(tc.wantClusterQueues, cache.clusterQueues,
cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "Workloads", "RGByResource"),
cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "RGByResource", "ResourceGroups"),
cmpopts.IgnoreFields(workload.Info{}, "Obj", "LastAssignment"),
cmpopts.IgnoreUnexported(ClusterQueue{}),
cmpopts.EquateEmpty()); diff != "" {
t.Errorf("Unexpected clusterQueues (-want,+got):\n%s", diff)
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ func (w *WorkloadWrapper) Admitted(a bool) *WorkloadWrapper {
apimeta.SetStatusCondition(&w.Status.Conditions, cond)
return w
}
func (w *WorkloadWrapper) Finished() *WorkloadWrapper {
cond := metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: "ByTest",
Message: "Finished by test",
}
apimeta.SetStatusCondition(&w.Status.Conditions, cond)
return w
}

func (w *WorkloadWrapper) Creation(t time.Time) *WorkloadWrapper {
w.CreationTimestamp = metav1.NewTime(t)
Expand Down
5 changes: 5 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,11 @@ func IsAdmitted(w *kueue.Workload) bool {
return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadAdmitted)
}

// IsFinished returns true if the workload is finished.
func IsFinished(w *kueue.Workload) bool {
return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished)
}

func RemoveFinalizer(ctx context.Context, c client.Client, wl *kueue.Workload) error {
if controllerutil.RemoveFinalizer(wl, kueue.ResourceInUseFinalizerName) {
return c.Update(ctx, wl)
Expand Down

0 comments on commit a60e5f7

Please sign in to comment.