From 3cd72a2e895fba1021948a91141ced70ecc4dfa7 Mon Sep 17 00:00:00 2001
From: Chris Martin
Date: Fri, 11 Oct 2024 16:44:31 +0100
Subject: [PATCH] Only evict jobs if we know about their queue (#4001)

* Only evict jobs if we know about their queue

Signed-off-by: Chris Martin

* fix tests

Signed-off-by: Chris Martin

* add test

Signed-off-by: Chris Martin

---------

Signed-off-by: Chris Martin
---
 internal/scheduler/scheduling/context/job.go  |  7 +--
 .../scheduling/context/scheduling.go          | 22 +++++--
 internal/scheduler/scheduling/context/util.go | 11 ++++
 .../scheduling/preempting_queue_scheduler.go  | 14 +++++
 .../preempting_queue_scheduler_test.go        | 59 +++++++++++++++----
 5 files changed, 91 insertions(+), 22 deletions(-)

diff --git a/internal/scheduler/scheduling/context/job.go b/internal/scheduler/scheduling/context/job.go
index cb4df575784..ae8fd0966dd 100644
--- a/internal/scheduler/scheduling/context/job.go
+++ b/internal/scheduler/scheduling/context/job.go
@@ -63,12 +63,7 @@ type JobSchedulingContext struct {
 }
 
 func (jctx *JobSchedulingContext) IsHomeJob(currentPool string) bool {
-	// Away jobs can never have been scheduled in this round
-	// and therefore must have an active run
-	if jctx.Job.Queued() || jctx.Job.LatestRun() == nil {
-		return true
-	}
-	return jctx.Job.LatestRun().Pool() == currentPool
+	return IsHomeJob(jctx.Job, currentPool)
 }
 
 func (jctx *JobSchedulingContext) String() string {
diff --git a/internal/scheduler/scheduling/context/scheduling.go b/internal/scheduler/scheduling/context/scheduling.go
index e554853393f..e3bb7ae9d32 100644
--- a/internal/scheduler/scheduling/context/scheduling.go
+++ b/internal/scheduler/scheduling/context/scheduling.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/armadaproject/armada/internal/common/armadaerrors"
 	armadamaps "github.com/armadaproject/armada/internal/common/maps"
+	"github.com/armadaproject/armada/internal/scheduler/jobdb"
 	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
 	"github.com/armadaproject/armada/internal/scheduler/scheduling/fairness"
 )
@@ -319,11 +320,16 @@ func (sctx *SchedulingContext) EvictGang(gctx *GangSchedulingContext) (bool, err
 	return allJobsScheduledInThisRound, nil
 }
 
+// QueueContextExists returns true if we know about the queue associated with the job. An example of when this can
+// return false is when a job is running on a node
+func (sctx *SchedulingContext) QueueContextExists(job *jobdb.Job) bool {
+	queue := sctx.resolveQueueName(job)
+	_, ok := sctx.QueueSchedulingContexts[queue]
+	return ok
+}
+
 func (sctx *SchedulingContext) EvictJob(jctx *JobSchedulingContext) (bool, error) {
-	queue := jctx.Job.Queue()
-	if !jctx.IsHomeJob(sctx.Pool) {
-		queue = CalculateAwayQueueName(jctx.Job.Queue())
-	}
+	queue := sctx.resolveQueueName(jctx.Job)
 	qctx, ok := sctx.QueueSchedulingContexts[queue]
 	if !ok {
 		return false, errors.Errorf("failed adding job %s to scheduling context: no context for queue %s", jctx.JobId, queue)
@@ -391,3 +397,11 @@ func (sctx *SchedulingContext) FairnessError() float64 {
 	}
 	return fairnessError
 }
+
+func (sctx *SchedulingContext) resolveQueueName(job *jobdb.Job) string {
+	queue := job.Queue()
+	if !IsHomeJob(job, sctx.Pool) {
+		queue = CalculateAwayQueueName(job.Queue())
+	}
+	return queue
+}
diff --git a/internal/scheduler/scheduling/context/util.go b/internal/scheduler/scheduling/context/util.go
index 36164be6e83..d5494e1fd03 100644
--- a/internal/scheduler/scheduling/context/util.go
+++ b/internal/scheduler/scheduling/context/util.go
@@ -1,5 +1,16 @@
 package context
 
+import "github.com/armadaproject/armada/internal/scheduler/jobdb"
+
 func CalculateAwayQueueName(queueName string) string {
 	return queueName + "-away"
 }
+
+func IsHomeJob(job *jobdb.Job, currentPool string) bool {
+	// Away jobs can never have been scheduled in this round
+	// and therefore must have an active run
+	if job.Queued() || job.LatestRun() == nil {
+		return true
+	}
+	return job.LatestRun().Pool() == currentPool
+}
diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler.go b/internal/scheduler/scheduling/preempting_queue_scheduler.go
index 6f9763c30e7..1ea53569e5f 100644
--- a/internal/scheduler/scheduling/preempting_queue_scheduler.go
+++ b/internal/scheduler/scheduling/preempting_queue_scheduler.go
@@ -102,6 +102,10 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
 				if job.LatestRun().Pool() != sch.schedulingContext.Pool {
 					return false
 				}
+				if !sch.schedulingContext.QueueContextExists(job) {
+					ctx.Warnf("No queue context found for job %s. This job cannot be evicted", job.Id())
+					return false
+				}
 				priorityClass := job.PriorityClass()
 				if !priorityClass.Preemptible {
 					return false
@@ -162,6 +166,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
 	evictorResult, inMemoryJobRepo, err = sch.evict(
 		armadacontext.WithLogField(ctx, "stage", "evict oversubscribed"),
 		NewOversubscribedEvictor(
+			sch.schedulingContext,
 			sch.jobRepo,
 			sch.nodeDb,
 		),
@@ -660,9 +665,14 @@ func NewFilteredEvictor(
 	}
 }
 
+type queueChecker interface {
+	QueueContextExists(job *jobdb.Job) bool
+}
+
 // NewOversubscribedEvictor returns a new evictor that
 // for each node evicts all preemptible jobs of a priority class for which at least one job could not be scheduled
 func NewOversubscribedEvictor(
+	queueChecker queueChecker,
 	jobRepo JobRepository,
 	nodeDb *nodedb.NodeDb,
 ) *Evictor {
@@ -687,6 +697,10 @@ func NewOversubscribedEvictor(
 			return len(overSubscribedPriorities) > 0
 		},
 		jobFilter: func(ctx *armadacontext.Context, job *jobdb.Job) bool {
+			if !queueChecker.QueueContextExists(job) {
+				ctx.Warnf("No queue context found for job %s. This job cannot be evicted", job.Id())
+				return false
+			}
 			priorityClass := job.PriorityClass()
 			if !priorityClass.Preemptible {
 				return false
diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go
index eb78aa702f8..550b397a47b 100644
--- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go
+++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go
@@ -31,6 +31,14 @@ import (
 	"github.com/armadaproject/armada/internal/scheduler/testfixtures"
 )
 
+type testQueueContextChecker struct {
+	jobIds map[string]bool
+}
+
+func (t testQueueContextChecker) QueueContextExists(job *jobdb.Job) bool {
+	return t.jobIds[job.Id()]
+}
+
 func TestEvictOversubscribed(t *testing.T) {
 	config := testfixtures.TestSchedulingConfig()
 
@@ -58,6 +66,7 @@ func TestEvictOversubscribed(t *testing.T) {
 	require.NoError(t, err)
 
 	evictor := NewOversubscribedEvictor(
+		testQueueContextChecker{},
 		jobDbTxn,
 		nodeDb)
 	result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn)
@@ -95,12 +104,8 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 		Rounds []SchedulingRound
 		// Map from queue to the priority factor associated with that queue.
 		PriorityFactorByQueue map[string]float64
-		// Initial resource usage for all queues. This value is used across all rounds,
-		// i.e., we don't update it based on preempted/scheduled jobs.
-		InitialAllocationByQueueAndPriorityClass map[string]schedulerobjects.QuantityByTAndResourceType[string]
-		// Total resources across all clusters.
-		// If empty, it is computed as the total resources across the provided nodes.
-		TotalResources schedulerobjects.ResourceList
+		// Map from node index (into Nodes) to the jobs running on that node
+		InitialRunningJobs map[int][]*jobdb.Job
 	}{
 		"balancing three queues": {
 			SchedulingConfig: testfixtures.TestSchedulingConfig(),
@@ -236,6 +241,26 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 				"D": 100,
 			},
 		},
+		"don't preempt jobs where we don't know the queue": {
+			SchedulingConfig: testfixtures.TestSchedulingConfig(),
+			Nodes:            testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
+			Rounds: []SchedulingRound{
+				{
+					JobsByQueue: map[string][]*jobdb.Job{
+						"A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass1, 32),
+					},
+					ExpectedScheduledIndices: map[string][]int{
+						"A": testfixtures.IntRange(0, 23),
+					},
+				},
+			},
+			InitialRunningJobs: map[int][]*jobdb.Job{
+				0: testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass1, 8),
+			},
+			PriorityFactorByQueue: map[string]float64{
+				"A": 1,
+			},
+		},
 		"avoid preemption when not improving fairness": {
 			SchedulingConfig: testfixtures.TestSchedulingConfig(),
 			Nodes:            testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
@@ -1879,10 +1904,22 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 			jobDb := jobdb.NewJobDb(tc.SchedulingConfig.PriorityClasses, tc.SchedulingConfig.DefaultPriorityClassName, stringinterner.New(1024), testfixtures.TestResourceListFactory)
 			jobDbTxn := jobDb.WriteTxn()
 
+			// Add all the initial jobs, creating runs for them
+			for nodeIdx, jobs := range tc.InitialRunningJobs {
+				node := tc.Nodes[nodeIdx]
+				for _, job := range jobs {
+					err := jobDbTxn.Upsert([]*jobdb.Job{
+						job.WithQueued(false).
+							WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), node.GetPool(), job.PriorityClass().Priority),
+					})
+					require.NoError(t, err)
+				}
+			}
+
 			// Accounting across scheduling rounds.
 			roundByJobId := make(map[string]int)
 			indexByJobId := make(map[string]int)
-			allocatedByQueueAndPriorityClass := armadamaps.DeepCopy(tc.InitialAllocationByQueueAndPriorityClass)
+			allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string])
 			nodeIdByJobId := make(map[string]string)
 			var jobIdsByGangId map[string]map[string]bool
 			var gangIdByJobId map[string]string
@@ -1994,9 +2031,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 			}
 
 			// If not provided, set total resources equal to the aggregate over tc.Nodes.
-			if tc.TotalResources.Resources == nil {
-				tc.TotalResources = nodeDb.TotalKubernetesResources()
-			}
+			totalResources := nodeDb.TotalKubernetesResources()
 
 			fairnessCostProvider, err := fairness.NewDominantResourceFairness(
 				nodeDb.TotalKubernetesResources(),
@@ -2007,7 +2042,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 				testfixtures.TestPool,
 				fairnessCostProvider,
 				limiter,
-				tc.TotalResources,
+				totalResources,
 			)
 			sctx.Started = schedulingStarted.Add(time.Duration(i) * schedulingInterval)
 
@@ -2023,7 +2058,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 				)
 				require.NoError(t, err)
 			}
-			constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil)
+			constraints := schedulerconstraints.NewSchedulingConstraints("pool", totalResources, tc.SchedulingConfig, nil)
 			sctx.UpdateFairShares()
 			sch := NewPreemptingQueueScheduler(
 				sctx,
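
For readers skimming the patch, here is a minimal, self-contained Go sketch of the guard it introduces: before an evictor considers a job, the scheduling context is asked whether it knows the job's queue (away jobs resolve to their "<queue>-away" name first), and the job is skipped otherwise. The Job, SchedulingContext, queueChecker and evictionFilter shapes below are simplified stand-ins for the real jobdb.Job, context.SchedulingContext and the Evictor's jobFilter, so treat this as an approximation of the behaviour, not the project's API.

package main

import "fmt"

// Job is a simplified stand-in for the scheduler's jobdb.Job.
type Job struct {
	ID      string
	Queue   string
	RunPool string // pool of the latest run; empty means the job is still queued
}

// SchedulingContext is a simplified stand-in for context.SchedulingContext.
type SchedulingContext struct {
	Pool                    string
	QueueSchedulingContexts map[string]struct{}
}

// resolveQueueName mirrors the patch: away jobs are accounted against "<queue>-away".
func (sctx *SchedulingContext) resolveQueueName(job *Job) string {
	if job.RunPool == "" || job.RunPool == sctx.Pool {
		return job.Queue
	}
	return job.Queue + "-away"
}

// QueueContextExists reports whether the scheduling context knows the job's queue.
func (sctx *SchedulingContext) QueueContextExists(job *Job) bool {
	_, ok := sctx.QueueSchedulingContexts[sctx.resolveQueueName(job)]
	return ok
}

// queueChecker matches the shape of the small interface the patch passes to the evictor.
type queueChecker interface {
	QueueContextExists(job *Job) bool
}

// evictionFilter returns a job filter that only admits jobs whose queue is known,
// mirroring the guard added in preempting_queue_scheduler.go; the evictor's other
// checks (preemptibility, priority) would follow the final return.
func evictionFilter(qc queueChecker) func(job *Job) bool {
	return func(job *Job) bool {
		if !qc.QueueContextExists(job) {
			fmt.Printf("No queue context found for job %s. This job cannot be evicted\n", job.ID)
			return false
		}
		return true
	}
}

func main() {
	sctx := &SchedulingContext{
		Pool:                    "cpu",
		QueueSchedulingContexts: map[string]struct{}{"A": {}},
	}
	filter := evictionFilter(sctx)

	known := &Job{ID: "job-1", Queue: "A", RunPool: "cpu"}
	unknown := &Job{ID: "job-2", Queue: "B", RunPool: "cpu"}

	fmt.Println(filter(known))   // true: queue "A" has a scheduling context
	fmt.Println(filter(unknown)) // false: queue "B" is unknown, so the job is never evicted
}

Running the sketch prints the warning for job-2 and reports false for it, which mirrors what the new test case "don't preempt jobs where we don't know the queue" exercises end to end against the real scheduler.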