Skip to content

Commit

Permalink
Reduce preemptions within ClusterQueue when preemption with borrowing…
Browse files Browse the repository at this point in the history
… is enabled (#2110)

Change-Id: I4153eb526354d0a8cd5487b47753e5a5f2213482
  • Loading branch information
alculquicondor authored May 1, 2024
1 parent 8b7dea3 commit 3e044fc
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 2 deletions.
14 changes: 14 additions & 0 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ func candidatesOnlyFromQueue(candidates []*workload.Info, clusterQueue string) [
return result
}

func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue string, threshold int32) []*workload.Info {
result := make([]*workload.Info, 0, len(candidates))
for _, wi := range candidates {
if wi.ClusterQueue == clusterQueue || priority.Priority(wi.Obj) < threshold {
result = append(result, wi)
}
}
return result
}

// GetTargets returns the list of workloads that should be evicted in order to make room for wl.
func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
resPerFlv := resourcesRequiringPreemption(assignment)
Expand Down Expand Up @@ -115,6 +125,10 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
// have lower priority, and so they will not preempt the preemptor when
// requeued.
if borrowWithinCohort {
if !queueUnderNominalInAllRequestedResources(wlReq, cq) {
// It can only preempt workloads from another CQ if they are strictly under allowBorrowingBelowPriority.
candidates = candidatesFromCQOrUnderThreshold(candidates, wl.ClusterQueue, *thresholdPrio)
}
return minimalPreemptions(wlReq, cq, snapshot, resPerFlv, candidates, true, thresholdPrio)
}

Expand Down
76 changes: 74 additions & 2 deletions pkg/scheduler/preemption/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func TestPreemption(t *testing.T) {
Obj(),
).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyNever,
ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
BorrowWithinCohort: &kueue.BorrowWithinCohort{
Policy: kueue.BorrowWithinCohortPolicyLowerPriority,
MaxPriorityThreshold: ptr.To[int32](0),
Expand Down Expand Up @@ -1021,6 +1021,78 @@ func TestPreemption(t *testing.T) {
},
}),
},
"use BorrowWithinCohort; only preempt from CQ if no workloads below threshold and already above nominal": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a_standard_1", "").
Priority(1).
Request(corev1.ResourceCPU, "10").
ReserveQuota(utiltesting.MakeAdmission("a_standard").Assignment(corev1.ResourceCPU, "default", "10").Obj()).
Obj(),
*utiltesting.MakeWorkload("a_standard_2", "").
Priority(1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b_standard_1", "").
Priority(1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b_standard_2", "").
Priority(2).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
},
incoming: utiltesting.MakeWorkload("in", "").
Priority(3).
Request(corev1.ResourceCPU, "1").
Obj(),
targetCQ: "b_standard",
assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{
corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
}),
wantPreempted: sets.New("/b_standard_1"),
},
"use BorrowWithinCohort; preempt from CQ and from other CQs with workloads below threshold": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b_standard_high", "").
Priority(2).
Request(corev1.ResourceCPU, "10").
ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "10").Obj()).
Obj(),
*utiltesting.MakeWorkload("b_standard_mid", "").
Priority(1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a_best_effort_low", "").
Priority(-1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a_best_effort").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a_best_effort_lower", "").
Priority(-2).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a_best_effort").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
},
incoming: utiltesting.MakeWorkload("in", "").
Priority(2).
Request(corev1.ResourceCPU, "2").
Obj(),
targetCQ: "b_standard",
assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{
corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
}),
wantPreempted: sets.New("/b_standard_mid", "/a_best_effort_lower"),
},
"reclaim quota from lender": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("lend1-low", "").
Expand Down

0 comments on commit 3e044fc

Please sign in to comment.