From 980e308afc54b295203cf90c98120394a29a911c Mon Sep 17 00:00:00 2001 From: xilinxing Date: Wed, 21 Sep 2022 11:22:38 +0800 Subject: [PATCH] add selector spread plugin Signed-off-by: xilinxing --- pkg/scheduler/plugins/nodeorder/nodeorder.go | 72 +++++++++++++++++++- vendor/modules.txt | 2 + 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/plugins/nodeorder/nodeorder.go b/pkg/scheduler/plugins/nodeorder/nodeorder.go index 1d6db781d4b..a62ecf957cf 100644 --- a/pkg/scheduler/plugins/nodeorder/nodeorder.go +++ b/pkg/scheduler/plugins/nodeorder/nodeorder.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "volcano.sh/volcano/pkg/scheduler/api" @@ -82,6 +83,7 @@ type priorityWeight struct { balancedResourceWeight int taintTolerationWeight int imageLocalityWeight int + selectorSpreadWeight int } // calculateWeight from the provided arguments. @@ -122,6 +124,7 @@ func calculateWeight(args framework.Arguments) priorityWeight { balancedResourceWeight: 1, taintTolerationWeight: 1, imageLocalityWeight: 1, + selectorSpreadWeight: 1, } // Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct. @@ -145,6 +148,9 @@ func calculateWeight(args framework.Arguments) priorityWeight { // Checks whether imagelocality.weight is provided or not, if given, modifies the value in weight struct. args.GetInt(&weight.imageLocalityWeight, ImageLocalityWeight) + // Checks whether selectorspread.weight is provided or not, if given, modifies the value in weight struct. 
+ args.GetInt(&weight.selectorSpreadWeight, "selectorspread.weight") + return weight } @@ -315,6 +321,9 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { p, _ = tainttoleration.New(nil, handle) taintToleration := p.(*tainttoleration.TaintToleration) + p, _ = selectorspread.New(nil, handle) + selectorSpread := p.(*selectorspread.SelectorSpread) + batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) { // InterPodAffinity state := k8sframework.NewCycleState() @@ -334,8 +343,13 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { return nil, err } + selectorSpreadScores, err := selectorSpreadScore(selectorSpread, state, task.Pod, nodes, weight.selectorSpreadWeight) + if err != nil { + return nil, err + } + for _, node := range nodes { - nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + selectorSpreadScores[node.Name] } klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores) @@ -463,5 +477,61 @@ func taintTolerationScore( return nodeScores, nil } +func selectorSpreadScore( + selectorSpread *selectorspread.SelectorSpread, + cycleState *k8sframework.CycleState, + pod *v1.Pod, + nodes []*v1.Node, + selectorSpreadWeight int, +) (map[string]float64, error) { + preScoreStatus := selectorSpread.PreScore(context.TODO(), cycleState, pod, nodes) + if !preScoreStatus.IsSuccess() { + return nil, preScoreStatus.AsError() + } + + nodescoreList := make(k8sframework.NodeScoreList, len(nodes)) + // size of errCh should be no less than parallelization number, see interPodAffinityScore. 
+ workerNum := 16 + errCh := make(chan error, workerNum) + parallelizeContext, parallelizeCancel := context.WithCancel(context.TODO()) + workqueue.ParallelizeUntil(parallelizeContext, workerNum, len(nodes), func(index int) { + nodeName := nodes[index].Name + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, status := selectorSpread.Score(ctx, cycleState, pod, nodeName) + if !status.IsSuccess() { + parallelizeCancel() + errCh <- fmt.Errorf("calculate selector spread priority failed %v", status.Message()) + return + } + nodescoreList[index] = k8sframework.NodeScore{ + Name: nodeName, + Score: s, + } + }) + + select { + case err := <-errCh: + return nil, err + default: + } + + selectorSpread.NormalizeScore(context.TODO(), cycleState, pod, nodescoreList) + + nodeScores := make(map[string]float64, len(nodes)) + for i, nodeScore := range nodescoreList { + // return error if score plugin returns invalid score. + if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore { + return nil, fmt.Errorf("selector spread returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name) + } + nodeScore.Score *= int64(selectorSpreadWeight) + nodescoreList[i] = nodeScore + nodeScores[nodeScore.Name] = float64(nodeScore.Score) + } + + klog.V(4).Infof("selector spread Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores) + return nodeScores, nil +} + func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) { } diff --git a/vendor/modules.txt b/vendor/modules.txt index 1fbd1fc966c..dbb7b88bdad 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -912,6 +912,7 @@ k8s.io/component-base/metrics/testutil k8s.io/component-base/version # k8s.io/component-helpers v0.23.0 => k8s.io/component-helpers v0.23.0 ## explicit; go 1.16 +k8s.io/component-helpers/node/topology k8s.io/component-helpers/node/util/sysctl k8s.io/component-helpers/scheduling/corev1 
k8s.io/component-helpers/scheduling/corev1/nodeaffinity @@ -996,6 +997,7 @@ k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits +k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics