From 3354d629e102c7a161b87db98f6d34ac5257b8aa Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Mon, 12 Aug 2024 19:04:22 +0300
Subject: [PATCH 1/4] Fix preemption while partially admitting.

Add a unit test for preemption while partially admitting.
---
 .../flavorassigner/flavorassigner.go        |  7 +++-
 pkg/scheduler/preemption/preemption_test.go |  3 ++
 pkg/scheduler/scheduler.go                  |  7 +++-
 pkg/scheduler/scheduler_test.go             | 40 +++++++++++++++++++
 4 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go
index 0c69ec615c..e3c88f1019 100644
--- a/pkg/scheduler/flavorassigner/flavorassigner.go
+++ b/pkg/scheduler/flavorassigner/flavorassigner.go
@@ -110,7 +110,12 @@ func (a *Assignment) TotalRequestsFor(wl *workload.Info) resources.FlavorResourc
 	usage := make(resources.FlavorResourceQuantities)
 	for i, ps := range wl.TotalRequests {
 		for res, q := range ps.Requests {
-			flv := a.PodSets[i].Flavors[res].Name
+			aps := a.PodSets[i]
+			flv := aps.Flavors[res].Name
+			// in case of partial admission scale down the quantity
+			if aps.Count != ps.Count {
+				q = q * int64(aps.Count) / int64(ps.Count)
+			}
 			usage[resources.FlavorResource{Flavor: flv, Resource: res}] += q
 		}
 	}
diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go
index 73bc94ac63..ab65edab5f 100644
--- a/pkg/scheduler/preemption/preemption_test.go
+++ b/pkg/scheduler/preemption/preemption_test.go
@@ -910,6 +910,7 @@ func TestPreemption(t *testing.T) {
 							Mode: flavorassigner.Preempt,
 						},
 					},
+					Count: 1,
 				},
 				{
 					Name: "workers",
@@ -919,6 +920,7 @@ func TestPreemption(t *testing.T) {
 							Mode: flavorassigner.Preempt,
 						},
 					},
+					Count: 2,
 				},
 			},
 		},
@@ -2044,6 +2046,7 @@ func singlePodSetAssignment(assignments flavorassigner.ResourceAssignment) flavo
 		PodSets: []flavorassigner.PodSetAssignment{{
 			Name: kueue.DefaultPodSetName,
 			Flavors: assignments,
+			Count: 1,
 		}},
 	}
 }
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 65fa68f18e..a8eeb78467 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -484,9 +484,12 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
 	if wl.CanBePartiallyAdmitted() {
 		reducer := flavorassigner.NewPodSetReducer(wl.Obj.Spec.PodSets, func(nextCounts []int32) (*partialAssignment, bool) {
 			assignment := flvAssigner.Assign(log, nextCounts)
-			if assignment.RepresentativeMode() == flavorassigner.Fit {
+			mode := assignment.RepresentativeMode()
+			if mode == flavorassigner.Fit {
 				return &partialAssignment{assignment: assignment}, true
-			} else if assignment.RepresentativeMode() == flavorassigner.Preempt {
+			}
+
+			if mode == flavorassigner.Preempt {
 				preemptionTargets := s.preemptor.GetTargets(log, *wl, assignment, snap)
 				if len(preemptionTargets) > 0 {
 					return &partialAssignment{assignment: assignment, preemptionTargets: preemptionTargets}, true
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index ec49c5a3a3..92065696bd 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -1245,6 +1245,46 @@ func TestSchedule(t *testing.T) {
 				"eng-beta": {"eng-beta/new"},
 			},
 		},
+		"partial admission single variable pod set, preempt with partial admission": {
+			workloads: []kueue.Workload{
+				*utiltesting.MakeWorkload("new", "eng-beta").
+					Queue("main").
+					Priority(4).
+					PodSets(*utiltesting.MakePodSet("one", 30).
+						SetMinimumCount(10).
+						Request("example.com/gpu", "1").
+						Obj()).
+					Obj(),
+				*utiltesting.MakeWorkload("old", "eng-beta").
+					Priority(-4).
+					PodSets(*utiltesting.MakePodSet("one", 10).
+						Request("example.com/gpu", "1").
+						Obj()).
+					ReserveQuota(utiltesting.MakeAdmission("eng-beta", "one").Assignment("example.com/gpu", "model-a", "10").AssignmentPodCount(10).Obj()).
+					Obj(),
+			},
+			wantAssignments: map[string]kueue.Admission{
+				"eng-beta/old": {
+					ClusterQueue: "eng-beta",
+					PodSetAssignments: []kueue.PodSetAssignment{
+						{
+							Name: "one",
+							Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+								"example.com/gpu": "model-a",
+							},
+							ResourceUsage: corev1.ResourceList{
+								"example.com/gpu": resource.MustParse("10"),
+							},
+							Count: ptr.To[int32](10),
+						},
+					},
+				},
+			},
+			wantPreempted: sets.New("eng-beta/old"),
+			wantLeft: map[string][]string{
+				"eng-beta": {"eng-beta/new"},
+			},
+		},
 		"partial admission multiple variable pod sets": {
 			workloads: []kueue.Workload{
 				*utiltesting.MakeWorkload("new", "sales").

From bc9f76b44725043406e377a340fdf9b8e9fcb49a Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Tue, 13 Aug 2024 10:36:37 +0300
Subject: [PATCH 2/4] Review Remarks

---
 pkg/scheduler/flavorassigner/flavorassigner.go | 10 +++++-----
 pkg/workload/workload.go                       |  8 +++++---
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go
index e3c88f1019..d73c5186ab 100644
--- a/pkg/scheduler/flavorassigner/flavorassigner.go
+++ b/pkg/scheduler/flavorassigner/flavorassigner.go
@@ -109,13 +109,13 @@ func (a *Assignment) ToAPI() []kueue.PodSetAssignment {
 func (a *Assignment) TotalRequestsFor(wl *workload.Info) resources.FlavorResourceQuantities {
 	usage := make(resources.FlavorResourceQuantities)
 	for i, ps := range wl.TotalRequests {
+		// in case of partial admission scale down the quantity
+		aps := a.PodSets[i]
+		if aps.Count != ps.Count {
+			ps = *ps.ScaledTo(aps.Count)
+		}
 		for res, q := range ps.Requests {
-			aps := a.PodSets[i]
 			flv := aps.Flavors[res].Name
-			// in case of partial admission scale down the quantity
-			if aps.Count != ps.Count {
-				q = q * int64(aps.Count) / int64(ps.Count)
-			}
 			usage[resources.FlavorResource{Flavor: flv, Resource: res}] += q
 		}
 	}
diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go
index 3b40a6589d..821e527847 100644
--- a/pkg/workload/workload.go
+++ b/pkg/workload/workload.go
@@ -170,9 +170,11 @@ func (psr *PodSetResources) ScaledTo(newCount int32) *PodSetResources {
 		Flavors: maps.Clone(psr.Flavors),
 	}
 
-	scaleDown(ret.Requests, int64(ret.Count))
-	scaleUp(ret.Requests, int64(newCount))
-	ret.Count = newCount
+	if psr.Count != 0 {
+		scaleDown(ret.Requests, int64(ret.Count))
+		scaleUp(ret.Requests, int64(newCount))
+		ret.Count = newCount
+	}
 	return ret
 }
 

From 75d105d83fbfb2191072c82598e1325886f4b08d Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Mon, 2 Sep 2024 10:19:50 +0300
Subject: [PATCH 3/4] Review remarks

---
 pkg/scheduler/flavorassigner/flavorassigner.go | 2 ++
 pkg/workload/workload.go                       | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go
index d73c5186ab..52a2d06724 100644
--- a/pkg/scheduler/flavorassigner/flavorassigner.go
+++ b/pkg/scheduler/flavorassigner/flavorassigner.go
@@ -106,6 +106,8 @@ func (a *Assignment) ToAPI() []kueue.PodSetAssignment {
 	return psFlavors
 }
 
+// TotalRequestsFor - returns the total quota needs of the wl, taking into account the potential
+// scaling needed in case of partial admission.
 func (a *Assignment) TotalRequestsFor(wl *workload.Info) resources.FlavorResourceQuantities {
 	usage := make(resources.FlavorResourceQuantities)
 	for i, ps := range wl.TotalRequests {
diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go
index 821e527847..cb572df184 100644
--- a/pkg/workload/workload.go
+++ b/pkg/workload/workload.go
@@ -170,7 +170,7 @@ func (psr *PodSetResources) ScaledTo(newCount int32) *PodSetResources {
 		Flavors: maps.Clone(psr.Flavors),
 	}
 
-	if psr.Count != 0 {
+	if psr.Count != 0 && psr.Count != newCount {
 		scaleDown(ret.Requests, int64(ret.Count))
 		scaleUp(ret.Requests, int64(newCount))
 		ret.Count = newCount

From f87de466481e0d9bbc7bea4f98430b115f4a208d Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Tue, 17 Sep 2024 16:50:59 +0300
Subject: [PATCH 4/4] Review remarks.

---
 pkg/scheduler/scheduler.go | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index a8eeb78467..65fa68f18e 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -484,12 +484,9 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
 	if wl.CanBePartiallyAdmitted() {
 		reducer := flavorassigner.NewPodSetReducer(wl.Obj.Spec.PodSets, func(nextCounts []int32) (*partialAssignment, bool) {
 			assignment := flvAssigner.Assign(log, nextCounts)
-			mode := assignment.RepresentativeMode()
-			if mode == flavorassigner.Fit {
+			if assignment.RepresentativeMode() == flavorassigner.Fit {
 				return &partialAssignment{assignment: assignment}, true
-			}
-
-			if mode == flavorassigner.Preempt {
+			} else if assignment.RepresentativeMode() == flavorassigner.Preempt {
 				preemptionTargets := s.preemptor.GetTargets(log, *wl, assignment, snap)
 				if len(preemptionTargets) > 0 {
 					return &partialAssignment{assignment: assignment, preemptionTargets: preemptionTargets}, true
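
For illustration of the behavior these patches fix: during partial admission, a pod
set's total request must be charged in proportion to the admitted pod count, not the
requested one. Below is a minimal standalone Go sketch of that arithmetic, not
Kueue's API; the identifiers (scaleRequests, oldCount, newCount) are illustrative only.

	package main

	import "fmt"

	// scaleRequests mirrors the scaleDown/scaleUp sequence used by
	// PodSetResources.ScaledTo: a total of q accumulated for oldCount pods
	// becomes q/oldCount*newCount when only newCount pods are admitted.
	// The guards correspond to the checks added in PATCH 2 and PATCH 3:
	// a zero count cannot be scaled, and an unchanged count needs no scaling.
	func scaleRequests(requests map[string]int64, oldCount, newCount int32) map[string]int64 {
		if oldCount == 0 || oldCount == newCount {
			return requests
		}
		scaled := make(map[string]int64, len(requests))
		for res, q := range requests {
			scaled[res] = q / int64(oldCount) * int64(newCount)
		}
		return scaled
	}

	func main() {
		// Matches the shape of the new scheduler test: 30 pods each requesting
		// one GPU, admitted at the minimum count of 10, are charged 10 GPUs
		// rather than 30.
		total := map[string]int64{"example.com/gpu": 30}
		fmt.Println(scaleRequests(total, 30, 10)) // map[example.com/gpu:10]
	}

Dividing before multiplying follows ScaledTo's scaleDown-then-scaleUp order; the
division is exact here because the stored totals are whole multiples of the per-pod
request.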