diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index e5670de8b3..6cd9d3a0f7 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -80,6 +80,16 @@ func candidatesOnlyFromQueue(candidates []*workload.Info, clusterQueue string) [ return result } +func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue string, threshold int32) []*workload.Info { + result := make([]*workload.Info, 0, len(candidates)) + for _, wi := range candidates { + if wi.ClusterQueue == clusterQueue || priority.Priority(wi.Obj) < threshold { + result = append(result, wi) + } + } + return result +} + // GetTargets returns the list of workloads that should be evicted in order to make room for wl. func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info { resPerFlv := resourcesRequiringPreemption(assignment) @@ -115,6 +125,10 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig // have lower priority, and so they will not preempt the preemptor when // requeued. if borrowWithinCohort { + if !queueUnderNominalInAllRequestedResources(wlReq, cq) { + // It can only preempt workloads from another CQ if they are strictly under allowBorrowingBelowPriority. + candidates = candidatesFromCQOrUnderThreshold(candidates, wl.ClusterQueue, *thresholdPrio) + } return minimalPreemptions(wlReq, cq, snapshot, resPerFlv, candidates, true, thresholdPrio) } diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go index 9de6c3f08d..93db8b152b 100644 --- a/pkg/scheduler/preemption/preemption_test.go +++ b/pkg/scheduler/preemption/preemption_test.go @@ -174,8 +174,8 @@ func TestPreemption(t *testing.T) { Obj(), ). Preemption(kueue.ClusterQueuePreemption{ - WithinClusterQueue: kueue.PreemptionPolicyNever, - ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + ReclaimWithinCohort: kueue.PreemptionPolicyAny, BorrowWithinCohort: &kueue.BorrowWithinCohort{ Policy: kueue.BorrowWithinCohortPolicyLowerPriority, MaxPriorityThreshold: ptr.To[int32](0), @@ -1021,6 +1021,78 @@ func TestPreemption(t *testing.T) { }, }), }, + "use BorrowWithinCohort; only preempt from CQ if no workloads below threshold and already above nominal": { + admitted: []kueue.Workload{ + *utiltesting.MakeWorkload("a_standard_1", ""). + Priority(1). + Request(corev1.ResourceCPU, "10"). + ReserveQuota(utiltesting.MakeAdmission("a_standard").Assignment(corev1.ResourceCPU, "default", "10").Obj()). + Obj(), + *utiltesting.MakeWorkload("a_standard_2", ""). + Priority(1). + Request(corev1.ResourceCPU, "1"). + ReserveQuota(utiltesting.MakeAdmission("a_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()). + Obj(), + *utiltesting.MakeWorkload("b_standard_1", ""). + Priority(1). + Request(corev1.ResourceCPU, "1"). + ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()). + Obj(), + *utiltesting.MakeWorkload("b_standard_2", ""). + Priority(2). + Request(corev1.ResourceCPU, "1"). + ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()). + Obj(), + }, + incoming: utiltesting.MakeWorkload("in", ""). + Priority(3). + Request(corev1.ResourceCPU, "1"). + Obj(), + targetCQ: "b_standard", + assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{ + corev1.ResourceCPU: &flavorassigner.FlavorAssignment{ + Name: "default", + Mode: flavorassigner.Preempt, + }, + }), + wantPreempted: sets.New("/b_standard_1"), + }, + "use BorrowWithinCohort; preempt from CQ and from other CQs with workloads below threshold": { + admitted: []kueue.Workload{ + *utiltesting.MakeWorkload("b_standard_high", ""). + Priority(2). + Request(corev1.ResourceCPU, "10"). + ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "10").Obj()). + Obj(), + *utiltesting.MakeWorkload("b_standard_mid", ""). + Priority(1). + Request(corev1.ResourceCPU, "1"). + ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()). + Obj(), + *utiltesting.MakeWorkload("a_best_effort_low", ""). + Priority(-1). + Request(corev1.ResourceCPU, "1"). + ReserveQuota(utiltesting.MakeAdmission("a_best_effort").Assignment(corev1.ResourceCPU, "default", "1").Obj()). + Obj(), + *utiltesting.MakeWorkload("a_best_effort_lower", ""). + Priority(-2). + Request(corev1.ResourceCPU, "1"). + ReserveQuota(utiltesting.MakeAdmission("a_best_effort").Assignment(corev1.ResourceCPU, "default", "1").Obj()). + Obj(), + }, + incoming: utiltesting.MakeWorkload("in", ""). + Priority(2). + Request(corev1.ResourceCPU, "2"). + Obj(), + targetCQ: "b_standard", + assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{ + corev1.ResourceCPU: &flavorassigner.FlavorAssignment{ + Name: "default", + Mode: flavorassigner.Preempt, + }, + }), + wantPreempted: sets.New("/b_standard_mid", "/a_best_effort_lower"), + }, "reclaim quota from lender": { admitted: []kueue.Workload{ *utiltesting.MakeWorkload("lend1-low", "").