Skip to content

Commit

Permalink
Merge branch 'master' into rich/scala-funcs-test
Browse files Browse the repository at this point in the history
  • Loading branch information
richscott committed Jan 2, 2025
2 parents 8f90cdf + 9d3feb4 commit d533d7d
Show file tree
Hide file tree
Showing 6 changed files with 690 additions and 573 deletions.
62 changes: 56 additions & 6 deletions internal/scheduler/internaltypes/node_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internaltypes

import (
"fmt"
"sync/atomic"

v1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -30,7 +31,7 @@ type NodeFactory struct {
resourceListFactory *ResourceListFactory

// Used for assigning node index
nodeIndexCounter uint64
nodeIndexCounter atomic.Uint64
}

func NewNodeFactory(
Expand All @@ -42,7 +43,7 @@ func NewNodeFactory(
indexedTaints: util.StringListToSet(indexedTaints),
indexedNodeLabels: util.StringListToSet(indexedNodeLabels),
resourceListFactory: resourceListFactory,
nodeIndexCounter: 0,
nodeIndexCounter: atomic.Uint64{},
}
}

Expand All @@ -58,10 +59,9 @@ func (f *NodeFactory) CreateNodeAndType(
unallocatableResources map[int32]ResourceList,
allocatableByPriority map[int32]ResourceList,
) *Node {
f.nodeIndexCounter++
return CreateNodeAndType(
id,
f.nodeIndexCounter,
f.allocateNodeIndex(),
executor,
name,
pool,
Expand All @@ -77,9 +77,8 @@ func (f *NodeFactory) CreateNodeAndType(
}

func (f *NodeFactory) FromSchedulerObjectsNode(node *schedulerobjects.Node) (*Node, error) {
f.nodeIndexCounter++
return FromSchedulerObjectsNode(node,
f.nodeIndexCounter,
f.allocateNodeIndex(),
f.indexedTaints,
f.indexedNodeLabels,
f.resourceListFactory,
Expand All @@ -104,3 +103,54 @@ func (f *NodeFactory) FromSchedulerObjectsExecutors(executors []*schedulerobject
}
return result
}

func (f *NodeFactory) ResourceListFactory() *ResourceListFactory {
return f.resourceListFactory
}

func (f *NodeFactory) AddLabels(nodes []*Node, extraLabels map[string]string) []*Node {
result := make([]*Node, len(nodes))
for i, node := range nodes {
newLabels := util.MergeMaps(node.GetLabels(), extraLabels)
result[i] = CreateNodeAndType(node.GetId(),
node.GetIndex(),
node.GetExecutor(),
node.GetName(),
node.GetPool(),
false,
node.GetTaints(),
newLabels,
f.indexedTaints,
f.indexedNodeLabels,
node.GetTotalResources(),
node.GetUnallocatableResources(),
node.AllocatableByPriority,
)
}
return result
}

func (f *NodeFactory) AddTaints(nodes []*Node, extraTaints []v1.Taint) []*Node {
result := make([]*Node, len(nodes))
for i, node := range nodes {
result[i] = CreateNodeAndType(node.GetId(),
node.GetIndex(),
node.GetExecutor(),
node.GetName(),
node.GetPool(),
false,
append(node.GetTaints(), extraTaints...),
node.GetLabels(),
f.indexedTaints,
f.indexedNodeLabels,
node.GetTotalResources(),
node.GetUnallocatableResources(),
node.AllocatableByPriority,
)
}
return result
}

func (f *NodeFactory) allocateNodeIndex() uint64 {
return f.nodeIndexCounter.Add(1)
}
24 changes: 24 additions & 0 deletions internal/scheduler/internaltypes/resource_list_map_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,27 @@ func RlMapRemoveZeros(m map[string]ResourceList) map[string]ResourceList {
}
return result
}

func NewAllocatableByPriorityAndResourceType(priorities []int32, rl ResourceList) map[int32]ResourceList {
result := map[int32]ResourceList{}
for _, priority := range priorities {
result[priority] = rl
}
return result
}

// MarkAllocated indicates resources have been allocated to pods of priority p,
// hence reducing the resources allocatable to pods of priority p or lower.
func MarkAllocated(m map[int32]ResourceList, p int32, rs ResourceList) {
MarkAllocatable(m, p, rs.Negate())
}

// MarkAllocatable indicates resources have been released by pods of priority p,
// thus increasing the resources allocatable to pods of priority p or lower.
func MarkAllocatable(m map[int32]ResourceList, p int32, rs ResourceList) {
for priority, allocatableResourcesAtPriority := range m {
if priority <= p {
m[priority] = allocatableResourcesAtPriority.Add(rs)
}
}
}
50 changes: 50 additions & 0 deletions internal/scheduler/internaltypes/resource_list_map_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,56 @@ func TestRlMapRemoveZeros(t *testing.T) {
assert.Equal(t, expected, RlMapRemoveZeros(input))
}

func TestNewAllocatableByPriorityAndResourceType(t *testing.T) {
factory := testFactory()
rl := testResourceList(factory, "2", "2Ki")

result := NewAllocatableByPriorityAndResourceType([]int32{1, 2}, rl)
assert.Equal(t, 2, len(result))
assert.Equal(t, int64(2000), result[1].GetByNameZeroIfMissing("cpu"))
assert.Equal(t, int64(2000), result[2].GetByNameZeroIfMissing("cpu"))
}

func TestMarkAllocated(t *testing.T) {
factory := testFactory()
m := map[int32]ResourceList{
1: testResourceList(factory, "10", "10Gi"),
2: testResourceList(factory, "20", "20Gi"),
3: testResourceList(factory, "30", "30Gi"),
4: testResourceList(factory, "40", "40Gi"),
}

expected := map[int32]ResourceList{
1: testResourceList(factory, "8", "8Gi"),
2: testResourceList(factory, "18", "18Gi"),
3: testResourceList(factory, "30", "30Gi"),
4: testResourceList(factory, "40", "40Gi"),
}

MarkAllocated(m, 2, testResourceList(factory, "2", "2Gi"))
assert.Equal(t, expected, m)
}

func TestMarkAllocatable(t *testing.T) {
factory := testFactory()
m := map[int32]ResourceList{
1: testResourceList(factory, "10", "10Gi"),
2: testResourceList(factory, "20", "20Gi"),
3: testResourceList(factory, "30", "30Gi"),
4: testResourceList(factory, "40", "40Gi"),
}

expected := map[int32]ResourceList{
1: testResourceList(factory, "12", "12Gi"),
2: testResourceList(factory, "22", "22Gi"),
3: testResourceList(factory, "30", "30Gi"),
4: testResourceList(factory, "40", "40Gi"),
}

MarkAllocatable(m, 2, testResourceList(factory, "2", "2Gi"))
assert.Equal(t, expected, m)
}

func testMapAllPositive(factory *ResourceListFactory) map[string]ResourceList {
return map[string]ResourceList{
"a": testResourceList(factory, "1", "1Ki"),
Expand Down
Loading

0 comments on commit d533d7d

Please sign in to comment.