From eca1f7d0d57c4b8b9624890d45e8dbd100c06e18 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Thu, 31 Aug 2023 14:54:00 +0100 Subject: [PATCH 01/14] Initial commit --- internal/armada/configuration/types.go | 12 ++- internal/scheduler/constraints/constraints.go | 41 +++++---- .../scheduler/constraints/constraints_test.go | 2 +- internal/scheduler/context/context.go | 14 ++- internal/scheduler/context/context_test.go | 3 +- internal/scheduler/gang_scheduler.go | 2 +- internal/scheduler/gang_scheduler_test.go | 3 +- .../preempting_queue_scheduler_test.go | 9 +- internal/scheduler/queue_scheduler.go | 92 +++++++++++-------- internal/scheduler/queue_scheduler_test.go | 3 +- internal/scheduler/reports_test.go | 7 +- internal/scheduler/scheduling_algo.go | 2 + internal/scheduler/simulator/simulator.go | 2 + 13 files changed, 123 insertions(+), 69 deletions(-) diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index 77fc34a548c..9c53ca2bbdb 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -124,10 +124,14 @@ type SchedulingConfig struct { MaximumResourceFractionToSchedule map[string]float64 // Overrides MaximalClusterFractionToSchedule if set for the current pool. MaximumResourceFractionToScheduleByPool map[string]map[string]float64 - // Max number of jobs to schedule in each invocation of the scheduler. - MaximumJobsToSchedule uint - // Max number of gangs to schedule in each invocation of the scheduler. - MaximumGangsToSchedule uint + // Token bucket global job scheduling rate limiter settings; see + // https://pkg.go.dev/golang.org/x/time/rate#Limiter + MaximumSchedulingRate float64 `validate:"gt=0"` + MaximumSchedulingBurst int `validate:"gt=0"` + // Token bucket per-queue job scheduling rate limiter settings; see + // https://pkg.go.dev/golang.org/x/time/rate#Limiter + MaximumPerQueueSchedulingRate float64 `validate:"gt=0"` + MaximumPerQueueSchedulingBurst int `validate:"gt=0"` // Armada stores contexts associated with recent job scheduling attempts. // This setting limits the number of such contexts to store. // Contexts associated with the most recent scheduling attempt for each queue and cluster are always stored. diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index 477b121b2e4..f7d19509cc8 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -16,9 +16,12 @@ const ( UnschedulableReasonMaximumNumberOfJobsScheduled = "maximum number of jobs scheduled" UnschedulableReasonMaximumNumberOfGangsScheduled = "maximum number of gangs scheduled" UnschedulableReasonMaximumResourcesPerQueueExceeded = "maximum total resources for this queue exceeded" + UnschedulableReasonGlobalRateLimitExceeded = "global scheduling rate limit exceeded" + UnschedulableReasonQueueRateLimitExceeded = "queue scheduling rate limit exceeded" ) -// IsTerminalUnschedulableReason returns true if reason indicates it's not possible to schedule any more jobs in this round. +// IsTerminalUnschedulableReason returns true if reason indicates +// it's not possible to schedule any more jobs in this round. 
func IsTerminalUnschedulableReason(reason string) bool { if reason == UnschedulableReasonMaximumResourcesScheduled { return true @@ -29,15 +32,20 @@ func IsTerminalUnschedulableReason(reason string) bool { if reason == UnschedulableReasonMaximumNumberOfGangsScheduled { return true } + if reason == UnschedulableReasonGlobalRateLimitExceeded { + return true + } return false } +// IsTerminalQueueUnschedulableReason returns true if reason indicates +// it's not possible to schedule any more jobs from this queue in this round. +func IsTerminalQueueUnschedulableReason(reason string) bool { + return reason == UnschedulableReasonQueueRateLimitExceeded +} + // SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits. type SchedulingConstraints struct { - // Max number of jobs to scheduler per lease jobs call. - MaximumJobsToSchedule uint - // Max number of jobs to scheduler per lease jobs call. - MaximumGangsToSchedule uint // Max number of jobs to consider for a queue before giving up. MaxQueueLookback uint // Jobs leased to this executor must be at least this large. @@ -82,8 +90,6 @@ func SchedulingConstraintsFromSchedulingConfig( maximumResourceFractionToSchedule = m } return SchedulingConstraints{ - MaximumJobsToSchedule: config.MaximumJobsToSchedule, - MaximumGangsToSchedule: config.MaximumGangsToSchedule, MaxQueueLookback: config.MaxQueueLookback, MinimumJobSize: minimumJobSize, MaximumResourcesToSchedule: absoluteFromRelativeLimits(totalResources, maximumResourceFractionToSchedule), @@ -99,21 +105,22 @@ func absoluteFromRelativeLimits(totalResources schedulerobjects.ResourceList, re return absoluteLimits } -func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) { - // MaximumJobsToSchedule check. - if constraints.MaximumJobsToSchedule != 0 && sctx.NumScheduledJobs == int(constraints.MaximumJobsToSchedule) { - return false, UnschedulableReasonMaximumNumberOfJobsScheduled, nil +func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) { + // MaximumResourcesToSchedule check. + if !sctx.ScheduledResources.IsStrictlyLessOrEqual(constraints.MaximumResourcesToSchedule) { + return false, UnschedulableReasonMaximumResourcesScheduled, nil } - // MaximumGangsToSchedule check. - if constraints.MaximumGangsToSchedule != 0 && sctx.NumScheduledGangs == int(constraints.MaximumGangsToSchedule) { - return false, UnschedulableReasonMaximumNumberOfGangsScheduled, nil + // Global rate limiter check. + if sctx.Limiter != nil && sctx.Limiter.Tokens() <= 0 { + return false, UnschedulableReasonGlobalRateLimitExceeded, nil } - // MaximumResourcesToSchedule check. - if !sctx.ScheduledResources.IsStrictlyLessOrEqual(constraints.MaximumResourcesToSchedule) { - return false, UnschedulableReasonMaximumResourcesScheduled, nil + // Per-queue rate limiter check. + if qctx := sctx.QueueSchedulingContexts[queue]; qctx != nil && qctx.Limiter != nil && qctx.Limiter.Tokens() <= 0 { + return false, UnschedulableReasonQueueRateLimitExceeded, nil } + return true, "", nil } diff --git a/internal/scheduler/constraints/constraints_test.go b/internal/scheduler/constraints/constraints_test.go index e387bf60ea7..f0bb38ca6b0 100644 --- a/internal/scheduler/constraints/constraints_test.go +++ b/internal/scheduler/constraints/constraints_test.go @@ -21,7 +21,7 @@ func TestConstraints(t *testing.T) { }{} // TODO: Add tests. 
for name, tc := range tests { t.Run(name, func(t *testing.T) { - ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx) + ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx, tc.queue) require.NoError(t, err) require.Equal(t, tc.globalUnschedulableReason == "", ok) require.Equal(t, tc.globalUnschedulableReason, unschedulableReason) diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 30f2c87e6ec..f7af5fff2d4 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" "github.com/armadaproject/armada/internal/armada/configuration" "github.com/armadaproject/armada/internal/common/armadaerrors" @@ -38,6 +39,8 @@ type SchedulingContext struct { DefaultPriorityClass string // Determines how fairness is computed. FairnessCostProvider fairness.FairnessCostProvider + // Limits job scheduling rate globally across all queues. + Limiter *rate.Limiter // Sum of queue weights across all queues. WeightSum float64 // Per-queue scheduling contexts. @@ -73,6 +76,7 @@ func NewSchedulingContext( priorityClasses map[string]types.PriorityClass, defaultPriorityClass string, fairnessCostProvider fairness.FairnessCostProvider, + limiter *rate.Limiter, totalResources schedulerobjects.ResourceList, ) *SchedulingContext { return &SchedulingContext{ @@ -82,6 +86,7 @@ func NewSchedulingContext( PriorityClasses: priorityClasses, DefaultPriorityClass: defaultPriorityClass, FairnessCostProvider: fairnessCostProvider, + Limiter: limiter, QueueSchedulingContexts: make(map[string]*QueueSchedulingContext), TotalResources: totalResources.DeepCopy(), ScheduledResources: schedulerobjects.NewResourceListWithDefaultSize(), @@ -110,7 +115,11 @@ func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() { sctx.UnfeasibleSchedulingKeys = make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext) } -func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, weight float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error { +func (sctx *SchedulingContext) AddQueueSchedulingContext( + queue string, weight float64, + initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string], + limiter *rate.Limiter, +) error { if _, ok := sctx.QueueSchedulingContexts[queue]; ok { return errors.WithStack(&armadaerrors.ErrInvalidArgument{ Name: "queue", @@ -134,6 +143,7 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, weight fl ExecutorId: sctx.ExecutorId, Queue: queue, Weight: weight, + Limiter: limiter, Allocated: allocated, AllocatedByPriorityClass: initialAllocatedByPriorityClass, ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), @@ -335,6 +345,8 @@ type QueueSchedulingContext struct { Queue string // Determines the fair share of this queue relative to other queues. Weight float64 + // Limits job scheduling rate for this queue. + Limiter *rate.Limiter // Total resources assigned to the queue across all clusters by priority class priority. // Includes jobs scheduled during this invocation of the scheduler. 
Allocated schedulerobjects.ResourceList diff --git a/internal/scheduler/context/context_test.go b/internal/scheduler/context/context_test.go index 3b932f30540..67fdabd483f 100644 --- a/internal/scheduler/context/context_test.go +++ b/internal/scheduler/context/context_test.go @@ -43,6 +43,7 @@ func TestSchedulingContextAccounting(t *testing.T) { testfixtures.TestPriorityClasses, testfixtures.TestDefaultPriorityClass, fairnessCostProvider, + nil, totalResources, ) priorityFactorByQueue := map[string]float64{"A": 1, "B": 1} @@ -52,7 +53,7 @@ func TestSchedulingContextAccounting(t *testing.T) { }, } for _, queue := range []string{"A", "B"} { - err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue]) + err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue], nil) require.NoError(t, err) } diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index f1a39e31ead..24bc6a2b835 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -42,7 +42,7 @@ func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck() { func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { // Exit immediately if this is a new gang and we've hit any round limits. if !gctx.AllJobsEvicted { - if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext); err != nil || !ok { + if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext, gctx.Queue); err != nil || !ok { return } } diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index bab895a0421..fb7a55bc6d4 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -340,10 +340,11 @@ func TestGangScheduler(t *testing.T) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + nil, tc.TotalResources, ) for queue, priorityFactor := range priorityFactorByQueue { - err := sctx.AddQueueSchedulingContext(queue, priorityFactor, nil) + err := sctx.AddQueueSchedulingContext(queue, priorityFactor, nil, nil) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 923d77296bc..b2919fd2219 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1367,11 +1367,12 @@ func TestPreemptingQueueScheduler(t *testing.T) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + nil, tc.TotalResources, ) for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue]) + err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue], nil) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( @@ -1645,11 +1646,12 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, 
fairnessCostProvider, + nil, nodeDb.TotalResources(), ) for queue, priorityFactor := range priorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, make(schedulerobjects.QuantityByTAndResourceType[string])) + err := sctx.AddQueueSchedulingContext(queue, weight, make(schedulerobjects.QuantityByTAndResourceType[string]), nil) require.NoError(b, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( @@ -1706,11 +1708,12 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + nil, nodeDb.TotalResources(), ) for queue, priorityFactor := range priorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue]) + err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue], nil) require.NoError(b, err) } sch := NewPreemptingQueueScheduler( diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index ba6c223f49a..680cc8b7161 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -254,10 +254,13 @@ func (it *QueuedGangIterator) hitLookbackLimit() bool { // Specifically, it yields the next gang in the queue with smallest fraction of its fair share, // where the fraction of fair share computation includes the yielded gang. type CandidateGangIterator struct { - queueProvier fairness.QueueRepository + queueRepository fairness.QueueRepository fairnessCostProvider fairness.FairnessCostProvider // If true, this iterator only yields gangs where all jobs are evicted. onlyYieldEvicted bool + // If, e.g., onlyYieldEvictedByQueue["A"] is true, + // this iterator only yields gangs where all jobs are evicted for queue A. + onlyYieldEvictedByQueue map[string]bool // Reusable buffer to avoid allocations. buffer schedulerobjects.ResourceList // Priority queue containing per-queue iterators. @@ -266,15 +269,16 @@ type CandidateGangIterator struct { } func NewCandidateGangIterator( - queueProvier fairness.QueueRepository, + queueRepository fairness.QueueRepository, fairnessCostProvider fairness.FairnessCostProvider, iteratorsByQueue map[string]*QueuedGangIterator, ) (*CandidateGangIterator, error) { it := &CandidateGangIterator{ - queueProvier: queueProvier, - fairnessCostProvider: fairnessCostProvider, - buffer: schedulerobjects.NewResourceListWithDefaultSize(), - pq: make(QueueCandidateGangIteratorPQ, 0, len(iteratorsByQueue)), + queueRepository: queueRepository, + fairnessCostProvider: fairnessCostProvider, + onlyYieldEvictedByQueue: make(map[string]bool), + buffer: schedulerobjects.NewResourceListWithDefaultSize(), + pq: make(QueueCandidateGangIteratorPQ, 0, len(iteratorsByQueue)), } for queue, queueIt := range iteratorsByQueue { if _, err := it.updateAndPushPQItem(it.newPQItem(queue, queueIt)); err != nil { @@ -288,6 +292,48 @@ func (it *CandidateGangIterator) OnlyYieldEvicted() { it.onlyYieldEvicted = true } +func (it *CandidateGangIterator) OnlyYieldEvictedForQueue(queue string) { + it.onlyYieldEvictedByQueue[queue] = true +} + +// Clear removes the first item in the iterator. +// If it.onlyYieldEvicted is true, any consecutive non-evicted jobs are also removed. 
+func (it *CandidateGangIterator) Clear() error { + if len(it.pq) == 0 { + return nil + } + item := heap.Pop(&it.pq).(*QueueCandidateGangIteratorItem) + if err := item.it.Clear(); err != nil { + return err + } + if _, err := it.updateAndPushPQItem(item); err != nil { + return err + } + + // If set to only yield evicted gangs, drop any queues for which the next gang is non-evicted here. + // We assume here that all evicted jobs appear before non-evicted jobs in the queue. + // Hence, it's safe to drop a queue if the first job is non-evicted. + if it.onlyYieldEvicted { + for len(it.pq) > 0 && !it.pq[0].gctx.AllJobsEvicted { + heap.Pop(&it.pq) + } + } else { + // Same check as above on a per-queue basis. + for len(it.pq) > 0 && it.onlyYieldEvictedByQueue[it.pq[0].gctx.Queue] && !it.pq[0].gctx.AllJobsEvicted { + heap.Pop(&it.pq) + } + } + return nil +} + +func (it *CandidateGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) { + if len(it.pq) == 0 { + // No queued jobs left. + return nil, nil + } + return it.pq[0].gctx, nil +} + func (it *CandidateGangIterator) newPQItem(queue string, queueIt *QueuedGangIterator) *QueueCandidateGangIteratorItem { return &QueueCandidateGangIteratorItem{ queue: queue, @@ -303,8 +349,9 @@ func (it *CandidateGangIterator) updateAndPushPQItem(item *QueueCandidateGangIte return false, nil } if it.onlyYieldEvicted && !item.gctx.AllJobsEvicted { - // We assume here that all evicted jobs appear before non-evicted jobs in the queue. - // Hence, it's safe to drop a queue once a non-evicted job has been seen. + return false, nil + } + if it.onlyYieldEvictedByQueue[item.gctx.Queue] && !item.gctx.AllJobsEvicted { return false, nil } heap.Push(&it.pq, item) @@ -335,7 +382,7 @@ func (it *CandidateGangIterator) updatePQItem(item *QueueCandidateGangIteratorIt // queueCostWithGctx returns the cost associated with a queue if gctx were to be scheduled. func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSchedulingContext) (float64, error) { - queue, ok := it.queueProvier.GetQueue(gctx.Queue) + queue, ok := it.queueRepository.GetQueue(gctx.Queue) if !ok { return 0, errors.Errorf("unknown queue %s", gctx.Queue) } @@ -345,33 +392,6 @@ func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSc return it.fairnessCostProvider.CostFromAllocationAndWeight(it.buffer, queue.GetWeight()), nil } -// Clear removes the first item in the iterator. -// If it.onlyYieldEvicted is true, any consecutive non-evicted jobs are also removed. -func (it *CandidateGangIterator) Clear() error { - if len(it.pq) == 0 { - return nil - } - item := heap.Pop(&it.pq).(*QueueCandidateGangIteratorItem) - if err := item.it.Clear(); err != nil { - return err - } - if _, err := it.updateAndPushPQItem(item); err != nil { - return err - } - for len(it.pq) > 0 && it.onlyYieldEvicted && !it.pq[0].gctx.AllJobsEvicted { - heap.Pop(&it.pq) - } - return nil -} - -func (it *CandidateGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) { - if len(it.pq) == 0 { - // No queued jobs left. - return nil, nil - } - return it.pq[0].gctx, nil -} - // Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job. 
type QueueCandidateGangIteratorPQ []*QueueCandidateGangIteratorItem diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 02ea3929aaa..2a2afcc552a 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -473,11 +473,12 @@ func TestQueueScheduler(t *testing.T) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + nil, tc.TotalResources, ) for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, tc.InitialAllocatedByQueueAndPriorityClass[queue]) + err := sctx.AddQueueSchedulingContext(queue, weight, tc.InitialAllocatedByQueueAndPriorityClass[queue], nil) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( diff --git a/internal/scheduler/reports_test.go b/internal/scheduler/reports_test.go index fc96cc1b25e..b3c8f568d38 100644 --- a/internal/scheduler/reports_test.go +++ b/internal/scheduler/reports_test.go @@ -246,7 +246,7 @@ func withSuccessfulJobSchedulingContext(sctx *schedulercontext.SchedulingContext } qctx := sctx.QueueSchedulingContexts[queue] if qctx == nil { - if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string])); err != nil { + if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string]), nil); err != nil { panic(err) } qctx = sctx.QueueSchedulingContexts[queue] @@ -266,7 +266,7 @@ func withPreemptingJobSchedulingContext(sctx *schedulercontext.SchedulingContext } qctx := sctx.QueueSchedulingContexts[queue] if qctx == nil { - if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string])); err != nil { + if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string]), nil); err != nil { panic(err) } qctx = sctx.QueueSchedulingContexts[queue] @@ -286,7 +286,7 @@ func withUnsuccessfulJobSchedulingContext(sctx *schedulercontext.SchedulingConte } qctx := sctx.QueueSchedulingContexts[queue] if qctx == nil { - if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string])); err != nil { + if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string]), nil); err != nil { panic(err) } qctx = sctx.QueueSchedulingContexts[queue] @@ -304,6 +304,7 @@ func testSchedulingContext(executorId string) *schedulercontext.SchedulingContex nil, "", nil, + nil, schedulerobjects.ResourceList{}, ) sctx.Started = time.Time{} diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 2c925d465a5..72020430701 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -12,6 +12,7 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/util/clock" "github.com/armadaproject/armada/internal/armada/configuration" @@ -42,6 +43,7 @@ type FairSchedulingAlgo struct { executorRepository database.ExecutorRepository queueRepository database.QueueRepository schedulingContextRepository *SchedulingContextRepository + limiter *rate.Limiter maxSchedulingDuration time.Duration // Order in which to schedule executor groups. 
// Executors are grouped by either id (i.e., individually) or by pool. diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 97666a7a9fc..5d157e87e01 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -415,6 +415,7 @@ func (s *Simulator) handleScheduleEvent() error { s.schedulingConfig.Preemption.PriorityClasses, s.schedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + nil, totalResources, ) for _, queue := range s.testCase.Queues { @@ -422,6 +423,7 @@ func (s *Simulator) handleScheduleEvent() error { queue.Name, queue.Weight, s.allocationByPoolAndQueueAndPriorityClass[pool.Name][queue.Name], + nil, ) if err != nil { return err From 86cbf41972304000e0f95bbe90ffbd3adb839b4f Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Thu, 31 Aug 2023 16:46:59 +0100 Subject: [PATCH 02/14] Add rate limiters --- internal/armada/server/lease.go | 32 +++++++++++++++---- internal/scheduler/context/context.go | 10 ++++++ internal/scheduler/queue_scheduler.go | 4 +++ internal/scheduler/scheduling_algo.go | 21 ++++++++++-- .../scheduler/testfixtures/testfixtures.go | 10 ------ 5 files changed, 57 insertions(+), 20 deletions(-) diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 508206d9758..0e872c02af6 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -18,6 +18,7 @@ import ( log "github.com/sirupsen/logrus" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/utils/clock" @@ -59,6 +60,10 @@ type AggregatedQueueServer struct { schedulingInfoRepository repository.SchedulingInfoRepository decompressorPool *pool.ObjectPool clock clock.Clock + // Global job scheduling rate-limiter. + limiter *rate.Limiter + // Per-queue job scheduling rate-limiters. + limiterByQueue map[string]*rate.Limiter // For storing reports of scheduling attempts. SchedulingContextRepository *scheduler.SchedulingContextRepository // Stores the most recent NodeDb for each executor. 
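// A minimal, self-contained sketch (not part of these patches) of the
// golang.org/x/time/rate token-bucket behaviour that the new limiter and
// limiterByQueue fields above rely on: a limiter is created once with a refill
// rate and burst size, the scheduler checks the remaining tokens before
// admitting new work, and reserves tokens for the work it admits. The rate,
// burst, and gang size below are illustrative values, not taken from the patches.
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	now := time.Now()
	// Refill 100 tokens per second, with a burst capacity of 200.
	limiter := rate.NewLimiter(rate.Limit(100.0), 200)

	// Before scheduling another gang, check whether any tokens remain.
	if limiter.TokensAt(now) <= 0 {
		fmt.Println("rate limit exceeded; stop scheduling new jobs")
		return
	}

	// Account for a gang of 5 jobs that was just scheduled. ReserveN succeeds
	// for any n <= burst, so the bucket can go negative; this is what allows
	// the limit to be exceeded by at most one gang per round.
	limiter.ReserveN(now, 5)
	fmt.Printf("tokens remaining: %.1f\n", limiter.TokensAt(now))
}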
@@ -98,12 +103,16 @@ func NewAggregatedQueueServer( return compress.NewZlibDecompressor(), nil }), &poolConfig) return &AggregatedQueueServer{ - permissions: permissions, - schedulingConfig: schedulingConfig, - jobRepository: jobRepository, - queueRepository: queueRepository, - usageRepository: usageRepository, - eventStore: eventStore, + permissions: permissions, + schedulingConfig: schedulingConfig, + jobRepository: jobRepository, + queueRepository: queueRepository, + usageRepository: usageRepository, + eventStore: eventStore, + limiter: rate.NewLimiter( + rate.Limit(schedulingConfig.MaximumSchedulingRate), + schedulingConfig.MaximumSchedulingBurst, + ), schedulingInfoRepository: schedulingInfoRepository, decompressorPool: decompressorPool, executorRepository: executorRepository, @@ -491,6 +500,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL q.schedulingConfig.Preemption.PriorityClasses, q.schedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + q.limiter, totalResources, ) for queue, priorityFactor := range priorityFactorByQueue { @@ -502,7 +512,15 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL if priorityFactor > 0 { weight = 1 / priorityFactor } - if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil { + queueLimiter, ok := q.limiterByQueue[queue] + if !ok { + // Create per-queue limiters lazily. + queueLimiter = rate.NewLimiter( + rate.Limit(q.schedulingConfig.MaximumPerQueueSchedulingRate), + q.schedulingConfig.MaximumPerQueueSchedulingBurst, + ) + } + if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue], queueLimiter); err != nil { return nil, err } } diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index f7af5fff2d4..929ef034926 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -259,6 +259,11 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex sctx.ScheduledResources.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumScheduledJobs++ + + // Remove a token from the rate-limiter bucket. + // We don't check the return value here as we allow exceeding the rate limit by one gang. + // Rather, we check whether the number of tokens is positive before scheduling a new job. + sctx.Limiter.AllowN(time.Now(), 1) } } return evictedInThisRound, nil @@ -493,6 +498,11 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC } else { qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + + // Remove a token from the rate-limiter bucket. + // We don't check the return value here as we allow exceeding the rate limit by one gang. + // Rather, we check whether the number of tokens is positive before scheduling a new job. 
+ qctx.Limiter.AllowN(time.Now(), 1) } } else { qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index 680cc8b7161..3a4a0fe3a33 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -102,6 +102,10 @@ func (sch *QueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, erro // If unschedulableReason indicates no more new jobs can be scheduled, // instruct the underlying iterator to only yield evicted jobs from now on. sch.candidateGangIterator.OnlyYieldEvicted() + } else if schedulerconstraints.IsTerminalQueueUnschedulableReason(unschedulableReason) { + // If unschedulableReason indicates no more new jobs can be scheduled for this queue, + // instruct the underlying iterator to only yield evicted jobs for this queue from now on. + sch.candidateGangIterator.OnlyYieldEvictedForQueue(gctx.Queue) } // Clear() to get the next gang in order of smallest fair share. // Calling clear here ensures the gang scheduled in this iteration is accounted for. diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 72020430701..d183b9e14c5 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -43,8 +43,12 @@ type FairSchedulingAlgo struct { executorRepository database.ExecutorRepository queueRepository database.QueueRepository schedulingContextRepository *SchedulingContextRepository - limiter *rate.Limiter - maxSchedulingDuration time.Duration + // Global job scheduling rate-limiter. + limiter *rate.Limiter + // Per-queue job scheduling rate-limiters. + limiterByQueue map[string]*rate.Limiter + // Max amount of time each scheduling round is allowed to take. + maxSchedulingDuration time.Duration // Order in which to schedule executor groups. // Executors are grouped by either id (i.e., individually) or by pool. executorGroupsToSchedule []string @@ -70,6 +74,8 @@ func NewFairSchedulingAlgo( executorRepository: executorRepository, queueRepository: queueRepository, schedulingContextRepository: schedulingContextRepository, + limiter: rate.NewLimiter(rate.Limit(config.MaximumSchedulingRate), config.MaximumSchedulingBurst), + limiterByQueue: make(map[string]*rate.Limiter), maxSchedulingDuration: maxSchedulingDuration, rand: util.NewThreadsafeRand(time.Now().UnixNano()), clock: clock.RealClock{}, @@ -374,6 +380,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( l.schedulingConfig.Preemption.PriorityClasses, l.schedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + l.limiter, totalResources, ) for queue, priorityFactor := range fsctx.priorityFactorByQueue { @@ -389,7 +396,15 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( if priorityFactor > 0 { weight = 1 / priorityFactor } - if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByPriorityClass); err != nil { + queueLimiter, ok := l.limiterByQueue[queue] + if !ok { + // Create per-queue limiters lazily. 
+ queueLimiter = rate.NewLimiter( + rate.Limit(l.schedulingConfig.MaximumPerQueueSchedulingRate), + l.schedulingConfig.MaximumPerQueueSchedulingBurst, + ) + } + if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByPriorityClass, queueLimiter); err != nil { return nil, nil, err } } diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 7c6e01f39c4..a00348651e1 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -166,16 +166,6 @@ func WithIndexedResourcesConfig(indexResources []configuration.IndexedResource, return config } -func WithMaxJobsToScheduleConfig(n uint, config configuration.SchedulingConfig) configuration.SchedulingConfig { - config.MaximumJobsToSchedule = n - return config -} - -func WithMaxGangsToScheduleConfig(n uint, config configuration.SchedulingConfig) configuration.SchedulingConfig { - config.MaximumGangsToSchedule = n - return config -} - func WithMaxLookbackPerQueueConfig(n uint, config configuration.SchedulingConfig) configuration.SchedulingConfig { // For legacy reasons, it's called QueueLeaseBatchSize in config. config.MaxQueueLookback = n From 84f19d5def3ea2418e5d7c82789c60269f9825c7 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Thu, 31 Aug 2023 17:26:54 +0100 Subject: [PATCH 03/14] go mod tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index be85ad6e213..8655357409a 100644 --- a/go.mod +++ b/go.mod @@ -93,6 +93,7 @@ require ( github.com/prometheus/common v0.37.0 github.com/sanity-io/litter v1.5.5 github.com/segmentio/fasthash v1.0.3 + golang.org/x/time v0.3.0 google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 ) @@ -195,7 +196,6 @@ require ( golang.org/x/sys v0.7.0 // indirect golang.org/x/term v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect - golang.org/x/time v0.3.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect google.golang.org/protobuf v1.30.0 // indirect From 07fb8be0d7aa8395369d820d5b2993c9cb31b593 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Fri, 1 Sep 2023 17:19:42 +0100 Subject: [PATCH 04/14] Updates --- config/armada/config.yaml | 4 + internal/armada/server/lease.go | 1 + internal/scheduler/constraints/constraints.go | 9 +- internal/scheduler/context/context.go | 24 ++++-- internal/scheduler/gang_scheduler.go | 15 ++++ internal/scheduler/nodedb/nodedb.go | 1 + .../preempting_queue_scheduler_test.go | 84 +++++++++++++------ internal/scheduler/queue_scheduler_test.go | 58 ++++++++----- internal/scheduler/scheduling_algo.go | 1 + .../scheduler/testfixtures/testfixtures.go | 27 ++++-- 10 files changed, 158 insertions(+), 66 deletions(-) diff --git a/config/armada/config.yaml b/config/armada/config.yaml index cb9b8e56681..6a752a2ce04 100644 --- a/config/armada/config.yaml +++ b/config/armada/config.yaml @@ -67,6 +67,10 @@ scheduling: maximumResourceFractionToSchedule: memory: 1.0 cpu: 1.0 + maximumSchedulingRate: 100.0 + maximumSchedulingBurst: 200 + maximumPerQueueSchedulingRate: 50.0 + maximumPerQueueSchedulingBurst: 100 maxJobSchedulingContextsPerExecutor: 10000 lease: expireAfter: 15m diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 0e872c02af6..4d0a009594e 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -519,6 +519,7 @@ func (q 
*AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL rate.Limit(q.schedulingConfig.MaximumPerQueueSchedulingRate), q.schedulingConfig.MaximumPerQueueSchedulingBurst, ) + q.limiterByQueue[queue] = queueLimiter } if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue], queueLimiter); err != nil { return nil, err diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index f7d19509cc8..3d6cbad5624 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -1,6 +1,7 @@ package constraints import ( + "fmt" "math" "github.com/pkg/errors" @@ -112,14 +113,18 @@ func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulerc } // Global rate limiter check. - if sctx.Limiter != nil && sctx.Limiter.Tokens() <= 0 { + fmt.Println("Global tokens", sctx.Limiter.TokensAt(sctx.Started)) + if sctx.Limiter != nil && sctx.Limiter.TokensAt(sctx.Started) <= 0 { return false, UnschedulableReasonGlobalRateLimitExceeded, nil } + // fmt.Println("Global tokens again", sctx.Limiter.TokensAt(sctx.Started)) // Per-queue rate limiter check. - if qctx := sctx.QueueSchedulingContexts[queue]; qctx != nil && qctx.Limiter != nil && qctx.Limiter.Tokens() <= 0 { + fmt.Println("Per-queue tokens", sctx.QueueSchedulingContexts[queue].Limiter.TokensAt(sctx.Started)) + if qctx := sctx.QueueSchedulingContexts[queue]; qctx != nil && qctx.Limiter != nil && qctx.Limiter.TokensAt(sctx.Started) <= 0 { return false, UnschedulableReasonQueueRateLimitExceeded, nil } + // fmt.Println("Per-queue tokens again", sctx.QueueSchedulingContexts[queue].Limiter.TokensAt(sctx.Started)) return true, "", nil } diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 929ef034926..d3c6ec7144c 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -40,6 +40,7 @@ type SchedulingContext struct { // Determines how fairness is computed. FairnessCostProvider fairness.FairnessCostProvider // Limits job scheduling rate globally across all queues. + // Use the "Started" time to ensure limiter state remains constant within each scheduling round. Limiter *rate.Limiter // Sum of queue weights across all queues. WeightSum float64 @@ -260,10 +261,14 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumScheduledJobs++ - // Remove a token from the rate-limiter bucket. - // We don't check the return value here as we allow exceeding the rate limit by one gang. - // Rather, we check whether the number of tokens is positive before scheduling a new job. - sctx.Limiter.AllowN(time.Now(), 1) + // // Remove a token from the rate-limiter bucket. + // // We don't check the return value here as we allow exceeding the rate limit by one gang. + // // Rather, we check whether the number of tokens is positive before scheduling a new job. 
+ // if sctx.Limiter != nil { + // fmt.Println("Tokens before adding job:", sctx.Limiter.TokensAt(sctx.Started)) + // v := sctx.Limiter.AllowN(sctx.Started, 1) + // fmt.Println("Tokens after adding job:", sctx.Limiter.TokensAt(sctx.Started), "v:", v) + // } } } return evictedInThisRound, nil @@ -351,6 +356,7 @@ type QueueSchedulingContext struct { // Determines the fair share of this queue relative to other queues. Weight float64 // Limits job scheduling rate for this queue. + // Use the "Started" time to ensure limiter state remains constant within each scheduling round. Limiter *rate.Limiter // Total resources assigned to the queue across all clusters by priority class priority. // Includes jobs scheduled during this invocation of the scheduler. @@ -499,10 +505,12 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) - // Remove a token from the rate-limiter bucket. - // We don't check the return value here as we allow exceeding the rate limit by one gang. - // Rather, we check whether the number of tokens is positive before scheduling a new job. - qctx.Limiter.AllowN(time.Now(), 1) + // // Remove a token from the rate-limiter bucket. + // // We don't check the return value here as we allow exceeding the rate limit by one gang. + // // Rather, we check whether the number of tokens is positive before scheduling a new job. + // if qctx.Limiter != nil { + // qctx.Limiter.AllowN(qctx.SchedulingContext.Started, 1) + // } } } else { qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index 24bc6a2b835..d70e40f9229 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -55,6 +55,21 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G if err != nil { return } + + // Update rate-limiters to account for new successfully scheduled jobs. + if ok && !gctx.AllJobsEvicted { + fmt.Println("Tokens before adding job:", sch.schedulingContext.Limiter.TokensAt(sch.schedulingContext.Started)) + sch.schedulingContext.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) + fmt.Println("Tokens after adding job:", sch.schedulingContext.Limiter.TokensAt(sch.schedulingContext.Started)) + qctx := sch.schedulingContext.QueueSchedulingContexts[gctx.Queue] + if qctx != nil { + fmt.Println("Per-queue tokens before adding job:", qctx.Limiter.TokensAt(sch.schedulingContext.Started)) + qctx.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) + fmt.Println("Per-queue tokens after adding job:", qctx.Limiter.TokensAt(sch.schedulingContext.Started)) + } + } + + // Process unschedulable jobs. if !ok { // Register the job as unschedulable. If the job was added to the context, remove it first. if gangAddedToSchedulingContext { diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index a2351d1e75f..1f00bc3dfca 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -65,6 +65,7 @@ type Node struct { // UnsafeCopy returns a pointer to a new value of type Node; it is unsafe because it only makes // shallow copies of fields that are not mutated by methods of NodeDb. 
func (node *Node) UnsafeCopy() *Node { + fmt.Println("wtf", node) return &Node{ Id: node.Id, Name: node.Name, diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index b2919fd2219..63d4a2a08d6 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "testing" + "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" @@ -12,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/armada/configuration" @@ -625,31 +627,31 @@ func TestPreemptingQueueScheduler(t *testing.T) { "B": 1, }, }, - "rescheduled jobs don't count towards maxJobsToSchedule": { - SchedulingConfig: testfixtures.WithMaxJobsToScheduleConfig(5, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Rounds: []SchedulingRound{ - { - JobsByQueue: map[string][]*jobdb.Job{ - "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), - }, - ExpectedScheduledIndices: map[string][]int{ - "A": testfixtures.IntRange(0, 4), - }, - }, - { - JobsByQueue: map[string][]*jobdb.Job{ - "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), - }, - ExpectedScheduledIndices: map[string][]int{ - "A": testfixtures.IntRange(0, 4), - }, - }, - }, - PriorityFactorByQueue: map[string]float64{ - "A": 1, - }, - }, + // "rescheduled jobs don't count towards maxJobsToSchedule": { + // SchedulingConfig: testfixtures.WithMaxJobsToScheduleConfig(5, testfixtures.TestSchedulingConfig()), + // Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + // Rounds: []SchedulingRound{ + // { + // JobsByQueue: map[string][]*jobdb.Job{ + // "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + // }, + // ExpectedScheduledIndices: map[string][]int{ + // "A": testfixtures.IntRange(0, 4), + // }, + // }, + // { + // JobsByQueue: map[string][]*jobdb.Job{ + // "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + // }, + // ExpectedScheduledIndices: map[string][]int{ + // "A": testfixtures.IntRange(0, 4), + // }, + // }, + // }, + // PriorityFactorByQueue: map[string]float64{ + // "A": 1, + // }, + // }, "rescheduled jobs don't count towards maxQueueLookback": { SchedulingConfig: testfixtures.WithMaxLookbackPerQueueConfig(5, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), @@ -1278,6 +1280,13 @@ func TestPreemptingQueueScheduler(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { + // balancing three queues + // gang_preemption_with_partial_gang + fmt.Println("name", name) + if name != "gang preemption with partial gang" { + return + } + nodeDb, err := NewNodeDb() require.NoError(t, err) txn := nodeDb.Txn(true) @@ -1300,6 +1309,21 @@ func TestPreemptingQueueScheduler(t *testing.T) { var jobIdsByGangId map[string]map[string]bool var gangIdByJobId map[string]string + // Scheduling rate-limiters persist between rounds. + // We control the rate at which time passes between scheduling rounds. 
+ schedulingInterval := time.Second + limiter := rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), + tc.SchedulingConfig.MaximumSchedulingBurst, + ) + limiterByQueue := make(map[string]*rate.Limiter) + for queue := range tc.PriorityFactorByQueue { + limiterByQueue[queue] = rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate), + tc.SchedulingConfig.MaximumPerQueueSchedulingBurst, + ) + } + // Run the scheduler. log := logrus.NewEntry(logrus.New()) for i, round := range tc.Rounds { @@ -1367,12 +1391,18 @@ func TestPreemptingQueueScheduler(t *testing.T) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, - nil, + limiter, tc.TotalResources, ) + sctx.Started = time.Time{}.Add(time.Duration(i) * schedulingInterval) for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue], nil) + err := sctx.AddQueueSchedulingContext( + queue, + weight, + allocatedByQueueAndPriorityClass[queue], + limiterByQueue[queue], + ) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 2a2afcc552a..678c600aab3 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/armada/configuration" @@ -87,8 +88,8 @@ func TestQueueScheduler(t *testing.T) { PriorityFactorByQueue: map[string]float64{"A": 1}, ExpectedScheduledIndices: []int{0, 11}, }, - "MaximumJobsToSchedule": { - SchedulingConfig: testfixtures.WithMaxJobsToScheduleConfig(2, testfixtures.TestSchedulingConfig()), + "MaximumSchedulingBurst": { + SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), @@ -99,32 +100,33 @@ func TestQueueScheduler(t *testing.T) { ExpectedScheduledIndices: []int{0, 11}, ExpectedNeverAttemptedIndices: []int{12, 13}, }, - "MaximumGangsToSchedule": { - SchedulingConfig: testfixtures.WithMaxGangsToScheduleConfig(2, testfixtures.TestSchedulingConfig()), + "MaximumSchedulingBurst can be exceeded by one gang": { + SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.WithGangAnnotationsJobs( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), ), - testfixtures.WithGangAnnotationsJobs( - testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 2), - ), - testfixtures.WithGangAnnotationsJobs( - testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 2), - ), - testfixtures.WithGangAnnotationsJobs( - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), - ), - testfixtures.WithGangAnnotationsJobs( - testfixtures.N1Cpu4GiJobs("A", 
testfixtures.PriorityClass0, 2), - ), - testfixtures.WithGangAnnotationsJobs( - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), - ), + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), ), PriorityFactorByQueue: map[string]float64{"A": 1}, - ExpectedScheduledIndices: []int{0, 1, 6, 7}, - ExpectedNeverAttemptedIndices: []int{8, 9, 10, 11}, + ExpectedScheduledIndices: []int{0, 2, 3}, + ExpectedNeverAttemptedIndices: []int{4}, + }, + "MaximumPerQueueSchedulingBurst": { + SchedulingConfig: testfixtures.WithPerQueueSchedulingLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Jobs: armadaslices.Concatenate( + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 10), + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 3), + testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 1), + ), + PriorityFactorByQueue: map[string]float64{"A": 1, "B": 1}, + ExpectedScheduledIndices: []int{0, 11, 14}, + ExpectedNeverAttemptedIndices: []int{12, 13}, }, "MaximumResourceFractionToSchedule": { SchedulingConfig: testfixtures.WithRoundLimitsConfig( @@ -473,12 +475,22 @@ func TestQueueScheduler(t *testing.T) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, - nil, + rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), + tc.SchedulingConfig.MaximumSchedulingBurst, + ), tc.TotalResources, ) for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, tc.InitialAllocatedByQueueAndPriorityClass[queue], nil) + err := sctx.AddQueueSchedulingContext( + queue, weight, + tc.InitialAllocatedByQueueAndPriorityClass[queue], + rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate), + tc.SchedulingConfig.MaximumPerQueueSchedulingBurst, + ), + ) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index d183b9e14c5..0ba817488ee 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -403,6 +403,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( rate.Limit(l.schedulingConfig.MaximumPerQueueSchedulingRate), l.schedulingConfig.MaximumPerQueueSchedulingBurst, ) + l.limiterByQueue[queue] = queueLimiter } if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByPriorityClass, queueLimiter); err != nil { return nil, nil, err diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index a00348651e1..b4b63614944 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -95,12 +95,16 @@ func TestSchedulingConfig() configuration.SchedulingConfig { NodeEvictionProbability: 1.0, NodeOversubscriptionEvictionProbability: 1.0, }, - IndexedResources: TestResources, - IndexedNodeLabels: TestIndexedNodeLabels, + MaximumSchedulingRate: math.MaxFloat64, + MaximumSchedulingBurst: math.MaxInt, + MaximumPerQueueSchedulingRate: math.MaxFloat64, + MaximumPerQueueSchedulingBurst: math.MaxInt, + IndexedResources: TestResources, + IndexedNodeLabels: TestIndexedNodeLabels, DominantResourceFairnessResourcesToConsider: TestResourceNames, - 
ExecutorTimeout: 15 * time.Minute, - MaxUnacknowledgedJobsPerExecutor: math.MaxInt, - EnableNewPreemptionStrategy: true, + ExecutorTimeout: 15 * time.Minute, + MaxUnacknowledgedJobsPerExecutor: math.MaxInt, + EnableNewPreemptionStrategy: true, } } @@ -166,8 +170,19 @@ func WithIndexedResourcesConfig(indexResources []configuration.IndexedResource, return config } +func WithGlobalSchedulingRateLimiterConfig(maximumSchedulingRate float64, maximumSchedulingBurst int, config configuration.SchedulingConfig) configuration.SchedulingConfig { + config.MaximumSchedulingRate = maximumSchedulingRate + config.MaximumSchedulingBurst = maximumSchedulingBurst + return config +} + +func WithPerQueueSchedulingLimiterConfig(maximumPerQueueSchedulingRate float64, maximumPerQueueSchedulingBurst int, config configuration.SchedulingConfig) configuration.SchedulingConfig { + config.MaximumPerQueueSchedulingRate = maximumPerQueueSchedulingRate + config.MaximumPerQueueSchedulingBurst = maximumPerQueueSchedulingBurst + return config +} + func WithMaxLookbackPerQueueConfig(n uint, config configuration.SchedulingConfig) configuration.SchedulingConfig { - // For legacy reasons, it's called QueueLeaseBatchSize in config. config.MaxQueueLookback = n return config } From 68a1aee47c6bfb71def1d5d139ca9c03c81490dc Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Mon, 4 Sep 2023 20:24:44 +0100 Subject: [PATCH 05/14] Add tests --- internal/scheduler/constraints/constraints.go | 123 +++++++++++------- .../scheduler/constraints/constraints_test.go | 2 +- internal/scheduler/gang_scheduler.go | 33 ++--- internal/scheduler/nodedb/nodedb.go | 1 - internal/scheduler/pool_assigner.go | 3 +- .../preempting_queue_scheduler_test.go | 99 +++++++++----- internal/scheduler/queue_scheduler.go | 1 + internal/scheduler/queue_scheduler_test.go | 41 +++--- .../scheduler/testfixtures/testfixtures.go | 4 +- 9 files changed, 189 insertions(+), 118 deletions(-) diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index 3d6cbad5624..650ea625dfa 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -13,27 +13,33 @@ import ( ) const ( - UnschedulableReasonMaximumResourcesScheduled = "maximum resources scheduled" - UnschedulableReasonMaximumNumberOfJobsScheduled = "maximum number of jobs scheduled" - UnschedulableReasonMaximumNumberOfGangsScheduled = "maximum number of gangs scheduled" - UnschedulableReasonMaximumResourcesPerQueueExceeded = "maximum total resources for this queue exceeded" - UnschedulableReasonGlobalRateLimitExceeded = "global scheduling rate limit exceeded" - UnschedulableReasonQueueRateLimitExceeded = "queue scheduling rate limit exceeded" + // Indicates that the limit on resources scheduled per round has been exceeded. + MaximumResourcesScheduledUnschedulableReason = "maximum resources scheduled" + + // Indicates that the queue has been assigned more than its allowed amount of resources. + MaximumResourcesPerQueueExceededUnschedulableReason = "maximum total resources for this queue exceeded" + + // Indicates that the scheduling rate limit has been exceeded. + GlobalRateLimitExceededUnschedulableReason = "global scheduling rate limit exceeded" + QueueRateLimitExceededUnschedulableReason = "queue scheduling rate limit exceeded" + + // Indicates that scheduling a gang would exceed the rate limit. 
+ GlobalRateLimitExceededByGangUnschedulableReason = "gang would exceed global scheduling rate limit" + QueueRateLimitExceededByGangUnschedulableReason = "gang would exceed queue scheduling rate limit" + + // Indicates that the number of jobs in a gang exceeds the burst size. + // This means the gang can not be scheduled without manually increasing the burst size. + GangExceedsGlobalBurstSizeUnschedulableReason = "gang cardinality too large: exceeds global max burst size" + GangExceedsQueueBurstSizeUnschedulableReason = "gang cardinality too large: exceeds queue max burst size" ) // IsTerminalUnschedulableReason returns true if reason indicates // it's not possible to schedule any more jobs in this round. func IsTerminalUnschedulableReason(reason string) bool { - if reason == UnschedulableReasonMaximumResourcesScheduled { - return true - } - if reason == UnschedulableReasonMaximumNumberOfJobsScheduled { + if reason == MaximumResourcesScheduledUnschedulableReason { return true } - if reason == UnschedulableReasonMaximumNumberOfGangsScheduled { - return true - } - if reason == UnschedulableReasonGlobalRateLimitExceeded { + if reason == GlobalRateLimitExceededUnschedulableReason { return true } return false @@ -42,7 +48,7 @@ func IsTerminalUnschedulableReason(reason string) bool { // IsTerminalQueueUnschedulableReason returns true if reason indicates // it's not possible to schedule any more jobs from this queue in this round. func IsTerminalQueueUnschedulableReason(reason string) bool { - return reason == UnschedulableReasonQueueRateLimitExceeded + return reason == QueueRateLimitExceededUnschedulableReason } // SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits. @@ -106,52 +112,79 @@ func absoluteFromRelativeLimits(totalResources schedulerobjects.ResourceList, re return absoluteLimits } +// ScaleQuantity scales q in-place by a factor f. +// This functions overflows for quantities the milli value of which can't be expressed as an int64. +// E.g., 1Pi is ok, but not 10Pi. +func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity { + q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f))) + return q +} + func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) { // MaximumResourcesToSchedule check. if !sctx.ScheduledResources.IsStrictlyLessOrEqual(constraints.MaximumResourcesToSchedule) { - return false, UnschedulableReasonMaximumResourcesScheduled, nil + return false, MaximumResourcesScheduledUnschedulableReason, nil } - - // Global rate limiter check. - fmt.Println("Global tokens", sctx.Limiter.TokensAt(sctx.Started)) - if sctx.Limiter != nil && sctx.Limiter.TokensAt(sctx.Started) <= 0 { - return false, UnschedulableReasonGlobalRateLimitExceeded, nil - } - // fmt.Println("Global tokens again", sctx.Limiter.TokensAt(sctx.Started)) - - // Per-queue rate limiter check. 
- fmt.Println("Per-queue tokens", sctx.QueueSchedulingContexts[queue].Limiter.TokensAt(sctx.Started)) - if qctx := sctx.QueueSchedulingContexts[queue]; qctx != nil && qctx.Limiter != nil && qctx.Limiter.TokensAt(sctx.Started) <= 0 { - return false, UnschedulableReasonQueueRateLimitExceeded, nil - } - // fmt.Println("Per-queue tokens again", sctx.QueueSchedulingContexts[queue].Limiter.TokensAt(sctx.Started)) - return true, "", nil } -func (constraints *SchedulingConstraints) CheckPerQueueAndPriorityClassConstraints( +func (constraints *SchedulingConstraints) CheckConstraints( sctx *schedulercontext.SchedulingContext, - queue string, - priorityClassName string, + gctx *schedulercontext.GangSchedulingContext, ) (bool, string, error) { - qctx := sctx.QueueSchedulingContexts[queue] + qctx := sctx.QueueSchedulingContexts[gctx.Queue] if qctx == nil { - return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", queue) + return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", gctx.Queue) + } + + // Check that the job is large enough for this executor. + if ok, unschedulableReason := requestsAreLargeEnough(gctx.TotalResourceRequests, constraints.MinimumJobSize); !ok { + return false, unschedulableReason, nil + } + + // Global rate limiter check. + if sctx.Limiter != nil { + tokens := sctx.Limiter.TokensAt(sctx.Started) + if tokens <= 0 { + return false, GlobalRateLimitExceededUnschedulableReason, nil + } + if sctx.Limiter.Burst() < len(gctx.JobSchedulingContexts) { + return false, GangExceedsGlobalBurstSizeUnschedulableReason, nil + } + if tokens < float64(len(gctx.JobSchedulingContexts)) { + return false, GlobalRateLimitExceededByGangUnschedulableReason, nil + } + } + + // Per-queue rate limiter check. + if qctx.Limiter != nil { + tokens := qctx.Limiter.TokensAt(sctx.Started) + if tokens <= 0 { + return false, QueueRateLimitExceededUnschedulableReason, nil + } + if qctx.Limiter.Burst() < len(gctx.JobSchedulingContexts) { + return false, GangExceedsQueueBurstSizeUnschedulableReason, nil + } + if tokens < float64(len(gctx.JobSchedulingContexts)) { + return false, QueueRateLimitExceededByGangUnschedulableReason, nil + } } // PriorityClassSchedulingConstraintsByPriorityClassName check. - if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[priorityClassName]; ok { - if !qctx.AllocatedByPriorityClass[priorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) { - return false, UnschedulableReasonMaximumResourcesPerQueueExceeded, nil + if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok { + if !qctx.AllocatedByPriorityClass[gctx.PriorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) { + return false, MaximumResourcesPerQueueExceededUnschedulableReason, nil } } return true, "", nil } -// ScaleQuantity scales q in-place by a factor f. -// This functions overflows for quantities the milli value of which can't be expressed as an int64. -// E.g., 1Pi is ok, but not 10Pi. 
-func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity { - q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f))) - return q +func requestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) { + for t, minQuantity := range minRequest.Resources { + q := totalResourceRequests.Get(t) + if minQuantity.Cmp(q) == 1 { + return false, fmt.Sprintf("job requests %s %s, but the minimum is %s", q.String(), t, minQuantity.String()) + } + } + return true, "" } diff --git a/internal/scheduler/constraints/constraints_test.go b/internal/scheduler/constraints/constraints_test.go index f0bb38ca6b0..081058191dc 100644 --- a/internal/scheduler/constraints/constraints_test.go +++ b/internal/scheduler/constraints/constraints_test.go @@ -26,7 +26,7 @@ func TestConstraints(t *testing.T) { require.Equal(t, tc.globalUnschedulableReason == "", ok) require.Equal(t, tc.globalUnschedulableReason, unschedulableReason) - ok, unschedulableReason, err = tc.constraints.CheckPerQueueAndPriorityClassConstraints(tc.sctx, tc.queue, tc.priorityClassName) + ok, unschedulableReason, err = tc.constraints.CheckConstraints(tc.sctx, nil) require.NoError(t, err) require.Equal(t, tc.perQueueAndPriorityClassUnschedulableReason == "", ok) require.Equal(t, tc.perQueueAndPriorityClassUnschedulableReason, unschedulableReason) diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index d70e40f9229..33365550a11 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -40,7 +40,10 @@ func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck() { } func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { - // Exit immediately if this is a new gang and we've hit any round limits. + // Exit immediately if this is a new gang and we've exceeded any round limits. + // + // Because this check occurs before adding the gctx to the sctx, + // the round limits can be exceeded by one gang. if !gctx.AllJobsEvicted { if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext, gctx.Queue); err != nil || !ok { return @@ -58,14 +61,12 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G // Update rate-limiters to account for new successfully scheduled jobs. if ok && !gctx.AllJobsEvicted { - fmt.Println("Tokens before adding job:", sch.schedulingContext.Limiter.TokensAt(sch.schedulingContext.Started)) - sch.schedulingContext.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) - fmt.Println("Tokens after adding job:", sch.schedulingContext.Limiter.TokensAt(sch.schedulingContext.Started)) + if sch.schedulingContext.Limiter != nil { + sch.schedulingContext.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) + } qctx := sch.schedulingContext.QueueSchedulingContexts[gctx.Queue] - if qctx != nil { - fmt.Println("Per-queue tokens before adding job:", qctx.Limiter.TokensAt(sch.schedulingContext.Started)) + if qctx != nil && qctx.Limiter != nil { qctx.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) - fmt.Println("Per-queue tokens after adding job:", qctx.Limiter.TokensAt(sch.schedulingContext.Started)) } } @@ -99,24 +100,13 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G } } }() - - // Try scheduling the gang. 
if _, err = sch.schedulingContext.AddGangSchedulingContext(gctx); err != nil { return } gangAddedToSchedulingContext = true if !gctx.AllJobsEvicted { - // Check that the job is large enough for this executor. - // This check needs to be here, since it relates to a specific job. - // Only perform limit checks for new jobs to avoid preempting jobs if, e.g., MinimumJobSize changes. - if ok, unschedulableReason = requestsAreLargeEnough(gctx.TotalResourceRequests, sch.constraints.MinimumJobSize); !ok { - return - } - if ok, unschedulableReason, err = sch.constraints.CheckPerQueueAndPriorityClassConstraints( - sch.schedulingContext, - gctx.Queue, - gctx.PriorityClassName, - ); err != nil || !ok { + // Only perform these checks for new jobs to avoid preempting jobs if, e.g., MinimumJobSize changes. + if ok, unschedulableReason, err = sch.constraints.CheckConstraints(sch.schedulingContext, gctx); err != nil || !ok { return } } @@ -241,9 +231,6 @@ func meanScheduledAtPriorityFromGctx(gctx *schedulercontext.GangSchedulingContex } func requestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) { - if len(minRequest.Resources) == 0 { - return true, "" - } for t, minQuantity := range minRequest.Resources { q := totalResourceRequests.Get(t) if minQuantity.Cmp(q) == 1 { diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index 1f00bc3dfca..a2351d1e75f 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -65,7 +65,6 @@ type Node struct { // UnsafeCopy returns a pointer to a new value of type Node; it is unsafe because it only makes // shallow copies of fields that are not mutated by methods of NodeDb. func (node *Node) UnsafeCopy() *Node { - fmt.Println("wtf", node) return &Node{ Id: node.Id, Name: node.Name, diff --git a/internal/scheduler/pool_assigner.go b/internal/scheduler/pool_assigner.go index 66e2ff999f0..eeeeb28672e 100644 --- a/internal/scheduler/pool_assigner.go +++ b/internal/scheduler/pool_assigner.go @@ -123,7 +123,8 @@ func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error) { req := j.PodRequirements() req = p.clearAnnotations(req) - // Otherwise iterate through each pool and detect the first one the job is potentially schedulable on + // Otherwise iterate through each pool and detect the first one the job is potentially schedulable on. + // TODO: We should use the real scheduler instead since this check may go out of sync with the scheduler. 
for pool, executors := range p.executorsByPool { for _, e := range executors { requests := req.GetResourceRequirements().Requests diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 63d4a2a08d6..e2d19068404 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -627,31 +627,67 @@ func TestPreemptingQueueScheduler(t *testing.T) { "B": 1, }, }, - // "rescheduled jobs don't count towards maxJobsToSchedule": { - // SchedulingConfig: testfixtures.WithMaxJobsToScheduleConfig(5, testfixtures.TestSchedulingConfig()), - // Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - // Rounds: []SchedulingRound{ - // { - // JobsByQueue: map[string][]*jobdb.Job{ - // "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), - // }, - // ExpectedScheduledIndices: map[string][]int{ - // "A": testfixtures.IntRange(0, 4), - // }, - // }, - // { - // JobsByQueue: map[string][]*jobdb.Job{ - // "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), - // }, - // ExpectedScheduledIndices: map[string][]int{ - // "A": testfixtures.IntRange(0, 4), - // }, - // }, - // }, - // PriorityFactorByQueue: map[string]float64{ - // "A": 1, - // }, - // }, + "rescheduled jobs don't count towards global scheduling rate limit": { + SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(2, 5, testfixtures.TestSchedulingConfig()), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Rounds: []SchedulingRound{ + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 4), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 1), + }, + }, + }, + PriorityFactorByQueue: map[string]float64{ + "A": 1, + }, + }, + "MaximumSchedulingRate": { + SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(2, 4, testfixtures.TestSchedulingConfig()), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Rounds: []SchedulingRound{ + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 6)), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 3), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 1), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 1), + }, + }, + }, + PriorityFactorByQueue: map[string]float64{"A": 1}, + }, "rescheduled jobs don't count towards maxQueueLookback": { SchedulingConfig: testfixtures.WithMaxLookbackPerQueueConfig(5, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), @@ -1280,10 +1316,10 @@ func TestPreemptingQueueScheduler(t *testing.T) { } for name, tc := range tests { 
t.Run(name, func(t *testing.T) { - // balancing three queues - // gang_preemption_with_partial_gang - fmt.Println("name", name) - if name != "gang preemption with partial gang" { + // // balancing three queues + // // gang_preemption_with_partial_gang + // "rescheduled jobs don't count towards global scheduling rate limit" + if name != "exceeding rate limiter burst capacity increase recovery time" { return } @@ -1311,6 +1347,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { // Scheduling rate-limiters persist between rounds. // We control the rate at which time passes between scheduling rounds. + schedulingStarted := time.Now() schedulingInterval := time.Second limiter := rate.NewLimiter( rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), @@ -1394,7 +1431,8 @@ func TestPreemptingQueueScheduler(t *testing.T) { limiter, tc.TotalResources, ) - sctx.Started = time.Time{}.Add(time.Duration(i) * schedulingInterval) + sctx.Started = schedulingStarted.Add(time.Duration(i) * schedulingInterval) + for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor err := sctx.AddQueueSchedulingContext( @@ -1404,6 +1442,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { limiterByQueue[queue], ) require.NoError(t, err) + } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( "pool", diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index 3a4a0fe3a33..bd1242a3735 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -201,6 +201,7 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e if len(it.schedulingContext.UnfeasibleSchedulingKeys) > 0 { schedulingKey := it.schedulingContext.SchedulingKeyFromLegacySchedulerJob(job) if unsuccessfulJctx, ok := it.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; ok { + // TODO: For performance, we should avoid creating new objects and instead reference the existing one. 
jctx := &schedulercontext.JobSchedulingContext{ Created: time.Now(), JobId: job.GetId(), diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 678c600aab3..aeb8056b03c 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -91,16 +91,27 @@ func TestQueueScheduler(t *testing.T) { "MaximumSchedulingBurst": { SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Jobs: armadaslices.Concatenate( + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 10), + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 4), + ), + PriorityFactorByQueue: map[string]float64{"A": 1}, + ExpectedScheduledIndices: []int{0, 11}, + }, + "MaximumPerQueueSchedulingBurst": { + SchedulingConfig: testfixtures.WithPerQueueSchedulingLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 10), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 3), + testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 1), ), - PriorityFactorByQueue: map[string]float64{"A": 1}, - ExpectedScheduledIndices: []int{0, 11}, - ExpectedNeverAttemptedIndices: []int{12, 13}, + PriorityFactorByQueue: map[string]float64{"A": 1, "B": 1}, + ExpectedScheduledIndices: []int{0, 11, 14}, }, - "MaximumSchedulingBurst can be exceeded by one gang": { + "MaximumSchedulingBurst is not exceeded by gangs": { SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( @@ -109,24 +120,24 @@ func TestQueueScheduler(t *testing.T) { testfixtures.WithGangAnnotationsJobs( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), ), - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), ), - PriorityFactorByQueue: map[string]float64{"A": 1}, - ExpectedScheduledIndices: []int{0, 2, 3}, - ExpectedNeverAttemptedIndices: []int{4}, + PriorityFactorByQueue: map[string]float64{"A": 1}, + ExpectedScheduledIndices: []int{0, 4}, }, - "MaximumPerQueueSchedulingBurst": { + "MaximumPerQueueSchedulingBurst is not exceeded by gangs": { SchedulingConfig: testfixtures.WithPerQueueSchedulingLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), - testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 10), - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 3), - testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 1), + testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.WithGangAnnotationsJobs( + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), + ), + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), ), - PriorityFactorByQueue: map[string]float64{"A": 1, "B": 1}, - ExpectedScheduledIndices: []int{0, 11, 14}, - ExpectedNeverAttemptedIndices: []int{12, 13}, + 
PriorityFactorByQueue: map[string]float64{"A": 1}, + ExpectedScheduledIndices: []int{0, 4}, }, "MaximumResourceFractionToSchedule": { SchedulingConfig: testfixtures.WithRoundLimitsConfig( diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index b4b63614944..0acda7d60a6 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -95,9 +95,9 @@ func TestSchedulingConfig() configuration.SchedulingConfig { NodeEvictionProbability: 1.0, NodeOversubscriptionEvictionProbability: 1.0, }, - MaximumSchedulingRate: math.MaxFloat64, + MaximumSchedulingRate: math.Inf(1), MaximumSchedulingBurst: math.MaxInt, - MaximumPerQueueSchedulingRate: math.MaxFloat64, + MaximumPerQueueSchedulingRate: math.Inf(1), MaximumPerQueueSchedulingBurst: math.MaxInt, IndexedResources: TestResources, IndexedNodeLabels: TestIndexedNodeLabels, From 8ec0e5007779532461f2b16f3a985c5d81709fb1 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Mon, 4 Sep 2023 21:09:19 +0100 Subject: [PATCH 06/14] Update default config --- config/armada/config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/armada/config.yaml b/config/armada/config.yaml index 6a752a2ce04..bf00c5d46a3 100644 --- a/config/armada/config.yaml +++ b/config/armada/config.yaml @@ -68,9 +68,9 @@ scheduling: memory: 1.0 cpu: 1.0 maximumSchedulingRate: 100.0 - maximumSchedulingBurst: 200 + maximumSchedulingBurst: 1000 maximumPerQueueSchedulingRate: 50.0 - maximumPerQueueSchedulingBurst: 100 + maximumPerQueueSchedulingBurst: 1000 maxJobSchedulingContextsPerExecutor: 10000 lease: expireAfter: 15m From c3a5e3820c309b44910be57da7085191d082aa5c Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Tue, 5 Sep 2023 11:54:01 +0100 Subject: [PATCH 07/14] Update default scheduler config --- config/scheduler/config.yaml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index 531f4c6a78d..68179f99067 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -96,7 +96,11 @@ scheduling: maximumResourceFractionToSchedule: memory: 1.0 cpu: 1.0 - maximumJobsToSchedule: 5000 + maximumSchedulingRate: 100.0 + maximumSchedulingBurst: 1000 + maximumPerQueueSchedulingRate: 50.0 + maximumPerQueueSchedulingBurst: 1000 + maxJobSchedulingContextsPerExecutor: 10000 maxUnacknowledgedJobsPerExecutor: 2500 maxJobSchedulingContextsPerExecutor: 10000 defaultJobLimits: From 8eb724eff40d0352d9b0c99c90073626b6c0e348 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Tue, 5 Sep 2023 12:01:11 +0100 Subject: [PATCH 08/14] Whitespace --- config/armada/config.yaml | 8 ++++---- config/scheduler/config.yaml | 9 ++++----- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/config/armada/config.yaml b/config/armada/config.yaml index bf00c5d46a3..32875689747 100644 --- a/config/armada/config.yaml +++ b/config/armada/config.yaml @@ -67,10 +67,10 @@ scheduling: maximumResourceFractionToSchedule: memory: 1.0 cpu: 1.0 - maximumSchedulingRate: 100.0 - maximumSchedulingBurst: 1000 - maximumPerQueueSchedulingRate: 50.0 - maximumPerQueueSchedulingBurst: 1000 + maximumSchedulingRate: 100.0 + maximumSchedulingBurst: 1000 + maximumPerQueueSchedulingRate: 50.0 + maximumPerQueueSchedulingBurst: 1000 maxJobSchedulingContextsPerExecutor: 10000 lease: expireAfter: 15m diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index 68179f99067..2aac016a82b 
100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -96,11 +96,10 @@ scheduling: maximumResourceFractionToSchedule: memory: 1.0 cpu: 1.0 - maximumSchedulingRate: 100.0 - maximumSchedulingBurst: 1000 - maximumPerQueueSchedulingRate: 50.0 - maximumPerQueueSchedulingBurst: 1000 - maxJobSchedulingContextsPerExecutor: 10000 + maximumSchedulingRate: 100.0 + maximumSchedulingBurst: 1000 + maximumPerQueueSchedulingRate: 50.0 + maximumPerQueueSchedulingBurst: 1000 maxUnacknowledgedJobsPerExecutor: 2500 maxJobSchedulingContextsPerExecutor: 10000 defaultJobLimits: From c00356f2ee14265567f42b0458f7f53f8ed40f65 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Tue, 5 Sep 2023 12:21:49 +0100 Subject: [PATCH 09/14] Cleanup --- internal/armada/configuration/types.go | 23 ++++++++++++++----- internal/armada/server/lease.go | 2 +- internal/scheduler/constraints/constraints.go | 8 +++---- internal/scheduler/context/context.go | 16 ------------- internal/scheduler/gang_scheduler.go | 11 --------- internal/scheduler/pool_assigner.go | 3 ++- .../preempting_queue_scheduler_test.go | 7 ------ 7 files changed, 24 insertions(+), 46 deletions(-) diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index 9c53ca2bbdb..708b15aaa6a 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -126,12 +126,23 @@ type SchedulingConfig struct { MaximumResourceFractionToScheduleByPool map[string]map[string]float64 // Token bucket global job scheduling rate limiter settings; see // https://pkg.go.dev/golang.org/x/time/rate#Limiter - MaximumSchedulingRate float64 `validate:"gt=0"` - MaximumSchedulingBurst int `validate:"gt=0"` - // Token bucket per-queue job scheduling rate limiter settings; see - // https://pkg.go.dev/golang.org/x/time/rate#Limiter - MaximumPerQueueSchedulingRate float64 `validate:"gt=0"` - MaximumPerQueueSchedulingBurst int `validate:"gt=0"` + // + // Rate-limiting is based on the number of tokens available at the start of each scheduling round, + // i.e., tokens accumulated while scheduling are only available at the start of the next scheduling round. + // + // MaximumSchedulingRate controls the number of jobs scheduled per second in steady-state, + // i.e., once the burst capacity has been exhausted. + MaximumSchedulingRate float64 `validate:"gt=0"` + // MaximumSchedulingBurst controls the burst capacity of the rate-limiter. + // + // There are two important implications: + // - Armada will never schedule more than MaximumSchedulingBurst jobs per scheduling round. + // - Gang jobs with cardinality greater than MaximumSchedulingBurst can never be scheduled. + MaximumSchedulingBurst int `validate:"gt=0"` + // Per-queue version of MaximumSchedulingRate. + MaximumPerQueueSchedulingRate float64 `validate:"gt=0"` + // Per-queue version of MaximumSchedulingBurst. + MaximumPerQueueSchedulingBurst int `validate:"gt=0"` // Armada stores contexts associated with recent job scheduling attempts. // This setting limits the number of such contexts to store. // Contexts associated with the most recent scheduling attempt for each queue and cluster are always stored. 
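For illustration, a minimal, self-contained sketch of the token-bucket semantics documented above, written against golang.org/x/time/rate (the limiter this series wires in) and using the rate/burst values from the MaximumSchedulingRate test added earlier in the series (rate 2, burst 4); the one-second gap between rounds mirrors that test's schedulingInterval and is otherwise an assumption:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 2 tokens per second, burst capacity of 4; the bucket starts full.
	limiter := rate.NewLimiter(rate.Limit(2), 4)
	start := time.Now()

	// Round 1: a gang of 6 can never be scheduled, since its cardinality exceeds the burst size.
	fmt.Println(limiter.Burst() < 6) // true

	// Round 2, one second later: the bucket is still full, so 4 single jobs fit before tokens run out.
	t2 := start.Add(1 * time.Second)
	fmt.Println(limiter.TokensAt(t2)) // 4
	limiter.ReserveN(t2, 4)           // one token is reserved per scheduled job

	// Round 3: only rate*interval = 2 tokens have accrued, so 2 jobs are scheduled.
	t3 := start.Add(2 * time.Second)
	fmt.Println(limiter.TokensAt(t3)) // 2
}

This is the arithmetic behind that test's expected indices: nothing in the first round, four jobs in the second, and two jobs in each round after that.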
diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 4d0a009594e..b5e77678615 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -97,7 +97,6 @@ func NewAggregatedQueueServer( TimeBetweenEvictionRuns: 0, NumTestsPerEvictionRun: 10, } - decompressorPool := pool.NewObjectPool(context.Background(), pool.NewPooledObjectFactorySimple( func(context.Context) (interface{}, error) { return compress.NewZlibDecompressor(), nil @@ -113,6 +112,7 @@ func NewAggregatedQueueServer( rate.Limit(schedulingConfig.MaximumSchedulingRate), schedulingConfig.MaximumSchedulingBurst, ), + limiterByQueue: make(map[string]*rate.Limiter), schedulingInfoRepository: schedulingInfoRepository, decompressorPool: decompressorPool, executorRepository: executorRepository, diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index 650ea625dfa..569b93a204f 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -16,7 +16,7 @@ const ( // Indicates that the limit on resources scheduled per round has been exceeded. MaximumResourcesScheduledUnschedulableReason = "maximum resources scheduled" - // Indicates that the queue has been assigned more than its allowed amount of resources. + // Indicates that a queue has been assigned more than its allowed amount of resources. MaximumResourcesPerQueueExceededUnschedulableReason = "maximum total resources for this queue exceeded" // Indicates that the scheduling rate limit has been exceeded. @@ -28,7 +28,7 @@ const ( QueueRateLimitExceededByGangUnschedulableReason = "gang would exceed queue scheduling rate limit" // Indicates that the number of jobs in a gang exceeds the burst size. - // This means the gang can not be scheduled without manually increasing the burst size. + // This means the gang can not be scheduled without first increasing the burst size. GangExceedsGlobalBurstSizeUnschedulableReason = "gang cardinality too large: exceeds global max burst size" GangExceedsQueueBurstSizeUnschedulableReason = "gang cardinality too large: exceeds queue max burst size" ) @@ -138,7 +138,7 @@ func (constraints *SchedulingConstraints) CheckConstraints( } // Check that the job is large enough for this executor. 
- if ok, unschedulableReason := requestsAreLargeEnough(gctx.TotalResourceRequests, constraints.MinimumJobSize); !ok { + if ok, unschedulableReason := RequestsAreLargeEnough(gctx.TotalResourceRequests, constraints.MinimumJobSize); !ok { return false, unschedulableReason, nil } @@ -179,7 +179,7 @@ func (constraints *SchedulingConstraints) CheckConstraints( return true, "", nil } -func requestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) { +func RequestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) { for t, minQuantity := range minRequest.Resources { q := totalResourceRequests.Get(t) if minQuantity.Cmp(q) == 1 { diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index d3c6ec7144c..3c9cbd97289 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -260,15 +260,6 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex sctx.ScheduledResources.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumScheduledJobs++ - - // // Remove a token from the rate-limiter bucket. - // // We don't check the return value here as we allow exceeding the rate limit by one gang. - // // Rather, we check whether the number of tokens is positive before scheduling a new job. - // if sctx.Limiter != nil { - // fmt.Println("Tokens before adding job:", sctx.Limiter.TokensAt(sctx.Started)) - // v := sctx.Limiter.AllowN(sctx.Started, 1) - // fmt.Println("Tokens after adding job:", sctx.Limiter.TokensAt(sctx.Started), "v:", v) - // } } } return evictedInThisRound, nil @@ -504,13 +495,6 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC } else { qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) - - // // Remove a token from the rate-limiter bucket. - // // We don't check the return value here as we allow exceeding the rate limit by one gang. - // // Rather, we check whether the number of tokens is positive before scheduling a new job. - // if qctx.Limiter != nil { - // qctx.Limiter.AllowN(qctx.SchedulingContext.Started, 1) - // } } } else { qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index 33365550a11..bcd30e889b8 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -11,7 +11,6 @@ import ( schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/nodedb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) // GangScheduler schedules one gang at a time. GangScheduler is not aware of queues. 
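For illustration, a simplified standalone restatement of the rate-limiter ladder that CheckConstraints applies to each new gang elsewhere in this series; the reason strings stand in for the *UnschedulableReason constants and the concrete numbers are only examples:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// checkGang mirrors the ladder CheckConstraints runs for a gang against one limiter;
// the real code runs it twice, once for the global and once for the per-queue limiter.
func checkGang(limiter *rate.Limiter, at time.Time, gangCardinality int) (bool, string) {
	tokens := limiter.TokensAt(at)
	if tokens <= 0 {
		return false, "rate limit exceeded" // terminal: stop scheduling from this bucket
	}
	if limiter.Burst() < gangCardinality {
		return false, "gang cardinality too large: exceeds max burst size"
	}
	if tokens < float64(gangCardinality) {
		return false, "gang would exceed rate limit"
	}
	return true, ""
}

func main() {
	started := time.Now() // stands in for sctx.Started
	limiter := rate.NewLimiter(rate.Limit(1), 4)
	limiter.ReserveN(started, 2) // pretend two jobs were already scheduled this round

	fmt.Println(checkGang(limiter, started, 6)) // false: larger than the burst size
	fmt.Println(checkGang(limiter, started, 3)) // false: only 2 tokens left this round
	fmt.Println(checkGang(limiter, started, 2)) // true: GangScheduler would then reserve 2 tokens
}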
@@ -229,13 +228,3 @@ func meanScheduledAtPriorityFromGctx(gctx *schedulercontext.GangSchedulingContex } return float64(sum) / float64(len(gctx.JobSchedulingContexts)), true } - -func requestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) { - for t, minQuantity := range minRequest.Resources { - q := totalResourceRequests.Get(t) - if minQuantity.Cmp(q) == 1 { - return false, fmt.Sprintf("job requests %s %s, but the minimum is %s", q.String(), t, minQuantity.String()) - } - } - return true, "" -} diff --git a/internal/scheduler/pool_assigner.go b/internal/scheduler/pool_assigner.go index eeeeb28672e..94aa07e4908 100644 --- a/internal/scheduler/pool_assigner.go +++ b/internal/scheduler/pool_assigner.go @@ -11,6 +11,7 @@ import ( "github.com/armadaproject/armada/internal/armada/configuration" "github.com/armadaproject/armada/internal/common/types" + "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/jobdb" @@ -128,7 +129,7 @@ func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error) { for pool, executors := range p.executorsByPool { for _, e := range executors { requests := req.GetResourceRequirements().Requests - if ok, _ := requestsAreLargeEnough(schedulerobjects.ResourceListFromV1ResourceList(requests), e.minimumJobSize); !ok { + if ok, _ := constraints.RequestsAreLargeEnough(schedulerobjects.ResourceListFromV1ResourceList(requests), e.minimumJobSize); !ok { continue } nodeDb := e.nodeDb diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index e2d19068404..dc84cd225ae 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1316,13 +1316,6 @@ func TestPreemptingQueueScheduler(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - // // balancing three queues - // // gang_preemption_with_partial_gang - // "rescheduled jobs don't count towards global scheduling rate limit" - if name != "exceeding rate limiter burst capacity increase recovery time" { - return - } - nodeDb, err := NewNodeDb() require.NoError(t, err) txn := nodeDb.Txn(true) From 008d9064b28dcf4d81cf1c1a2bee59a9e46a6c1c Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Fri, 8 Sep 2023 10:11:45 +0100 Subject: [PATCH 10/14] Docstring improvements --- internal/armada/configuration/types.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index 708b15aaa6a..c1b518d09a0 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -124,14 +124,19 @@ type SchedulingConfig struct { MaximumResourceFractionToSchedule map[string]float64 // Overrides MaximalClusterFractionToSchedule if set for the current pool. MaximumResourceFractionToScheduleByPool map[string]map[string]float64 - // Token bucket global job scheduling rate limiter settings; see - // https://pkg.go.dev/golang.org/x/time/rate#Limiter + // The rate at which Armada schedules jobs is rate-limited using a token bucket approach. + // Specifically, there is a token bucket that persists between scheduling rounds. 
+	// The bucket fills up at a rate of MaximumSchedulingRate tokens per second and has capacity MaximumSchedulingBurst.
+	// A token is removed from the bucket when scheduling a job, and scheduling stops while the bucket is empty.
+	//
+	// Hence, MaximumSchedulingRate controls the maximum number of jobs scheduled per second in steady-state,
+	// i.e., once the burst capacity has been exhausted.
 	//
 	// Rate-limiting is based on the number of tokens available at the start of each scheduling round,
-	// i.e., tokens accumulated while scheduling are only available at the start of the next scheduling round.
+	// i.e., tokens accumulated while scheduling become available at the start of the next scheduling round.
 	//
-	// MaximumSchedulingRate controls the number of jobs scheduled per second in steady-state,
-	// i.e., once the burst capacity has been exhausted.
+	// For more information about the rate-limiter, see:
+	// https://pkg.go.dev/golang.org/x/time/rate#Limiter
 	MaximumSchedulingRate float64 `validate:"gt=0"`
 	// MaximumSchedulingBurst controls the burst capacity of the rate-limiter.
 	//
 	// There are two important implications:
 	// - Armada will never schedule more than MaximumSchedulingBurst jobs per scheduling round.
 	// - Gang jobs with cardinality greater than MaximumSchedulingBurst can never be scheduled.
 	MaximumSchedulingBurst int `validate:"gt=0"`
+	// In addition to the global rate-limiter, there is a separate rate-limiter for each queue.
+	// These work the same as the global rate-limiter, except they apply only to jobs scheduled from a specific queue.
+	//
 	// Per-queue version of MaximumSchedulingRate.
 	MaximumPerQueueSchedulingRate float64 `validate:"gt=0"`
 	// Per-queue version of MaximumSchedulingBurst.
 	MaximumPerQueueSchedulingBurst int `validate:"gt=0"`

From 81d4dce7b12fb00b3274ae191c5d3b0fa42a809a Mon Sep 17 00:00:00 2001
From: Albin Severinson
Date: Fri, 8 Sep 2023 10:15:03 +0100
Subject: [PATCH 11/14] Remove limiter nil checks

---
 internal/scheduler/constraints/constraints.go | 40 +++++++++----------
 internal/scheduler/gang_scheduler.go          |  7 +----
 2 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go
index 569b93a204f..415ff2d888f 100644
--- a/internal/scheduler/constraints/constraints.go
+++ b/internal/scheduler/constraints/constraints.go
@@ -143,31 +143,27 @@ func (constraints *SchedulingConstraints) CheckConstraints(
 	}
 
 	// Global rate limiter check.
-	if sctx.Limiter != nil {
-		tokens := sctx.Limiter.TokensAt(sctx.Started)
-		if tokens <= 0 {
-			return false, GlobalRateLimitExceededUnschedulableReason, nil
-		}
-		if sctx.Limiter.Burst() < len(gctx.JobSchedulingContexts) {
-			return false, GangExceedsGlobalBurstSizeUnschedulableReason, nil
-		}
-		if tokens < float64(len(gctx.JobSchedulingContexts)) {
-			return false, GlobalRateLimitExceededByGangUnschedulableReason, nil
-		}
+	tokens := sctx.Limiter.TokensAt(sctx.Started)
+	if tokens <= 0 {
+		return false, GlobalRateLimitExceededUnschedulableReason, nil
+	}
+	if sctx.Limiter.Burst() < len(gctx.JobSchedulingContexts) {
+		return false, GangExceedsGlobalBurstSizeUnschedulableReason, nil
+	}
+	if tokens < float64(len(gctx.JobSchedulingContexts)) {
+		return false, GlobalRateLimitExceededByGangUnschedulableReason, nil
 	}
 
 	// Per-queue rate limiter check.
- if qctx.Limiter != nil { - tokens := qctx.Limiter.TokensAt(sctx.Started) - if tokens <= 0 { - return false, QueueRateLimitExceededUnschedulableReason, nil - } - if qctx.Limiter.Burst() < len(gctx.JobSchedulingContexts) { - return false, GangExceedsQueueBurstSizeUnschedulableReason, nil - } - if tokens < float64(len(gctx.JobSchedulingContexts)) { - return false, QueueRateLimitExceededByGangUnschedulableReason, nil - } + tokens = qctx.Limiter.TokensAt(sctx.Started) + if tokens <= 0 { + return false, QueueRateLimitExceededUnschedulableReason, nil + } + if qctx.Limiter.Burst() < len(gctx.JobSchedulingContexts) { + return false, GangExceedsQueueBurstSizeUnschedulableReason, nil + } + if tokens < float64(len(gctx.JobSchedulingContexts)) { + return false, QueueRateLimitExceededByGangUnschedulableReason, nil } // PriorityClassSchedulingConstraintsByPriorityClassName check. diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index bcd30e889b8..aba16014f33 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -60,11 +60,8 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G // Update rate-limiters to account for new successfully scheduled jobs. if ok && !gctx.AllJobsEvicted { - if sch.schedulingContext.Limiter != nil { - sch.schedulingContext.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) - } - qctx := sch.schedulingContext.QueueSchedulingContexts[gctx.Queue] - if qctx != nil && qctx.Limiter != nil { + sch.schedulingContext.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) + if qctx := sch.schedulingContext.QueueSchedulingContexts[gctx.Queue]; qctx != nil { qctx.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) } } From 1bce27da3320f41c65f3826e78cc0eb2351be3fa Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Fri, 8 Sep 2023 10:20:50 +0100 Subject: [PATCH 12/14] Add Cardinality() function on gctx --- internal/scheduler/constraints/constraints.go | 8 ++++---- internal/scheduler/context/context.go | 5 +++++ internal/scheduler/gang_scheduler.go | 10 +++++----- internal/scheduler/queue_scheduler.go | 2 +- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index 415ff2d888f..b859d71bca1 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -147,10 +147,10 @@ func (constraints *SchedulingConstraints) CheckConstraints( if tokens <= 0 { return false, GlobalRateLimitExceededUnschedulableReason, nil } - if sctx.Limiter.Burst() < len(gctx.JobSchedulingContexts) { + if sctx.Limiter.Burst() < gctx.Cardinality() { return false, GangExceedsGlobalBurstSizeUnschedulableReason, nil } - if tokens < float64(len(gctx.JobSchedulingContexts)) { + if tokens < float64(gctx.Cardinality()) { return false, GlobalRateLimitExceededByGangUnschedulableReason, nil } @@ -159,10 +159,10 @@ func (constraints *SchedulingConstraints) CheckConstraints( if tokens <= 0 { return false, QueueRateLimitExceededUnschedulableReason, nil } - if qctx.Limiter.Burst() < len(gctx.JobSchedulingContexts) { + if qctx.Limiter.Burst() < gctx.Cardinality() { return false, GangExceedsQueueBurstSizeUnschedulableReason, nil } - if tokens < float64(len(gctx.JobSchedulingContexts)) { + if tokens < float64(gctx.Cardinality()) { return false, QueueRateLimitExceededByGangUnschedulableReason, 
nil } diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 3c9cbd97289..29c12bbc374 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -574,6 +574,11 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont } } +// Cardinality returns the number of jobs in the gang. +func (gctx *GangSchedulingContext) Cardinality() int { + return len(gctx.JobSchedulingContexts) +} + func isEvictedJob(job interfaces.LegacySchedulerJob) bool { return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true" } diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index aba16014f33..ffca7be9f8e 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -60,9 +60,9 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G // Update rate-limiters to account for new successfully scheduled jobs. if ok && !gctx.AllJobsEvicted { - sch.schedulingContext.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) + sch.schedulingContext.Limiter.ReserveN(sch.schedulingContext.Started, gctx.Cardinality()) if qctx := sch.schedulingContext.QueueSchedulingContexts[gctx.Queue]; qctx != nil { - qctx.Limiter.ReserveN(sch.schedulingContext.Started, len(gctx.JobSchedulingContexts)) + qctx.Limiter.ReserveN(sch.schedulingContext.Started, gctx.Cardinality()) } } @@ -86,7 +86,7 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G // // Only record unfeasible scheduling keys for single-job gangs. // Since a gang may be unschedulable even if all its members are individually schedulable. - if !sch.skipUnsuccessfulSchedulingKeyCheck && len(gctx.JobSchedulingContexts) == 1 { + if !sch.skipUnsuccessfulSchedulingKeyCheck && gctx.Cardinality() == 1 { jctx := gctx.JobSchedulingContexts[0] schedulingKey := sch.schedulingContext.SchedulingKeyFromLegacySchedulerJob(jctx.Job) if _, ok := sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; !ok { @@ -196,7 +196,7 @@ func (sch *GangScheduler) tryScheduleGangWithTxn(ctx context.Context, txn *memdb jctx.PodSchedulingContext.NodeId = "" } } - if len(gctx.JobSchedulingContexts) > 1 { + if gctx.Cardinality() > 1 { unschedulableReason = "at least one job in the gang does not fit on any node" } else { unschedulableReason = "job does not fit on any node" @@ -223,5 +223,5 @@ func meanScheduledAtPriorityFromGctx(gctx *schedulercontext.GangSchedulingContex } sum += jctx.PodSchedulingContext.ScheduledAtPriority } - return float64(sum) / float64(len(gctx.JobSchedulingContexts)), true + return float64(sum) / float64(gctx.Cardinality()), true } diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index 894f49015aa..825c9f26bfb 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -74,7 +74,7 @@ func (sch *QueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, erro if gctx == nil { break } - if len(gctx.JobSchedulingContexts) == 0 { + if gctx.Cardinality() == 0 { if err := sch.candidateGangIterator.Clear(); err != nil { return nil, err } From 8d7d05c3e02bd0faef9af18a5a25037da9ef4893 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Fri, 8 Sep 2023 10:34:41 +0100 Subject: [PATCH 13/14] Fix test --- internal/scheduler/gang_scheduler_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git 
a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index fb7a55bc6d4..e5fbafad703 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/armada/configuration" @@ -340,11 +341,22 @@ func TestGangScheduler(t *testing.T) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, - nil, + rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), + tc.SchedulingConfig.MaximumSchedulingBurst, + ), tc.TotalResources, ) for queue, priorityFactor := range priorityFactorByQueue { - err := sctx.AddQueueSchedulingContext(queue, priorityFactor, nil, nil) + err := sctx.AddQueueSchedulingContext( + queue, + priorityFactor, + nil, + rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate), + tc.SchedulingConfig.MaximumPerQueueSchedulingBurst, + ), + ) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( From e6af79d8da098899f4d23e3b23f9b92df18b1c24 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Fri, 8 Sep 2023 10:40:13 +0100 Subject: [PATCH 14/14] Fix test --- internal/scheduler/simulator/simulator.go | 24 +++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 5d157e87e01..639f617c275 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -20,6 +20,7 @@ import ( "github.com/spf13/viper" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/yaml" @@ -69,6 +70,11 @@ type Simulator struct { eventLog EventLog // Simulated events are emitted on this channel in order. c chan *armadaevents.EventSequence + + // Global job scheduling rate-limiter. + limiter *rate.Limiter + // Per-queue job scheduling rate-limiters. + limiterByQueue map[string]*rate.Limiter } func NewSimulator(testCase *TestCase, schedulingConfig configuration.SchedulingConfig) (*Simulator, error) { @@ -143,6 +149,11 @@ func NewSimulator(testCase *TestCase, schedulingConfig configuration.SchedulingC allocationByPoolAndQueueAndPriorityClass: make(map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]), totalResourcesByPool: totalResourcesByPool, c: make(chan *armadaevents.EventSequence), + limiter: rate.NewLimiter( + rate.Limit(schedulingConfig.MaximumSchedulingRate), + schedulingConfig.MaximumSchedulingBurst, + ), + limiterByQueue: make(map[string]*rate.Limiter), } // Mark all jobTemplates as active. 
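For illustration, the per-queue limiter bookkeeping that the simulator hunk below implements (and that the lease server's limiterByQueue map earlier in the series is for), pulled out into a standalone sketch; the helper name is hypothetical:

package scheduler

import "golang.org/x/time/rate"

// getOrCreateQueueLimiter returns the rate-limiter for queue, creating it on first use.
// Keeping limiters in a map keyed by queue name means each queue's token bucket
// persists between scheduling rounds, which is what makes the per-queue rate and
// burst settings meaningful across rounds.
func getOrCreateQueueLimiter(
	limiterByQueue map[string]*rate.Limiter,
	queue string,
	maximumPerQueueSchedulingRate float64,
	maximumPerQueueSchedulingBurst int,
) *rate.Limiter {
	limiter, ok := limiterByQueue[queue]
	if !ok {
		limiter = rate.NewLimiter(
			rate.Limit(maximumPerQueueSchedulingRate),
			maximumPerQueueSchedulingBurst,
		)
		limiterByQueue[queue] = limiter
	}
	return limiter
}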
@@ -415,15 +426,24 @@ func (s *Simulator) handleScheduleEvent() error { s.schedulingConfig.Preemption.PriorityClasses, s.schedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, - nil, + s.limiter, totalResources, ) + sctx.Started = s.time for _, queue := range s.testCase.Queues { + limiter, ok := s.limiterByQueue[queue.Name] + if !ok { + limiter = rate.NewLimiter( + rate.Limit(s.schedulingConfig.MaximumPerQueueSchedulingRate), + s.schedulingConfig.MaximumPerQueueSchedulingBurst, + ) + s.limiterByQueue[queue.Name] = limiter + } err := sctx.AddQueueSchedulingContext( queue.Name, queue.Weight, s.allocationByPoolAndQueueAndPriorityClass[pool.Name][queue.Name], - nil, + limiter, ) if err != nil { return err