From 9d3feb49245758c0551c78ae4f459b0e7c99e72b Mon Sep 17 00:00:00 2001 From: robertdavidsmith <34475852+robertdavidsmith@users.noreply.github.com> Date: Tue, 31 Dec 2024 11:07:08 +0000 Subject: [PATCH] Scheduler: refactor: move nodedb tests to internaltypes (#4110) Signed-off-by: Robert Smith --- .../scheduler/internaltypes/node_factory.go | 62 +- .../internaltypes/resource_list_map_util.go | 24 + .../resource_list_map_util_test.go | 50 ++ internal/scheduler/nodedb/nodedb_test.go | 183 ++-- .../scheduler/nodedb/nodeiteration_test.go | 786 +++++++----------- .../scheduler/testfixtures/testfixtures.go | 158 ++++ 6 files changed, 690 insertions(+), 573 deletions(-) diff --git a/internal/scheduler/internaltypes/node_factory.go b/internal/scheduler/internaltypes/node_factory.go index 22509f17ac3..76d454c11ad 100644 --- a/internal/scheduler/internaltypes/node_factory.go +++ b/internal/scheduler/internaltypes/node_factory.go @@ -2,6 +2,7 @@ package internaltypes import ( "fmt" + "sync/atomic" v1 "k8s.io/api/core/v1" @@ -30,7 +31,7 @@ type NodeFactory struct { resourceListFactory *ResourceListFactory // Used for assigning node index - nodeIndexCounter uint64 + nodeIndexCounter atomic.Uint64 } func NewNodeFactory( @@ -42,7 +43,7 @@ func NewNodeFactory( indexedTaints: util.StringListToSet(indexedTaints), indexedNodeLabels: util.StringListToSet(indexedNodeLabels), resourceListFactory: resourceListFactory, - nodeIndexCounter: 0, + nodeIndexCounter: atomic.Uint64{}, } } @@ -58,10 +59,9 @@ func (f *NodeFactory) CreateNodeAndType( unallocatableResources map[int32]ResourceList, allocatableByPriority map[int32]ResourceList, ) *Node { - f.nodeIndexCounter++ return CreateNodeAndType( id, - f.nodeIndexCounter, + f.allocateNodeIndex(), executor, name, pool, @@ -77,9 +77,8 @@ func (f *NodeFactory) CreateNodeAndType( } func (f *NodeFactory) FromSchedulerObjectsNode(node *schedulerobjects.Node) (*Node, error) { - f.nodeIndexCounter++ return FromSchedulerObjectsNode(node, - f.nodeIndexCounter, + f.allocateNodeIndex(), f.indexedTaints, f.indexedNodeLabels, f.resourceListFactory, @@ -104,3 +103,54 @@ func (f *NodeFactory) FromSchedulerObjectsExecutors(executors []*schedulerobject } return result } + +func (f *NodeFactory) ResourceListFactory() *ResourceListFactory { + return f.resourceListFactory +} + +func (f *NodeFactory) AddLabels(nodes []*Node, extraLabels map[string]string) []*Node { + result := make([]*Node, len(nodes)) + for i, node := range nodes { + newLabels := util.MergeMaps(node.GetLabels(), extraLabels) + result[i] = CreateNodeAndType(node.GetId(), + node.GetIndex(), + node.GetExecutor(), + node.GetName(), + node.GetPool(), + false, + node.GetTaints(), + newLabels, + f.indexedTaints, + f.indexedNodeLabels, + node.GetTotalResources(), + node.GetUnallocatableResources(), + node.AllocatableByPriority, + ) + } + return result +} + +func (f *NodeFactory) AddTaints(nodes []*Node, extraTaints []v1.Taint) []*Node { + result := make([]*Node, len(nodes)) + for i, node := range nodes { + result[i] = CreateNodeAndType(node.GetId(), + node.GetIndex(), + node.GetExecutor(), + node.GetName(), + node.GetPool(), + false, + append(node.GetTaints(), extraTaints...), + node.GetLabels(), + f.indexedTaints, + f.indexedNodeLabels, + node.GetTotalResources(), + node.GetUnallocatableResources(), + node.AllocatableByPriority, + ) + } + return result +} + +func (f *NodeFactory) allocateNodeIndex() uint64 { + return f.nodeIndexCounter.Add(1) +} diff --git a/internal/scheduler/internaltypes/resource_list_map_util.go b/internal/scheduler/internaltypes/resource_list_map_util.go index b359703034a..b851e82c6cc 100644 --- a/internal/scheduler/internaltypes/resource_list_map_util.go +++ b/internal/scheduler/internaltypes/resource_list_map_util.go @@ -57,3 +57,27 @@ func RlMapRemoveZeros(m map[string]ResourceList) map[string]ResourceList { } return result } + +func NewAllocatableByPriorityAndResourceType(priorities []int32, rl ResourceList) map[int32]ResourceList { + result := map[int32]ResourceList{} + for _, priority := range priorities { + result[priority] = rl + } + return result +} + +// MarkAllocated indicates resources have been allocated to pods of priority p, +// hence reducing the resources allocatable to pods of priority p or lower. +func MarkAllocated(m map[int32]ResourceList, p int32, rs ResourceList) { + MarkAllocatable(m, p, rs.Negate()) +} + +// MarkAllocatable indicates resources have been released by pods of priority p, +// thus increasing the resources allocatable to pods of priority p or lower. +func MarkAllocatable(m map[int32]ResourceList, p int32, rs ResourceList) { + for priority, allocatableResourcesAtPriority := range m { + if priority <= p { + m[priority] = allocatableResourcesAtPriority.Add(rs) + } + } +} diff --git a/internal/scheduler/internaltypes/resource_list_map_util_test.go b/internal/scheduler/internaltypes/resource_list_map_util_test.go index f278eeefd72..85457c42a1a 100644 --- a/internal/scheduler/internaltypes/resource_list_map_util_test.go +++ b/internal/scheduler/internaltypes/resource_list_map_util_test.go @@ -78,6 +78,56 @@ func TestRlMapRemoveZeros(t *testing.T) { assert.Equal(t, expected, RlMapRemoveZeros(input)) } +func TestNewAllocatableByPriorityAndResourceType(t *testing.T) { + factory := testFactory() + rl := testResourceList(factory, "2", "2Ki") + + result := NewAllocatableByPriorityAndResourceType([]int32{1, 2}, rl) + assert.Equal(t, 2, len(result)) + assert.Equal(t, int64(2000), result[1].GetByNameZeroIfMissing("cpu")) + assert.Equal(t, int64(2000), result[2].GetByNameZeroIfMissing("cpu")) +} + +func TestMarkAllocated(t *testing.T) { + factory := testFactory() + m := map[int32]ResourceList{ + 1: testResourceList(factory, "10", "10Gi"), + 2: testResourceList(factory, "20", "20Gi"), + 3: testResourceList(factory, "30", "30Gi"), + 4: testResourceList(factory, "40", "40Gi"), + } + + expected := map[int32]ResourceList{ + 1: testResourceList(factory, "8", "8Gi"), + 2: testResourceList(factory, "18", "18Gi"), + 3: testResourceList(factory, "30", "30Gi"), + 4: testResourceList(factory, "40", "40Gi"), + } + + MarkAllocated(m, 2, testResourceList(factory, "2", "2Gi")) + assert.Equal(t, expected, m) +} + +func TestMarkAllocatable(t *testing.T) { + factory := testFactory() + m := map[int32]ResourceList{ + 1: testResourceList(factory, "10", "10Gi"), + 2: testResourceList(factory, "20", "20Gi"), + 3: testResourceList(factory, "30", "30Gi"), + 4: testResourceList(factory, "40", "40Gi"), + } + + expected := map[int32]ResourceList{ + 1: testResourceList(factory, "12", "12Gi"), + 2: testResourceList(factory, "22", "22Gi"), + 3: testResourceList(factory, "30", "30Gi"), + 4: testResourceList(factory, "40", "40Gi"), + } + + MarkAllocatable(m, 2, testResourceList(factory, "2", "2Gi")) + assert.Equal(t, expected, m) +} + func testMapAllPositive(factory *ResourceListFactory) map[string]ResourceList { return map[string]ResourceList{ "a": testResourceList(factory, "1", "1Ki"), diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index d72f8d5cbd5..5e87f425876 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -28,53 +28,45 @@ func TestNodeDbSchema(t *testing.T) { // Test the accounting of total resources across all nodes. func TestTotalResources(t *testing.T) { - nodeDb, err := newNodeDbWithNodes([]*schedulerobjects.Node{}) + nodeDb, err := newNodeDbWithNodes([]*internaltypes.Node{}) require.NoError(t, err) assert.False(t, nodeDb.TotalKubernetesResources().IsEmpty()) assert.True(t, nodeDb.TotalKubernetesResources().AllZero()) - expected := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity)} + expected := testfixtures.TestNodeFactory.ResourceListFactory().MakeAllZero() // Upserting nodes for the first time should increase the resource count. - nodes := testfixtures.N32CpuNodes(2, testfixtures.TestPriorities) + nodes := testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities) for _, node := range nodes { - expected.Add(node.TotalResources) + expected = expected.Add(node.GetTotalResources()) } txn := nodeDb.Txn(true) for _, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(t, err) } txn.Commit() - assert.True(t, expected.Equal(schedulerobjects.ResourceList{ - Resources: nodeDb.TotalKubernetesResources().ToMap(), - })) + assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources())) // Upserting new nodes should increase the resource count. - nodes = testfixtures.N8GpuNodes(3, testfixtures.TestPriorities) + nodes = testfixtures.ItN8GpuNodes(3, testfixtures.TestPriorities) for _, node := range nodes { - expected.Add(node.TotalResources) + expected = expected.Add(node.GetTotalResources()) } txn = nodeDb.Txn(true) for _, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(t, err) } txn.Commit() - assert.True(t, expected.Equal(schedulerobjects.ResourceList{ - Resources: nodeDb.TotalKubernetesResources().ToMap(), - })) + assert.True(t, expected.Equal(nodeDb.TotalKubernetesResources())) } func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { - nodes := testfixtures.N32CpuNodes(2, testfixtures.TestPriorities) - nodeId := nodes[1].Id + nodes := testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities) + nodeId := nodes[1].GetId() require.NotEmpty(t, nodeId) db, err := newNodeDbWithNodes(nodes) require.NoError(t, err) @@ -98,8 +90,8 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { } func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) { - nodes := testfixtures.N32CpuNodes(1, testfixtures.TestPriorities) - nodeId := nodes[0].Id + nodes := testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities) + nodeId := nodes[0].GetId() require.NotEmpty(t, nodeId) db, err := newNodeDbWithNodes(nodes) require.NoError(t, err) @@ -123,10 +115,10 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) { } func TestNodeBindingEvictionUnbinding(t *testing.T) { - node := testfixtures.Test8GpuNode(testfixtures.TestPriorities) - nodeDb, err := newNodeDbWithNodes([]*schedulerobjects.Node{node}) + node := testfixtures.ItTest8GpuNode(testfixtures.TestPriorities) + nodeDb, err := newNodeDbWithNodes([]*internaltypes.Node{node}) require.NoError(t, err) - entry, err := nodeDb.GetNode(node.Id) + entry, err := nodeDb.GetNode(node.GetId()) require.NoError(t, err) jobFilter := func(job *jobdb.Job) bool { return true } @@ -280,20 +272,19 @@ func TestEviction(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - nodeDb, err := newNodeDbWithNodes([]*schedulerobjects.Node{}) + nodeDb, err := newNodeDbWithNodes([]*internaltypes.Node{}) require.NoError(t, err) txn := nodeDb.Txn(true) jobs := []*jobdb.Job{ testfixtures.Test1Cpu4GiJob("queue-alice", testfixtures.PriorityClass0), testfixtures.Test1Cpu4GiJob("queue-alice", testfixtures.PriorityClass3), } - node := testfixtures.Test32CpuNode(testfixtures.TestPriorities) - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) + node := testfixtures.ItTest32CpuNode(testfixtures.TestPriorities) require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobs, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobs, node) txn.Commit() require.NoError(t, err) - entry, err := nodeDb.GetNode(node.Id) + entry, err := nodeDb.GetNode(node.GetId()) require.NoError(t, err) existingJobs := make([]*jobdb.Job, len(jobs)) @@ -313,27 +304,27 @@ func TestEviction(t *testing.T) { func TestScheduleIndividually(t *testing.T) { tests := map[string]struct { - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node Jobs []*jobdb.Job ExpectSuccess []bool }{ "all jobs fit": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), ExpectSuccess: testfixtures.Repeat(true, 32), }, "not all jobs fit": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 33), ExpectSuccess: append(testfixtures.Repeat(true, 32), testfixtures.Repeat(false, 1)...), }, "unavailable resource": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1GpuJobs("A", testfixtures.PriorityClass0, 1), ExpectSuccess: testfixtures.Repeat(false, 1), }, "unsupported resource": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.WithRequestsJobs( schedulerobjects.ResourceList{ Resources: map[string]resource.Quantity{ @@ -345,7 +336,7 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: testfixtures.Repeat(true, 1), // we ignore unknown resource types on jobs, should never happen in practice anyway as these should fail earlier. }, "preemption": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: append( append( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), @@ -356,7 +347,7 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: append(testfixtures.Repeat(true, 64), testfixtures.Repeat(false, 32)...), }, "taints/tolerations": { - Nodes: testfixtures.NTainted32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItNTainted32CpuNodes(1, testfixtures.TestPriorities), Jobs: append( append( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), @@ -367,15 +358,13 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: []bool{false, false, true}, }, "node selector": { - Nodes: append( - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - testfixtures.WithLabelsNodes( + Nodes: append(testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{ "key": "value", }, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - )..., - ), + )...), Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ "key": "value", @@ -385,11 +374,11 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: append(testfixtures.Repeat(true, 32), testfixtures.Repeat(false, 1)...), }, "node selector with mismatched value": { - Nodes: testfixtures.WithLabelsNodes( + Nodes: testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{ "key": "value", }, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ @@ -400,7 +389,7 @@ func TestScheduleIndividually(t *testing.T) { ExpectSuccess: testfixtures.Repeat(false, 1), }, "node selector with missing label": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ "this label does not exist": "value", @@ -411,12 +400,12 @@ func TestScheduleIndividually(t *testing.T) { }, "node affinity": { Nodes: append( - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - testfixtures.WithLabelsNodes( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{ "key": "value", }, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), )..., ), Jobs: testfixtures.WithNodeAffinityJobs( @@ -485,7 +474,7 @@ func TestScheduleMany(t *testing.T) { tests := map[string]struct { // Nodes to schedule across. - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node // Schedule one group of jobs at a time. // Each group is composed of a slice of pods. Jobs [][]*jobdb.Job @@ -494,18 +483,18 @@ func TestScheduleMany(t *testing.T) { }{ // Attempts to schedule 32. All jobs get scheduled. "simple success": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: [][]*jobdb.Job{gangSuccess}, ExpectSuccess: []bool{true}, }, // Attempts to schedule 33 jobs. The overall result fails. "simple failure with min cardinality": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: [][]*jobdb.Job{gangFailure}, ExpectSuccess: []bool{false}, }, "correct rollback": { - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Jobs: [][]*jobdb.Job{ gangSuccess, gangFailure, @@ -514,7 +503,7 @@ func TestScheduleMany(t *testing.T) { ExpectSuccess: []bool{true, false, true}, }, "varying job size": { - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Jobs: [][]*jobdb.Job{ append( testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1), @@ -568,18 +557,18 @@ func TestAwayNodeTypes(t *testing.T) { require.NoError(t, err) nodeDbTxn := nodeDb.Txn(true) - node := testfixtures.Test32CpuNode([]int32{29000, 30000}) - node.Taints = append( - node.Taints, - v1.Taint{ - Key: "gpu", - Value: "true", - Effect: v1.TaintEffectNoSchedule, + node := testfixtures.ItTest32CpuNode([]int32{29000, 30000}) + node = testfixtures.TestNodeFactory.AddTaints( + []*internaltypes.Node{node}, + []v1.Taint{ + { + Key: "gpu", + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, }, - ) - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, nil, dbNode)) + )[0] + require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, nil, node)) jobId := util.ULID() job := testfixtures.TestJob( @@ -724,7 +713,7 @@ func TestMakeIndexedResourceResolution_ErrorsOnInvalidResolution(t *testing.T) { assert.Nil(t, result) } -func benchmarkUpsert(nodes []*schedulerobjects.Node, b *testing.B) { +func benchmarkUpsert(nodes []*internaltypes.Node, b *testing.B) { nodeDb, err := NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestResources, @@ -737,11 +726,9 @@ func benchmarkUpsert(nodes []*schedulerobjects.Node, b *testing.B) { txn := nodeDb.Txn(true) entries := make([]*internaltypes.Node, len(nodes)) for i, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(b, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) - require.NoError(b, err) - entry, err := nodeDb.GetNode(node.Id) + entry, err := nodeDb.GetNode(node.GetId()) require.NoError(b, err) entries[i] = entry } @@ -754,18 +741,18 @@ func benchmarkUpsert(nodes []*schedulerobjects.Node, b *testing.B) { } func BenchmarkUpsert1(b *testing.B) { - benchmarkUpsert(testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), b) + benchmarkUpsert(testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), b) } func BenchmarkUpsert1000(b *testing.B) { - benchmarkUpsert(testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), b) + benchmarkUpsert(testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), b) } func BenchmarkUpsert100000(b *testing.B) { - benchmarkUpsert(testfixtures.N32CpuNodes(100000, testfixtures.TestPriorities), b) + benchmarkUpsert(testfixtures.ItN32CpuNodes(100000, testfixtures.TestPriorities), b) } -func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs []*jobdb.Job) { +func benchmarkScheduleMany(b *testing.B, nodes []*internaltypes.Node, jobs []*jobdb.Job) { nodeDb, err := NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestResources, @@ -777,9 +764,7 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs [] require.NoError(b, err) txn := nodeDb.Txn(true) for _, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(b, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(b, err) } txn.Commit() @@ -798,7 +783,7 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs [] func BenchmarkScheduleMany10CpuNodes320SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 320), ) } @@ -806,7 +791,7 @@ func BenchmarkScheduleMany10CpuNodes320SmallJobs(b *testing.B) { func BenchmarkScheduleMany10CpuNodes640SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 640), ) } @@ -814,7 +799,7 @@ func BenchmarkScheduleMany10CpuNodes640SmallJobs(b *testing.B) { func BenchmarkScheduleMany100CpuNodes3200SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(100, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 3200), ) } @@ -822,7 +807,7 @@ func BenchmarkScheduleMany100CpuNodes3200SmallJobs(b *testing.B) { func BenchmarkScheduleMany100CpuNodes6400SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(100, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 6400), ) } @@ -830,7 +815,7 @@ func BenchmarkScheduleMany100CpuNodes6400SmallJobs(b *testing.B) { func BenchmarkScheduleMany1000CpuNodes32000SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32000), ) } @@ -838,7 +823,7 @@ func BenchmarkScheduleMany1000CpuNodes32000SmallJobs(b *testing.B) { func BenchmarkScheduleMany1000CpuNodes64000SmallJobs(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 64000), ) } @@ -846,10 +831,10 @@ func BenchmarkScheduleMany1000CpuNodes64000SmallJobs(b *testing.B) { func BenchmarkScheduleMany100CpuNodes1CpuUnused(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("31")}}, - testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + cpu("31"), + testfixtures.ItN32CpuNodes(100, testfixtures.TestPriorities), ), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 100), ) @@ -858,10 +843,10 @@ func BenchmarkScheduleMany100CpuNodes1CpuUnused(b *testing.B) { func BenchmarkScheduleMany1000CpuNodes1CpuUnused(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("31")}}, - testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + cpu("31"), + testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), ), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1000), ) @@ -870,10 +855,10 @@ func BenchmarkScheduleMany1000CpuNodes1CpuUnused(b *testing.B) { func BenchmarkScheduleMany10000CpuNodes1CpuUnused(b *testing.B) { benchmarkScheduleMany( b, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("31")}}, - testfixtures.N32CpuNodes(10000, testfixtures.TestPriorities), + cpu("31"), + testfixtures.ItN32CpuNodes(10000, testfixtures.TestPriorities), ), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10000), ) @@ -881,9 +866,9 @@ func BenchmarkScheduleMany10000CpuNodes1CpuUnused(b *testing.B) { func BenchmarkScheduleManyResourceConstrained(b *testing.B) { nodes := append(append( - testfixtures.N32CpuNodes(500, testfixtures.TestPriorities), - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities)...), - testfixtures.N32CpuNodes(499, testfixtures.TestPriorities)..., + testfixtures.ItN32CpuNodes(500, testfixtures.TestPriorities), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities)...), + testfixtures.ItN32CpuNodes(499, testfixtures.TestPriorities)..., ) benchmarkScheduleMany( b, @@ -892,7 +877,7 @@ func BenchmarkScheduleManyResourceConstrained(b *testing.B) { ) } -func newNodeDbWithNodes(nodes []*schedulerobjects.Node) (*NodeDb, error) { +func newNodeDbWithNodes(nodes []*internaltypes.Node) (*NodeDb, error) { nodeDb, err := NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestResources, @@ -906,11 +891,7 @@ func newNodeDbWithNodes(nodes []*schedulerobjects.Node) (*NodeDb, error) { } txn := nodeDb.Txn(true) for _, node := range nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - if err != nil { - return nil, err - } - if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode); err != nil { + if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { return nil, err } } diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index 8ab626577dd..aa699293139 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -15,29 +15,28 @@ import ( "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/internaltypes" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/testfixtures" ) func TestNodesIterator(t *testing.T) { tests := map[string]struct { - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node }{ "1 node": { - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), }, "0 nodes": { - Nodes: testfixtures.N32CpuNodes(0, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(0, testfixtures.TestPriorities), }, "3 nodes": { - Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { indexById := make(map[string]int) for i, node := range tc.Nodes { - indexById[node.Id] = i + indexById[node.GetId()] = i } nodeDb, err := newNodeDbWithNodes(tc.Nodes) if !assert.NoError(t, err) { @@ -49,10 +48,10 @@ func TestNodesIterator(t *testing.T) { } sortedNodes := slices.Clone(tc.Nodes) - slices.SortFunc(sortedNodes, func(a, b *schedulerobjects.Node) int { - if a.Id < b.Id { + slices.SortFunc(sortedNodes, func(a, b *internaltypes.Node) int { + if a.GetId() < b.GetId() { return -1 - } else if a.Id > b.Id { + } else if a.GetId() > b.GetId() { return 1 } else { return 0 @@ -60,7 +59,7 @@ func TestNodesIterator(t *testing.T) { }) expected := make([]int, len(sortedNodes)) for i, node := range sortedNodes { - expected[i] = indexById[node.Id] + expected[i] = indexById[node.GetId()] } actual := make([]int, 0) @@ -74,306 +73,237 @@ func TestNodesIterator(t *testing.T) { } func TestNodeTypeIterator(t *testing.T) { - const nodeTypeALabel = "a" - const nodeTypeBLabel = "b" - - nodeTypeAId := nodeTypeLabelToNodeTypeId(nodeTypeALabel) - gpuNodeTypeAId := gpuNodeTypeLabelToNodeTypeId(nodeTypeALabel) + nodeTypeA := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "a"}) + nodeTypeB := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "b"}) tests := map[string]struct { - nodes []*schedulerobjects.Node + nodes []*internaltypes.Node nodeTypeId uint64 priority int32 - resourceRequests schedulerobjects.ResourceList + resourceRequests internaltypes.ResourceList expected []int }{ "only yield nodes of the right nodeType": { nodes: armadaslices.Concatenate( - withNodeTypeNodes( - nodeTypeALabel, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - withNodeTypeNodes( - nodeTypeBLabel, - testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeB, + testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), ), - withNodeTypeNodes( - nodeTypeALabel, - testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, + testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), ), ), - nodeTypeId: nodeTypeAId, + nodeTypeId: nodeTypeA.GetId(), priority: 0, - resourceRequests: schedulerobjects.ResourceList{}, + resourceRequests: testfixtures.TestResourceListFactory.MakeAllZero(), expected: armadaslices.Concatenate( testfixtures.IntRange(0, 0), testfixtures.IntRange(3, 5), ), }, "filter nodes with insufficient resources and return in increasing order": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeId: nodeTypeAId, + nodeTypeId: nodeTypeA.GetId(), priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, + resourceRequests: cpu("16"), expected: []int{1, 0}, }, "filter nodes with insufficient resources at priority and return in increasing order": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeId: nodeTypeAId, + nodeTypeId: nodeTypeA.GetId(), priority: 1, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, + resourceRequests: cpu("16"), expected: []int{4, 7, 3, 6, 0, 1, 2}, }, "nested ordering": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("1Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "1Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("2Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "2Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("129Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "129Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("130Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "130Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("131Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "131Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("130Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "130Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("128Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "128Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("129Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "129Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("17"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeId: nodeTypeAId, - priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("128Gi"), - }}, - expected: []int{6, 1, 0}, + nodeTypeId: nodeTypeA.GetId(), + priority: 0, + resourceRequests: cpuMem("16", "128Gi"), + expected: []int{6, 1, 0}, }, "double-nested ordering": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("31", "1Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("1"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "1"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("2"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "2"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("5"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "5"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("2Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("31", "2Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("2Gi"), - "nvidia.com/gpu": resource.MustParse("1"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "2Gi", "1"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("514Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "514Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("512Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "512Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("513Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "513Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("33"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpu("33"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeId: gpuNodeTypeAId, - priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("512Gi"), - "nvidia.com/gpu": resource.MustParse("4"), - }}, - expected: []int{7, 5, 4, 2, 1, 0}, + nodeTypeId: nodeTypeA.GetId(), + priority: 0, + resourceRequests: cpuMemGpu("32", "512Gi", "4"), + expected: []int{7, 5, 4, 2, 1, 0}, }, } for name, tc := range tests { @@ -384,27 +314,19 @@ func TestNodeTypeIterator(t *testing.T) { entries := make([]*internaltypes.Node, len(tc.nodes)) for i, node := range tc.nodes { // Set monotonically increasing node IDs to ensure nodes appear in predictable order. - node.Id = fmt.Sprintf("%d", i) - - entry, err := internaltypes.FromSchedulerObjectsNode(node, - uint64(i), - nodeDb.indexedTaints, - nodeDb.indexedNodeLabels, - nodeDb.resourceListFactory) - - require.NoError(t, err) + newNodeId := fmt.Sprintf("%d", i) + entry := testfixtures.ItWithIdNodes(newNodeId, []*internaltypes.Node{node})[0] nodeDb.AddNodeToDb(entry) - entries[i] = entry } require.NoError(t, nodeDb.UpsertMany(entries)) indexedResourceRequests := make([]int64, len(testfixtures.TestResources)) - rr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(tc.resourceRequests.Resources) + assert.Nil(t, err) for i, resourceName := range nodeDb.indexedResources { - indexedResourceRequests[i], err = rr.GetByName(resourceName) + indexedResourceRequests[i], err = tc.resourceRequests.GetByName(resourceName) assert.Nil(t, err) } keyIndex := -1 @@ -452,44 +374,36 @@ func TestNodeTypeIterator(t *testing.T) { } func TestNodeTypesIterator(t *testing.T) { - const nodeTypeALabel = "a" - const nodeTypeBLabel = "b" - const nodeTypeCLabel = "c" - const nodeTypeDLabel = "d" - - nodeTypeAId := nodeTypeLabelToNodeTypeId(nodeTypeALabel) - nodeTypeBId := nodeTypeLabelToNodeTypeId(nodeTypeBLabel) - nodeTypeCId := nodeTypeLabelToNodeTypeId(nodeTypeCLabel) - - gpuNodeTypeAId := gpuNodeTypeLabelToNodeTypeId(nodeTypeALabel) - gpuNodeTypeBId := gpuNodeTypeLabelToNodeTypeId(nodeTypeBLabel) - gpuNodeTypeCId := gpuNodeTypeLabelToNodeTypeId(nodeTypeCLabel) + nodeTypeA := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "a"}) + nodeTypeB := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "b"}) + nodeTypeC := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "c"}) + nodeTypeD := labelsToNodeType(map[string]string{testfixtures.NodeTypeLabel: "d"}) tests := map[string]struct { - nodes []*schedulerobjects.Node + nodes []*internaltypes.Node nodeTypeIds []uint64 priority int32 - resourceRequests schedulerobjects.ResourceList + resourceRequests internaltypes.ResourceList expected []int }{ "only yield nodes of the right nodeType": { nodes: armadaslices.Concatenate( - withNodeTypeNodes( - nodeTypeALabel, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - withNodeTypeNodes( - nodeTypeBLabel, - testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeB, + testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), ), - withNodeTypeNodes( - nodeTypeCLabel, - testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeC, + testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), ), ), - nodeTypeIds: []uint64{nodeTypeAId, nodeTypeCId}, + nodeTypeIds: []uint64{nodeTypeA.GetId(), nodeTypeC.GetId()}, priority: 0, - resourceRequests: schedulerobjects.ResourceList{}, + resourceRequests: testfixtures.TestResourceListFactory.MakeAllZero(), expected: armadaslices.Concatenate( testfixtures.IntRange(0, 0), testfixtures.IntRange(3, 5), @@ -497,294 +411,227 @@ func TestNodeTypesIterator(t *testing.T) { }, "filter nodes with insufficient resources and return in increasing order": { nodes: armadaslices.Concatenate( - withNodeTypeNodes( - nodeTypeALabel, - testfixtures.WithUsedResourcesNodes( - 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, + testfixtures.ItWithUsedResourcesNodes(0, + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), - withNodeTypeNodes( - nodeTypeBLabel, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithNodeTypeNodes( + nodeTypeB, + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), - withNodeTypeNodes( - nodeTypeCLabel, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithNodeTypeNodes( + nodeTypeC, + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), - withNodeTypeNodes( - nodeTypeDLabel, - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithNodeTypeNodes( + nodeTypeD, + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("14")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("14"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeIds: []uint64{nodeTypeAId, nodeTypeBId, nodeTypeCId}, + nodeTypeIds: []uint64{nodeTypeA.GetId(), nodeTypeB.GetId(), nodeTypeC.GetId()}, priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, + resourceRequests: cpu("16"), expected: []int{1, 0}, }, "filter nodes with insufficient resources at priority and return in increasing order": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 1, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("15")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("15"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("16"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 2, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("17")}}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeIds: []uint64{nodeTypeAId}, + nodeTypeIds: []uint64{nodeTypeA.GetId()}, priority: 1, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("16")}}, + resourceRequests: cpu("16"), expected: []int{4, 7, 3, 6, 0, 1, 2}, }, "nested ordering": { - nodes: withNodeTypeNodes( - nodeTypeALabel, + nodes: testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("1Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "1Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("2Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "2Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("129Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "129Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("130Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "130Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("15"), - "memory": resource.MustParse("131Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("15", "131Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("130Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "130Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("128Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "128Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("129Gi"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpuMem("16", "129Gi"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("17"), - }}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + cpu("17"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), - nodeTypeIds: []uint64{nodeTypeAId}, - priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("16"), - "memory": resource.MustParse("128Gi"), - }}, - expected: []int{6, 1, 0}, + nodeTypeIds: []uint64{nodeTypeA.GetId()}, + priority: 0, + resourceRequests: cpuMem("16", "128Gi"), + expected: []int{6, 1, 0}, }, "double-nested ordering": { nodes: armadaslices.Concatenate( - withNodeTypeNodes( - nodeTypeALabel, + testfixtures.ItWithNodeTypeNodes( + nodeTypeA, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("31", "1Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("1"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "1"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("2"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "2"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("1Gi"), - "nvidia.com/gpu": resource.MustParse("5"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "1Gi", "5"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), ), - withNodeTypeNodes( - nodeTypeBLabel, + testfixtures.ItWithNodeTypeNodes( + nodeTypeB, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("2Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("31", "2Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - "memory": resource.MustParse("2Gi"), - "nvidia.com/gpu": resource.MustParse("1"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMemGpu("31", "2Gi", "1"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("514Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "514Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("512Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "512Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), ), - withNodeTypeNodes( - nodeTypeCLabel, + testfixtures.ItWithNodeTypeNodes( + nodeTypeC, armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("513Gi"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpuMem("32", "513Gi"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("33"), - }}, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + cpu("33"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), ), ), - nodeTypeIds: []uint64{gpuNodeTypeAId, gpuNodeTypeBId, gpuNodeTypeCId}, - priority: 0, - resourceRequests: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - "memory": resource.MustParse("512Gi"), - "nvidia.com/gpu": resource.MustParse("4"), - }}, - expected: []int{7, 5, 4, 2, 1, 0}, + nodeTypeIds: []uint64{nodeTypeA.GetId(), nodeTypeB.GetId(), nodeTypeC.GetId()}, + priority: 0, + resourceRequests: cpuMemGpu("32", "512Gi", "4"), + expected: []int{7, 5, 4, 2, 1, 0}, }, } for name, tc := range tests { @@ -795,13 +642,9 @@ func TestNodeTypesIterator(t *testing.T) { entries := make([]*internaltypes.Node, len(tc.nodes)) for i, node := range tc.nodes { // Set monotonically increasing node IDs to ensure nodes appear in predictable order. - node.Id = fmt.Sprintf("%d", i) - - entry, err := internaltypes.FromSchedulerObjectsNode(node, - uint64(i), - nodeDb.indexedTaints, - nodeDb.indexedNodeLabels, - nodeDb.resourceListFactory) + nodeId := fmt.Sprintf("%d", i) + entry := testfixtures.ItWithIdNodes(nodeId, []*internaltypes.Node{node})[0] + entry = testfixtures.ItWithIndexNode(uint64(i), entry) require.NoError(t, err) @@ -811,8 +654,7 @@ func TestNodeTypesIterator(t *testing.T) { } require.NoError(t, nodeDb.UpsertMany(entries)) - rr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(tc.resourceRequests.Resources) - assert.Nil(t, err) + rr := tc.resourceRequests indexedResourceRequests := make([]int64, len(testfixtures.TestResources)) for i, resourceName := range testfixtures.TestResourceNames { @@ -862,14 +704,14 @@ func BenchmarkNodeTypeIterator(b *testing.B) { 2, 2100, 2200, 2300, 2400, 2500, 2600, 2700, 2800, 2900, 3, 4, 5, 6, 7, 8, 9, } - nodes := testfixtures.N32CpuNodes(numNodes, testfixtures.TestPriorities) + nodes := testfixtures.ItN32CpuNodes(numNodes, testfixtures.TestPriorities) for i, node := range nodes { var q resource.Quantity q.SetMilli(allocatedMilliCpus[i%len(allocatedMilliCpus)]) - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( testfixtures.TestPriorities[len(testfixtures.TestPriorities)-1], - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": q}}, - []*schedulerobjects.Node{node}, + testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"cpu": q}), + []*internaltypes.Node{node}, ) } nodeDb, err := newNodeDbWithNodes(nodes) @@ -908,27 +750,39 @@ func BenchmarkNodeTypeIterator(b *testing.B) { } } -func withNodeTypeNodes(nodeTypeLabel string, nodes []*schedulerobjects.Node) []*schedulerobjects.Node { - for _, node := range nodes { - node.Labels[testfixtures.NodeTypeLabel] = nodeTypeLabel - } - return nodes +func labelsToNodeType(labels map[string]string) *internaltypes.NodeType { + nodeType := internaltypes.NewNodeType( + []v1.Taint{}, + labels, + util.StringListToSet(testfixtures.TestIndexedTaints), + util.StringListToSet(testfixtures.TestIndexedNodeLabels), + ) + return nodeType } -func nodeTypeLabelToNodeTypeId(nodeTypeLabel string) uint64 { - return labelsToNodeTypeId(map[string]string{testfixtures.NodeTypeLabel: nodeTypeLabel}) +func cpu(cpu string) internaltypes.ResourceList { + return testfixtures.TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + }, + ) } -func gpuNodeTypeLabelToNodeTypeId(nodeTypeLabel string) uint64 { - return labelsToNodeTypeId(map[string]string{testfixtures.NodeTypeLabel: nodeTypeLabel, "gpu": "true"}) +func cpuMem(cpu string, memory string) internaltypes.ResourceList { + return testfixtures.TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + "memory": resource.MustParse(memory), + }, + ) } -func labelsToNodeTypeId(labels map[string]string) uint64 { - nodeType := internaltypes.NewNodeType( - []v1.Taint{}, - labels, - util.StringListToSet(testfixtures.TestIndexedTaints), - util.StringListToSet(testfixtures.TestIndexedNodeLabels), +func cpuMemGpu(cpu string, memory string, gpu string) internaltypes.ResourceList { + return testfixtures.TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + "memory": resource.MustParse(memory), + "nvidia.com/gpu": resource.MustParse(gpu), + }, ) - return nodeType.GetId() } diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 4a44cf346a0..5c66f044119 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -297,6 +297,13 @@ func WithUsedResourcesNodes(p int32, rl schedulerobjects.ResourceList, nodes []* return nodes } +func ItWithUsedResourcesNodes(p int32, rl internaltypes.ResourceList, nodes []*internaltypes.Node) []*internaltypes.Node { + for _, node := range nodes { + internaltypes.MarkAllocated(node.AllocatableByPriority, p, rl) + } + return nodes +} + func WithLabelsNodes(labels map[string]string, nodes []*schedulerobjects.Node) []*schedulerobjects.Node { for _, node := range nodes { if node.Labels == nil { @@ -308,6 +315,70 @@ func WithLabelsNodes(labels map[string]string, nodes []*schedulerobjects.Node) [ return nodes } +func ItWithNodeTypeNodes(nodeType *internaltypes.NodeType, nodes []*internaltypes.Node) []*internaltypes.Node { + result := make([]*internaltypes.Node, len(nodes)) + for i, node := range nodes { + result[i] = internaltypes.CreateNode(node.GetId(), + nodeType, + node.GetIndex(), + node.GetExecutor(), + node.GetName(), + node.GetPool(), + node.GetTaints(), + node.GetLabels(), + node.GetTotalResources(), + node.GetUnallocatableResources(), + node.AllocatableByPriority, + node.AllocatedByQueue, + node.AllocatedByJobId, + node.EvictedJobRunIds, + nil) + } + return result +} + +func ItWithIdNodes(nodeId string, nodes []*internaltypes.Node) []*internaltypes.Node { + result := make([]*internaltypes.Node, len(nodes)) + for i, node := range nodes { + result[i] = internaltypes.CreateNode(nodeId, + node.GetNodeType(), + node.GetIndex(), + node.GetExecutor(), + node.GetName(), + node.GetPool(), + node.GetTaints(), + node.GetLabels(), + node.GetTotalResources(), + node.GetUnallocatableResources(), + node.AllocatableByPriority, + node.AllocatedByQueue, + node.AllocatedByJobId, + node.EvictedJobRunIds, + nil, + ) + } + return result +} + +func ItWithIndexNode(idx uint64, node *internaltypes.Node) *internaltypes.Node { + return internaltypes.CreateNode(node.GetId(), + node.GetNodeType(), + idx, + node.GetExecutor(), + node.GetName(), + node.GetPool(), + node.GetTaints(), + node.GetLabels(), + node.GetTotalResources(), + node.GetUnallocatableResources(), + node.AllocatableByPriority, + node.AllocatedByQueue, + node.AllocatedByJobId, + node.EvictedJobRunIds, + nil, + ) +} + func WithPriorityJobs(priority uint32, jobs []*jobdb.Job) []*jobdb.Job { for i, job := range jobs { jobs[i] = job.WithPriority(priority) @@ -714,6 +785,14 @@ func N32CpuNodes(n int, priorities []int32) []*schedulerobjects.Node { return rv } +func ItN32CpuNodes(n int, priorities []int32) []*internaltypes.Node { + rv := make([]*internaltypes.Node, n) + for i := 0; i < n; i++ { + rv[i] = ItTest32CpuNode(priorities) + } + return rv +} + func NTainted32CpuNodes(n int, priorities []int32) []*schedulerobjects.Node { rv := make([]*schedulerobjects.Node, n) for i := 0; i < n; i++ { @@ -722,6 +801,14 @@ func NTainted32CpuNodes(n int, priorities []int32) []*schedulerobjects.Node { return rv } +func ItNTainted32CpuNodes(n int, priorities []int32) []*internaltypes.Node { + rv := make([]*internaltypes.Node, n) + for i := 0; i < n; i++ { + rv[i] = ItTestTainted32CpuNode(priorities) + } + return rv +} + func N8GpuNodes(n int, priorities []int32) []*schedulerobjects.Node { rv := make([]*schedulerobjects.Node, n) for i := 0; i < n; i++ { @@ -730,6 +817,14 @@ func N8GpuNodes(n int, priorities []int32) []*schedulerobjects.Node { return rv } +func ItN8GpuNodes(n int, priorities []int32) []*internaltypes.Node { + rv := make([]*internaltypes.Node, n) + for i := 0; i < n; i++ { + rv[i] = ItTest8GpuNode(priorities) + } + return rv +} + func SingleQueuePriorityOne(name string) []*api.Queue { return []*api.Queue{{Name: name, PriorityFactor: 1.0}} } @@ -754,6 +849,24 @@ func TestNode(priorities []int32, resources map[string]resource.Quantity) *sched } } +func ItTestNode(priorities []int32, resources map[string]resource.Quantity) *internaltypes.Node { + rl := TestNodeFactory.ResourceListFactory().FromNodeProto(resources) + id := uuid.NewString() + return TestNodeFactory.CreateNodeAndType(id, + "executor1", + id, + TestPool, + false, + []v1.Taint{}, + map[string]string{ + TestHostnameLabel: id, + schedulerconfiguration.NodeIdLabel: id, + }, + rl, + map[int32]internaltypes.ResourceList{}, + internaltypes.NewAllocatableByPriorityAndResourceType(priorities, rl)) +} + func Test32CpuNode(priorities []int32) *schedulerobjects.Node { return TestNode( priorities, @@ -764,6 +877,16 @@ func Test32CpuNode(priorities []int32) *schedulerobjects.Node { ) } +func ItTest32CpuNode(priorities []int32) *internaltypes.Node { + return ItTestNode( + priorities, + map[string]resource.Quantity{ + "cpu": resource.MustParse("32"), + "memory": resource.MustParse("256Gi"), + }, + ) +} + func TestTainted32CpuNode(priorities []int32) *schedulerobjects.Node { node := Test32CpuNode(priorities) node.Taints = []v1.Taint{ @@ -777,6 +900,26 @@ func TestTainted32CpuNode(priorities []int32) *schedulerobjects.Node { return node } +func ItTestTainted32CpuNode(priorities []int32) *internaltypes.Node { + node := ItTest32CpuNode(priorities) + + node = TestNodeFactory.AddTaints([]*internaltypes.Node{node}, + []v1.Taint{ + { + Key: "largeJobsOnly", + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + })[0] + + node = TestNodeFactory.AddLabels( + []*internaltypes.Node{node}, + map[string]string{"largeJobsOnly": "true"}, + )[0] + + return node +} + func Test8GpuNode(priorities []int32) *schedulerobjects.Node { node := TestNode( priorities, @@ -790,6 +933,21 @@ func Test8GpuNode(priorities []int32) *schedulerobjects.Node { return node } +func ItTest8GpuNode(priorities []int32) *internaltypes.Node { + node := ItTestNode( + priorities, + map[string]resource.Quantity{ + "cpu": resource.MustParse("64"), + "memory": resource.MustParse("1024Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + }, + ) + return TestNodeFactory.AddLabels( + []*internaltypes.Node{node}, + map[string]string{"gpu": "true"}, + )[0] +} + func WithLastUpdateTimeExecutor(lastUpdateTime time.Time, executor *schedulerobjects.Executor) *schedulerobjects.Executor { executor.LastUpdateTime = lastUpdateTime return executor