
Add per-queue scheduling rate-limiting #2938

Merged
merged 17 commits on Sep 8, 2023
Changes from 16 commits
4 changes: 4 additions & 0 deletions config/armada/config.yaml
@@ -67,6 +67,10 @@ scheduling:
maximumResourceFractionToSchedule:
memory: 1.0
cpu: 1.0
maximumSchedulingRate: 100.0
maximumSchedulingBurst: 1000
maximumPerQueueSchedulingRate: 50.0
maximumPerQueueSchedulingBurst: 1000
maxJobSchedulingContextsPerExecutor: 10000
lease:
expireAfter: 15m
5 changes: 4 additions & 1 deletion config/scheduler/config.yaml
@@ -97,7 +97,10 @@ scheduling:
maximumResourceFractionToSchedule:
memory: 1.0
cpu: 1.0
maximumJobsToSchedule: 5000
maximumSchedulingRate: 100.0
maximumSchedulingBurst: 1000
maximumPerQueueSchedulingRate: 50.0
maximumPerQueueSchedulingBurst: 1000
maxUnacknowledgedJobsPerExecutor: 2500
maxJobSchedulingContextsPerExecutor: 10000
defaultJobLimits:
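For intuition only (not part of the PR): with the example values above, the token-bucket semantics documented in SchedulingConfig below imply roughly the following drain times for a backlog of jobs.

```go
package main

import "fmt"

func main() {
	// Example values from the config above.
	const (
		maximumSchedulingRate          = 100.0 // tokens (jobs) added per second
		maximumSchedulingBurst         = 1000  // bucket capacity
		maximumPerQueueSchedulingRate  = 50.0
		maximumPerQueueSchedulingBurst = 1000
	)

	// Once the burst capacity is spent, a backlog drains at the steady-state rate.
	backlog := 10000.0
	fmt.Printf("draining %.0f jobs takes at least %.0fs globally and at least %.0fs from a single queue\n",
		backlog,
		(backlog-maximumSchedulingBurst)/maximumSchedulingRate,
		(backlog-maximumPerQueueSchedulingBurst)/maximumPerQueueSchedulingRate)
}
```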
2 changes: 1 addition & 1 deletion go.mod
@@ -93,6 +93,7 @@ require (
github.com/prometheus/common v0.37.0
github.com/sanity-io/litter v1.5.5
github.com/segmentio/fasthash v1.0.3
golang.org/x/time v0.3.0
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9
)

@@ -195,7 +196,6 @@ require (
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/protobuf v1.30.0 // indirect
31 changes: 27 additions & 4 deletions internal/armada/configuration/types.go
@@ -124,10 +124,33 @@ type SchedulingConfig struct {
MaximumResourceFractionToSchedule map[string]float64
// Overrides MaximalClusterFractionToSchedule if set for the current pool.
MaximumResourceFractionToScheduleByPool map[string]map[string]float64
// Max number of jobs to schedule in each invocation of the scheduler.
MaximumJobsToSchedule uint
// Max number of gangs to schedule in each invocation of the scheduler.
MaximumGangsToSchedule uint
// The rate at which Armada schedules jobs is limited using a token-bucket approach.
// Specifically, there is a token bucket that persists between scheduling rounds.
// The bucket fills up at a rate of MaximumSchedulingRate tokens per second and has capacity MaximumSchedulingBurst.
// A token is removed from the bucket when a job is scheduled, and scheduling stops while the bucket is empty.
//
// Hence, MaximumSchedulingRate controls the maximum number of jobs scheduled per second in steady-state,
// i.e., once the burst capacity has been exhausted.
//
// Rate-limiting is based on the number of tokens available at the start of each scheduling round,
// i.e., tokens accumulated while scheduling become available at the start of the next scheduling round.
//
// For more information about the rate-limiter, see:
// https://pkg.go.dev/golang.org/x/time/rate#Limiter
MaximumSchedulingRate float64 `validate:"gt=0"`
Collaborator
I understand why you've made the rate limits act over a given scheduling cycle, but I think longer term we want to make these apply over some time horizon. That's because what users and administrators care about isn't the scheduling rate in a given scheduling cycle but rather the scheduling rate over various time horizons.

Contributor Author
I've updated the docstring to make clear the rate-limiter persists between rounds.

Collaborator
It also might be worth exploring whether MaximumSchedulingRate and MaximumPerQueueSchedulingRate could be ints here - the benefit being that config would be slightly simpler. MaximumSchedulingRate almost certainly could be an int, but MaximumPerQueueSchedulingRate maybe not?

Contributor Author
I think we can leave them as floats. We can use either int or float values in config as-is.

// MaximumSchedulingBurst controls the burst capacity of the rate-limiter.
//
// There are two important implications:
// - Armada will never schedule more than MaximumSchedulingBurst jobs per scheduling round.
// - Gang jobs with cardinality greater than MaximumSchedulingBurst can never be scheduled.
Collaborator
Do we check for this at submit time? I.e., is it possible for a user to submit a gang job that can never be scheduled?

Contributor Author
Yes. As discussed, let's add a max gang size separately and then have some warning if that is greater than the burst.

MaximumSchedulingBurst int `validate:"gt=0"`
// In addition to the global rate-limiter, there is a separate rate-limiter for each queue.
// These work the same as the global rate-limiter, except they apply only to jobs scheduled from a specific queue.
//
// Per-queue version of MaximumSchedulingRate.
MaximumPerQueueSchedulingRate float64 `validate:"gt=0"`
// Per-queue version of MaximumSchedulingBurst.
MaximumPerQueueSchedulingBurst int `validate:"gt=0"`
// Armada stores contexts associated with recent job scheduling attempts.
// This setting limits the number of such contexts to store.
// Contexts associated with the most recent scheduling attempt for each queue and cluster are always stored.
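To make the token-bucket docstring above concrete, here is a minimal standalone sketch (not Armada code) using golang.org/x/time/rate with the example values rate=100 and burst=1000; TokensAt and AllowN are the real rate.Limiter methods (TokensAt is also used in the constraints diff below).

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Bucket fills at 100 tokens/s and holds at most 1000 tokens.
	limiter := rate.NewLimiter(rate.Limit(100.0), 1000)

	now := time.Now()
	fmt.Printf("tokens at start of round: %.0f\n", limiter.TokensAt(now)) // ~1000

	// Scheduling a job consumes one token; the burst is exhausted after 1000 jobs.
	for i := 0; i < 1000; i++ {
		limiter.AllowN(now, 1)
	}
	fmt.Printf("tokens after burst: %.0f\n", limiter.TokensAt(now)) // ~0

	// Tokens accumulated while scheduling become available in the next round,
	// so one second later roughly 100 more jobs can be scheduled (steady state).
	fmt.Printf("tokens one second later: %.0f\n", limiter.TokensAt(now.Add(time.Second))) // ~100
}
```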
35 changes: 27 additions & 8 deletions internal/armada/server/lease.go
@@ -18,6 +18,7 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/utils/clock"
@@ -59,6 +60,10 @@ type AggregatedQueueServer struct {
schedulingInfoRepository repository.SchedulingInfoRepository
decompressorPool *pool.ObjectPool
clock clock.Clock
// Global job scheduling rate-limiter.
limiter *rate.Limiter
// Per-queue job scheduling rate-limiters.
limiterByQueue map[string]*rate.Limiter
// For storing reports of scheduling attempts.
SchedulingContextRepository *scheduler.SchedulingContextRepository
// Stores the most recent NodeDb for each executor.
@@ -92,18 +97,22 @@ func NewAggregatedQueueServer(
TimeBetweenEvictionRuns: 0,
NumTestsPerEvictionRun: 10,
}

decompressorPool := pool.NewObjectPool(context.Background(), pool.NewPooledObjectFactorySimple(
func(context.Context) (interface{}, error) {
return compress.NewZlibDecompressor(), nil
}), &poolConfig)
return &AggregatedQueueServer{
permissions: permissions,
schedulingConfig: schedulingConfig,
jobRepository: jobRepository,
queueRepository: queueRepository,
usageRepository: usageRepository,
eventStore: eventStore,
permissions: permissions,
schedulingConfig: schedulingConfig,
jobRepository: jobRepository,
queueRepository: queueRepository,
usageRepository: usageRepository,
eventStore: eventStore,
limiter: rate.NewLimiter(
rate.Limit(schedulingConfig.MaximumSchedulingRate),
schedulingConfig.MaximumSchedulingBurst,
),
limiterByQueue: make(map[string]*rate.Limiter),
schedulingInfoRepository: schedulingInfoRepository,
decompressorPool: decompressorPool,
executorRepository: executorRepository,
@@ -491,6 +500,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
q.schedulingConfig.Preemption.PriorityClasses,
q.schedulingConfig.Preemption.DefaultPriorityClass,
fairnessCostProvider,
q.limiter,
totalResources,
)
for queue, priorityFactor := range priorityFactorByQueue {
@@ -502,7 +512,16 @@
if priorityFactor > 0 {
weight = 1 / priorityFactor
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
queueLimiter, ok := q.limiterByQueue[queue]
if !ok {
// Create per-queue limiters lazily.
queueLimiter = rate.NewLimiter(
rate.Limit(q.schedulingConfig.MaximumPerQueueSchedulingRate),
q.schedulingConfig.MaximumPerQueueSchedulingBurst,
)
q.limiterByQueue[queue] = queueLimiter
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue], queueLimiter); err != nil {
return nil, err
}
}
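The hunk above creates per-queue limiters lazily inline; the same pattern extracted into a standalone helper, purely for illustration (the helper name is hypothetical and not part of this PR; the map and config fields match the diff above).

```go
package server // illustrative; corresponds to internal/armada/server

import "golang.org/x/time/rate"

// getOrCreateQueueLimiter returns the rate-limiter for queue, creating it on
// first use with the per-queue rate and burst from the scheduling config.
func getOrCreateQueueLimiter(
	limiterByQueue map[string]*rate.Limiter,
	perQueueRate float64,
	perQueueBurst int,
	queue string,
) *rate.Limiter {
	limiter, ok := limiterByQueue[queue]
	if !ok {
		limiter = rate.NewLimiter(rate.Limit(perQueueRate), perQueueBurst)
		limiterByQueue[queue] = limiter
	}
	return limiter
}
```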
123 changes: 82 additions & 41 deletions internal/scheduler/constraints/constraints.go
@@ -1,6 +1,7 @@
package constraints

import (
"fmt"
"math"

"github.com/pkg/errors"
@@ -12,32 +13,46 @@ import (
)

const (
UnschedulableReasonMaximumResourcesScheduled = "maximum resources scheduled"
UnschedulableReasonMaximumNumberOfJobsScheduled = "maximum number of jobs scheduled"
UnschedulableReasonMaximumNumberOfGangsScheduled = "maximum number of gangs scheduled"
UnschedulableReasonMaximumResourcesPerQueueExceeded = "maximum total resources for this queue exceeded"
// Indicates that the limit on resources scheduled per round has been exceeded.
MaximumResourcesScheduledUnschedulableReason = "maximum resources scheduled"
Collaborator
it might be worth making these reasons into a type just so that functions like IsTerminalUnschedulableReason can be more explicit about what they operate on.

Contributor Author
I agree. But didn't want to do that in this PR.


// Indicates that a queue has been assigned more than its allowed amount of resources.
MaximumResourcesPerQueueExceededUnschedulableReason = "maximum total resources for this queue exceeded"

// Indicates that the scheduling rate limit has been exceeded.
GlobalRateLimitExceededUnschedulableReason = "global scheduling rate limit exceeded"
QueueRateLimitExceededUnschedulableReason = "queue scheduling rate limit exceeded"

// Indicates that scheduling a gang would exceed the rate limit.
GlobalRateLimitExceededByGangUnschedulableReason = "gang would exceed global scheduling rate limit"
QueueRateLimitExceededByGangUnschedulableReason = "gang would exceed queue scheduling rate limit"

// Indicates that the number of jobs in a gang exceeds the burst size.
// This means the gang cannot be scheduled without first increasing the burst size.
GangExceedsGlobalBurstSizeUnschedulableReason = "gang cardinality too large: exceeds global max burst size"
GangExceedsQueueBurstSizeUnschedulableReason = "gang cardinality too large: exceeds queue max burst size"
)

// IsTerminalUnschedulableReason returns true if reason indicates it's not possible to schedule any more jobs in this round.
// IsTerminalUnschedulableReason returns true if reason indicates
// it's not possible to schedule any more jobs in this round.
func IsTerminalUnschedulableReason(reason string) bool {
if reason == UnschedulableReasonMaximumResourcesScheduled {
if reason == MaximumResourcesScheduledUnschedulableReason {
return true
}
if reason == UnschedulableReasonMaximumNumberOfJobsScheduled {
return true
}
if reason == UnschedulableReasonMaximumNumberOfGangsScheduled {
if reason == GlobalRateLimitExceededUnschedulableReason {
return true
}
return false
}

// IsTerminalQueueUnschedulableReason returns true if reason indicates
// it's not possible to schedule any more jobs from this queue in this round.
func IsTerminalQueueUnschedulableReason(reason string) bool {
return reason == QueueRateLimitExceededUnschedulableReason
}

// SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits.
type SchedulingConstraints struct {
// Max number of jobs to scheduler per lease jobs call.
MaximumJobsToSchedule uint
// Max number of jobs to scheduler per lease jobs call.
MaximumGangsToSchedule uint
// Max number of jobs to consider for a queue before giving up.
MaxQueueLookback uint
// Jobs leased to this executor must be at least this large.
@@ -82,8 +97,6 @@ func SchedulingConstraintsFromSchedulingConfig(
maximumResourceFractionToSchedule = m
}
return SchedulingConstraints{
MaximumJobsToSchedule: config.MaximumJobsToSchedule,
MaximumGangsToSchedule: config.MaximumGangsToSchedule,
MaxQueueLookback: config.MaxQueueLookback,
MinimumJobSize: minimumJobSize,
MaximumResourcesToSchedule: absoluteFromRelativeLimits(totalResources, maximumResourceFractionToSchedule),
@@ -99,47 +112,75 @@ func absoluteFromRelativeLimits(totalResources schedulerobjects.ResourceList, re
return absoluteLimits
}

func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) {
// MaximumJobsToSchedule check.
if constraints.MaximumJobsToSchedule != 0 && sctx.NumScheduledJobs == int(constraints.MaximumJobsToSchedule) {
return false, UnschedulableReasonMaximumNumberOfJobsScheduled, nil
}

// MaximumGangsToSchedule check.
if constraints.MaximumGangsToSchedule != 0 && sctx.NumScheduledGangs == int(constraints.MaximumGangsToSchedule) {
return false, UnschedulableReasonMaximumNumberOfGangsScheduled, nil
}
// ScaleQuantity scales q in-place by a factor f.
// This function overflows for quantities whose milli value can't be expressed as an int64.
// E.g., 1Pi is ok, but not 10Pi.
func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity {
Collaborator
I know this wasn't added here but functions like this scare the hell out of me!

Contributor Author
We may actually want to update this to return an error (or panic) if it would overflow.

q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f)))
return q
}

func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) {
// MaximumResourcesToSchedule check.
if !sctx.ScheduledResources.IsStrictlyLessOrEqual(constraints.MaximumResourcesToSchedule) {
return false, UnschedulableReasonMaximumResourcesScheduled, nil
return false, MaximumResourcesScheduledUnschedulableReason, nil
}
return true, "", nil
}

func (constraints *SchedulingConstraints) CheckPerQueueAndPriorityClassConstraints(
func (constraints *SchedulingConstraints) CheckConstraints(
sctx *schedulercontext.SchedulingContext,
queue string,
priorityClassName string,
gctx *schedulercontext.GangSchedulingContext,
) (bool, string, error) {
qctx := sctx.QueueSchedulingContexts[queue]
qctx := sctx.QueueSchedulingContexts[gctx.Queue]
if qctx == nil {
return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", queue)
return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", gctx.Queue)
}

// Check that the job is large enough for this executor.
if ok, unschedulableReason := RequestsAreLargeEnough(gctx.TotalResourceRequests, constraints.MinimumJobSize); !ok {
return false, unschedulableReason, nil
}

// Global rate limiter check.
tokens := sctx.Limiter.TokensAt(sctx.Started)
if tokens <= 0 {
return false, GlobalRateLimitExceededUnschedulableReason, nil
}
if sctx.Limiter.Burst() < gctx.Cardinality() {
return false, GangExceedsGlobalBurstSizeUnschedulableReason, nil
}
if tokens < float64(gctx.Cardinality()) {
return false, GlobalRateLimitExceededByGangUnschedulableReason, nil
}

// Per-queue rate limiter check.
tokens = qctx.Limiter.TokensAt(sctx.Started)
if tokens <= 0 {
return false, QueueRateLimitExceededUnschedulableReason, nil
}
if qctx.Limiter.Burst() < gctx.Cardinality() {
return false, GangExceedsQueueBurstSizeUnschedulableReason, nil
}
if tokens < float64(gctx.Cardinality()) {
return false, QueueRateLimitExceededByGangUnschedulableReason, nil
}

// PriorityClassSchedulingConstraintsByPriorityClassName check.
if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[priorityClassName]; ok {
if !qctx.AllocatedByPriorityClass[priorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) {
return false, UnschedulableReasonMaximumResourcesPerQueueExceeded, nil
if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok {
if !qctx.AllocatedByPriorityClass[gctx.PriorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) {
return false, MaximumResourcesPerQueueExceededUnschedulableReason, nil
}
}
return true, "", nil
}

// ScaleQuantity scales q in-place by a factor f.
// This functions overflows for quantities the milli value of which can't be expressed as an int64.
// E.g., 1Pi is ok, but not 10Pi.
func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity {
q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f)))
return q
func RequestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) {
for t, minQuantity := range minRequest.Resources {
q := totalResourceRequests.Get(t)
if minQuantity.Cmp(q) == 1 {
return false, fmt.Sprintf("job requests %s %s, but the minimum is %s", q.String(), t, minQuantity.String())
}
}
return true, ""
}
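Following the ScaleQuantity review exchange above, here is a sketch (not included in this PR) of the error-returning variant the author suggests. The name and error handling are hypothetical; it guards the MilliValue conversion against overflow but not every possible overflow of the scaled result.

```go
package constraints

import (
	"fmt"
	"math"
	"strconv"

	"k8s.io/apimachinery/pkg/api/resource"
)

// maxMilliScalable is the largest quantity whose milli value still fits in an int64.
var maxMilliScalable = resource.MustParse(strconv.FormatInt(math.MaxInt64/1000, 10))

// ScaleQuantityChecked scales q by f, returning an error instead of silently
// overflowing when q.MilliValue() would exceed the int64 range (e.g., 10Pi).
func ScaleQuantityChecked(q resource.Quantity, f float64) (resource.Quantity, error) {
	if q.Cmp(maxMilliScalable) > 0 {
		return resource.Quantity{}, fmt.Errorf("cannot scale %s: milli value overflows int64", q.String())
	}
	q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f)))
	return q, nil
}
```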
4 changes: 2 additions & 2 deletions internal/scheduler/constraints/constraints_test.go
@@ -21,12 +21,12 @@ func TestConstraints(t *testing.T) {
}{} // TODO: Add tests.
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx)
ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx, tc.queue)
require.NoError(t, err)
require.Equal(t, tc.globalUnschedulableReason == "", ok)
require.Equal(t, tc.globalUnschedulableReason, unschedulableReason)

ok, unschedulableReason, err = tc.constraints.CheckPerQueueAndPriorityClassConstraints(tc.sctx, tc.queue, tc.priorityClassName)
ok, unschedulableReason, err = tc.constraints.CheckConstraints(tc.sctx, nil)
require.NoError(t, err)
require.Equal(t, tc.perQueueAndPriorityClassUnschedulableReason == "", ok)
require.Equal(t, tc.perQueueAndPriorityClassUnschedulableReason, unschedulableReason)