Skip to content

Commit

Permalink
Scheduler: calculate node.allocatableByPriority after load from db (a…
Browse files Browse the repository at this point in the history
…rmadaproject#4127)

Signed-off-by: Robert Smith <[email protected]>
  • Loading branch information
robertdavidsmith authored Jan 8, 2025
1 parent f4cd6bc commit 33f3a27
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 119 deletions.
32 changes: 19 additions & 13 deletions internal/scheduler/internaltypes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"math"

"github.com/pkg/errors"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -60,20 +59,19 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
nodeIndex uint64,
indexedTaints map[string]bool,
indexedNodeLabels map[string]bool,
allowedPriorities []int32,
resourceListFactory *ResourceListFactory,
) (*Node, error) {
) *Node {
totalResources := resourceListFactory.FromNodeProto(node.TotalResources.Resources)

allocatableByPriority := map[int32]ResourceList{}
minimumPriority := int32(math.MaxInt32)
for p, rl := range node.AllocatableByPriorityAndResource {
if p < minimumPriority {
minimumPriority = p
}
allocatableByPriority[p] = resourceListFactory.FromNodeProto(rl.Resources)
for _, p := range allowedPriorities {
allocatableByPriority[p] = totalResources
}
if minimumPriority < 0 {
return nil, errors.Errorf("found negative priority %d on node %s; negative priorities are reserved for internal use", minimumPriority, node.Id)
for p, rl := range node.UnallocatableResources {
MarkAllocated(allocatableByPriority, p, resourceListFactory.FromJobResourceListIgnoreUnknown(rl.Resources))
}
allocatableByPriority[EvictedPriority] = allocatableByPriority[minimumPriority]
allocatableByPriority[EvictedPriority] = allocatableByPriority[minInt32(allowedPriorities)]

unallocatableResources := map[int32]ResourceList{}
for p, u := range node.UnallocatableResources {
Expand All @@ -91,10 +89,10 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
node.Labels,
indexedTaints,
indexedNodeLabels,
resourceListFactory.FromNodeProto(node.TotalResources.Resources),
totalResources,
unallocatableResources,
allocatableByPriority,
), nil
)
}

func CreateNodeAndType(
Expand Down Expand Up @@ -301,3 +299,11 @@ func deepCopyLabels(labels map[string]string) map[string]string {
}
return result
}

func minInt32(arr []int32) int32 {
result := int32(math.MaxInt32)
for _, val := range arr {
result = min(result, val)
}
return result
}
16 changes: 9 additions & 7 deletions internal/scheduler/internaltypes/node_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
Expand All @@ -27,6 +28,9 @@ type NodeFactory struct {
// If not set, no labels are indexed.
indexedNodeLabels map[string]bool

// Allowed priorities, includes home and away (from config)
allowedPriorities []int32

// Factory for internaltypes.ResourceList
resourceListFactory *ResourceListFactory

Expand All @@ -37,11 +41,13 @@ type NodeFactory struct {
func NewNodeFactory(
indexedTaints []string,
indexedNodeLabels []string,
priorityClasses map[string]types.PriorityClass,
resourceListFactory *ResourceListFactory,
) *NodeFactory {
return &NodeFactory{
indexedTaints: util.StringListToSet(indexedTaints),
indexedNodeLabels: util.StringListToSet(indexedNodeLabels),
allowedPriorities: types.AllowedPriorities(priorityClasses),
resourceListFactory: resourceListFactory,
nodeIndexCounter: atomic.Uint64{},
}
Expand Down Expand Up @@ -76,11 +82,12 @@ func (f *NodeFactory) CreateNodeAndType(
)
}

func (f *NodeFactory) FromSchedulerObjectsNode(node *schedulerobjects.Node) (*Node, error) {
func (f *NodeFactory) FromSchedulerObjectsNode(node *schedulerobjects.Node) *Node {
return FromSchedulerObjectsNode(node,
f.allocateNodeIndex(),
f.indexedTaints,
f.indexedNodeLabels,
f.allowedPriorities,
f.resourceListFactory,
)
}
Expand All @@ -93,12 +100,7 @@ func (f *NodeFactory) FromSchedulerObjectsExecutors(executors []*schedulerobject
errorLogger(fmt.Sprintf("Executor name mismatch: %q != %q", node.Executor, executor.Id))
continue
}
itNode, err := f.FromSchedulerObjectsNode(node)
if err != nil {
errorLogger(fmt.Sprintf("Invalid node %s: %v", node.Name, err))
continue
}
result = append(result, itNode)
result = append(result, f.FromSchedulerObjectsNode(node))
}
}
return result
Expand Down
9 changes: 7 additions & 2 deletions internal/scheduler/internaltypes/resource_list_map_util.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package internaltypes

import (
"sort"
"strings"

"golang.org/x/exp/maps"
)

func RlMapToString(m map[string]ResourceList) string {
keys := maps.Keys(m)
sort.Strings(keys)
results := []string{}
for k, v := range m {
results = append(results, k+"="+v.String())
for _, k := range keys {
results = append(results, k+"="+m[k].String())
}
return strings.Join(results, " ")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"github.com/stretchr/testify/assert"
)

func TestRlMapToString(t *testing.T) {
factory := testFactory()

assert.Equal(t, "a=(memory=1024,cpu=1) b=(memory=2048,cpu=2)", RlMapToString(testMapAllPositive(factory)))
}

func TestRlMapSumValues(t *testing.T) {
factory := testFactory()

Expand Down
Loading

0 comments on commit 33f3a27

Please sign in to comment.