Skip to content

Commit

Permalink
add selector spread plugin
Browse files Browse the repository at this point in the history
Signed-off-by: xilinxing <[email protected]>
  • Loading branch information
xilinxing committed Sep 21, 2022
1 parent b26336f commit 980e308
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 1 deletion.
72 changes: 71 additions & 1 deletion pkg/scheduler/plugins/nodeorder/nodeorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"

"volcano.sh/volcano/pkg/scheduler/api"
Expand Down Expand Up @@ -82,6 +83,7 @@ type priorityWeight struct {
balancedResourceWeight int
taintTolerationWeight int
imageLocalityWeight int
selectorSpreadWeight int
}

// calculateWeight from the provided arguments.
Expand Down Expand Up @@ -122,6 +124,7 @@ func calculateWeight(args framework.Arguments) priorityWeight {
balancedResourceWeight: 1,
taintTolerationWeight: 1,
imageLocalityWeight: 1,
selectorSpreadWeight: 1,
}

// Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct.
Expand All @@ -145,6 +148,9 @@ func calculateWeight(args framework.Arguments) priorityWeight {
// Checks whether imagelocality.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.imageLocalityWeight, ImageLocalityWeight)

// Checks whether selectorspread.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.selectorSpreadWeight, ImageLocalityWeight)

return weight
}

Expand Down Expand Up @@ -315,6 +321,9 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
p, _ = tainttoleration.New(nil, handle)
taintToleration := p.(*tainttoleration.TaintToleration)

p, _ = selectorspread.New(nil, handle)
selectorSpread := p.(*selectorspread.SelectorSpread)

batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) {
// InterPodAffinity
state := k8sframework.NewCycleState()
Expand All @@ -334,8 +343,13 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
return nil, err
}

selectorSpreadScores, err := selectorSpreadScore(selectorSpread, state, task.Pod, nodes, weight.selectorSpreadWeight)
if err != nil {
return nil, err
}

for _, node := range nodes {
nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name]
nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + selectorSpreadScores[node.Name]
}

klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores)
Expand Down Expand Up @@ -463,5 +477,61 @@ func taintTolerationScore(
return nodeScores, nil
}

func selectorSpreadScore(
selectorSpread *selectorspread.SelectorSpread,
cycleState *k8sframework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
selectorSpreadWeight int,
) (map[string]float64, error) {
preScoreStatus := selectorSpread.PreScore(context.TODO(), cycleState, pod, nodes)
if !preScoreStatus.IsSuccess() {
return nil, preScoreStatus.AsError()
}

nodescoreList := make(k8sframework.NodeScoreList, len(nodes))
// size of errCh should be no less than parallelization number, see interPodAffinityScore.
workerNum := 16
errCh := make(chan error, workerNum)
parallelizeContext, parallelizeCancel := context.WithCancel(context.TODO())
workqueue.ParallelizeUntil(parallelizeContext, workerNum, len(nodes), func(index int) {
nodeName := nodes[index].Name
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, status := selectorSpread.Score(ctx, cycleState, pod, nodeName)
if !status.IsSuccess() {
parallelizeCancel()
errCh <- fmt.Errorf("calculate selector spread priority failed %v", status.Message())
return
}
nodescoreList[index] = k8sframework.NodeScore{
Name: nodeName,
Score: s,
}
})

select {
case err := <-errCh:
return nil, err
default:
}

selectorSpread.NormalizeScore(context.TODO(), cycleState, pod, nodescoreList)

nodeScores := make(map[string]float64, len(nodes))
for i, nodeScore := range nodescoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore {
return nil, fmt.Errorf("selector spread returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name)
}
nodeScore.Score *= int64(selectorSpreadWeight)
nodescoreList[i] = nodeScore
nodeScores[nodeScore.Name] = float64(nodeScore.Score)
}

klog.V(4).Infof("selector spread Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores)
return nodeScores, nil
}

func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) {
}
2 changes: 2 additions & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,7 @@ k8s.io/component-base/metrics/testutil
k8s.io/component-base/version
# k8s.io/component-helpers v0.23.0 => k8s.io/component-helpers v0.23.0
## explicit; go 1.16
k8s.io/component-helpers/node/topology
k8s.io/component-helpers/node/util/sysctl
k8s.io/component-helpers/scheduling/corev1
k8s.io/component-helpers/scheduling/corev1/nodeaffinity
Expand Down Expand Up @@ -996,6 +997,7 @@ k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports
k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources
k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable
k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits
k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread
k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration
k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding
k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics
Expand Down

0 comments on commit 980e308

Please sign in to comment.