Skip to content

Commit

Permalink
add selector spread plugin
Browse files Browse the repository at this point in the history
- fix plugin use listers listing resource failed

Signed-off-by: xilinxing <[email protected]>
  • Loading branch information
xilinxing committed Sep 24, 2022
1 parent b26336f commit 9dc9914
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 10 deletions.
9 changes: 6 additions & 3 deletions installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ rules:
resources: ["persistentvolumes"]
verbs: ["list", "watch"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["list", "watch"]
resources: ["namespaces", "services", "replicationcontrollers"]
verbs: ["list", "watch", "get"]
- apiGroups: [""]
resources: ["resourcequotas"]
verbs: ["list", "watch"]
Expand Down Expand Up @@ -72,6 +72,9 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete", "update"]
- apiGroups: ["apps"]
resources: ["daemonsets", "replicasets", "statefulsets"]
verbs: ["list", "watch", "get"]

---
kind: ClusterRoleBinding
Expand Down Expand Up @@ -138,7 +141,7 @@ metadata:
prometheus.io/port: "8080"
prometheus.io/scrape: "true"
name: {{ .Release.Name }}-scheduler-service
namespace: {{ .Release.Namespace }}
namespace: {{ .Release.Namespace }}
spec:
ports:
- port: 8080
Expand Down
10 changes: 7 additions & 3 deletions installer/volcano-development-arm64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ spec:
priorityClassName: system-cluster-critical
containers:
- args:
- --enabled-admission=/jobs/mutate,/jobs/validate,/podgroups/mutate,/pods/validate,/pods/mutate,/queues/mutate,/queues/validate
- --tls-cert-file=/admission.local.config/certificates/tls.crt
- --tls-private-key-file=/admission.local.config/certificates/tls.key
- --ca-cert-file=/admission.local.config/certificates/ca.crt
Expand Down Expand Up @@ -8565,8 +8566,8 @@ rules:
resources: ["persistentvolumes"]
verbs: ["list", "watch"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["list", "watch"]
resources: ["namespaces", "services", "replicationcontrollers"]
verbs: ["list", "watch", "get"]
- apiGroups: [""]
resources: ["resourcequotas"]
verbs: ["list", "watch"]
Expand Down Expand Up @@ -8594,6 +8595,9 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete", "update"]
- apiGroups: ["apps"]
resources: ["daemonsets", "replicasets", "statefulsets"]
verbs: ["list", "watch", "get"]
---
# Source: volcano/templates/scheduler.yaml
kind: ClusterRoleBinding
Expand All @@ -8618,7 +8622,7 @@ metadata:
prometheus.io/port: "8080"
prometheus.io/scrape: "true"
name: volcano-scheduler-service
namespace: volcano-system
namespace: volcano-system
spec:
ports:
- port: 8080
Expand Down
9 changes: 6 additions & 3 deletions installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8566,8 +8566,8 @@ rules:
resources: ["persistentvolumes"]
verbs: ["list", "watch"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["list", "watch"]
resources: ["namespaces", "services", "replicationcontrollers"]
verbs: ["list", "watch", "get"]
- apiGroups: [""]
resources: ["resourcequotas"]
verbs: ["list", "watch"]
Expand Down Expand Up @@ -8595,6 +8595,9 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete", "update"]
- apiGroups: ["apps"]
resources: ["daemonsets", "replicasets", "statefulsets"]
verbs: ["list", "watch", "get"]
---
# Source: volcano/templates/scheduler.yaml
kind: ClusterRoleBinding
Expand All @@ -8619,7 +8622,7 @@ metadata:
prometheus.io/port: "8080"
prometheus.io/scrape: "true"
name: volcano-scheduler-service
namespace: volcano-system
namespace: volcano-system
spec:
ports:
- port: 8080
Expand Down
9 changes: 9 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,15 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
sc.informerFactory = informerFactory
mySchedulerPodName, c := getMultiSchedulerInfo()

// explictly register informers to the factory, otherwise resources listers cannot get anything
// even with no erorr returned. `Namespace` informer is used by `InterPodAffinity` plugin,
// `SelectorSpread` and `PodTopologySpread` plugins uses the following four so far.
informerFactory.Core().V1().Namespaces().Informer()
informerFactory.Core().V1().Services().Informer()
informerFactory.Core().V1().ReplicationControllers().Informer()
informerFactory.Apps().V1().ReplicaSets().Informer()
informerFactory.Apps().V1().StatefulSets().Informer()

// create informer for node information
sc.nodeInformer = informerFactory.Core().V1().Nodes()
sc.nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
Expand Down
79 changes: 78 additions & 1 deletion pkg/scheduler/plugins/nodeorder/nodeorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"

"volcano.sh/volcano/pkg/scheduler/api"
Expand All @@ -58,6 +59,8 @@ const (
TaintTolerationWeight = "tainttoleration.weight"
// ImageLocalityWeight is the key for providing Image Locality Priority Weight in YAML
ImageLocalityWeight = "imagelocality.weight"
// SelectorSpreadWeight is the key for providing Selector Spread Priority Weight in YAML
selectorSpreadWeight = "selectorspread.weight"
)

type nodeOrderPlugin struct {
Expand All @@ -82,6 +85,7 @@ type priorityWeight struct {
balancedResourceWeight int
taintTolerationWeight int
imageLocalityWeight int
selectorSpreadWeight int
}

// calculateWeight from the provided arguments.
Expand Down Expand Up @@ -122,6 +126,7 @@ func calculateWeight(args framework.Arguments) priorityWeight {
balancedResourceWeight: 1,
taintTolerationWeight: 1,
imageLocalityWeight: 1,
selectorSpreadWeight: 0,
}

// Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct.
Expand All @@ -145,6 +150,9 @@ func calculateWeight(args framework.Arguments) priorityWeight {
// Checks whether imagelocality.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.imageLocalityWeight, ImageLocalityWeight)

// Checks whether selectorspread.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.selectorSpreadWeight, selectorSpreadWeight)

return weight
}

Expand Down Expand Up @@ -252,6 +260,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {

// If imageLocalityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.imageLocalityWeight)
klog.V(4).Infof("Image Locality score: %f", nodeScore)
}

// NodeResourcesLeastAllocated
Expand All @@ -264,6 +273,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {

// If leastReqWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.leastReqWeight)
klog.V(4).Infof("Least Request score: %f", nodeScore)
}

// NodeResourcesMostAllocated
Expand All @@ -276,6 +286,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {

// If mostRequestedWeight is provided, host.Score is multiplied with weight, it's 0 by default
nodeScore += float64(score) * float64(weight.mostReqWeight)
klog.V(4).Infof("Most Request score: %f", nodeScore)
}

// NodeResourcesBalancedAllocation
Expand All @@ -288,6 +299,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {

// If balancedResourceWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.balancedResourceWeight)
klog.V(4).Infof("Balanced Request score: %f", nodeScore)
}

// NodeAffinity
Expand All @@ -301,6 +313,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
// TODO: should we normalize the score
// If nodeAffinityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.nodeAffinityWeight)
klog.V(4).Infof("Node Affinity score: %f", nodeScore)
}

klog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, nodeScore)
Expand All @@ -315,6 +328,9 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
p, _ = tainttoleration.New(nil, handle)
taintToleration := p.(*tainttoleration.TaintToleration)

p, _ = selectorspread.New(nil, handle)
selectorSpread := p.(*selectorspread.SelectorSpread)

batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) {
// InterPodAffinity
state := k8sframework.NewCycleState()
Expand All @@ -334,8 +350,13 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
return nil, err
}

selectorSpreadScores, err := selectorSpreadScore(selectorSpread, state, task.Pod, nodes, weight.selectorSpreadWeight)
if err != nil {
return nil, err
}

for _, node := range nodes {
nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name]
nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + selectorSpreadScores[node.Name]
}

klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores)
Expand Down Expand Up @@ -463,5 +484,61 @@ func taintTolerationScore(
return nodeScores, nil
}

func selectorSpreadScore(
selectorSpread *selectorspread.SelectorSpread,
cycleState *k8sframework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
selectorSpreadWeight int,
) (map[string]float64, error) {
preScoreStatus := selectorSpread.PreScore(context.TODO(), cycleState, pod, nodes)
if !preScoreStatus.IsSuccess() {
return nil, preScoreStatus.AsError()
}

nodescoreList := make(k8sframework.NodeScoreList, len(nodes))
// size of errCh should be no less than parallelization number, see interPodAffinityScore.
workerNum := 16
errCh := make(chan error, workerNum)
parallelizeContext, parallelizeCancel := context.WithCancel(context.TODO())
workqueue.ParallelizeUntil(parallelizeContext, workerNum, len(nodes), func(index int) {
nodeName := nodes[index].Name
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, status := selectorSpread.Score(ctx, cycleState, pod, nodeName)
if !status.IsSuccess() {
parallelizeCancel()
errCh <- fmt.Errorf("calculate selector spread priority failed %v", status.Message())
return
}
nodescoreList[index] = k8sframework.NodeScore{
Name: nodeName,
Score: s,
}
})

select {
case err := <-errCh:
return nil, err
default:
}

selectorSpread.NormalizeScore(context.TODO(), cycleState, pod, nodescoreList)

nodeScores := make(map[string]float64, len(nodes))
for i, nodeScore := range nodescoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore {
return nil, fmt.Errorf("selector spread returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name)
}
nodeScore.Score *= int64(selectorSpreadWeight)
nodescoreList[i] = nodeScore
nodeScores[nodeScore.Name] = float64(nodeScore.Score)
}

klog.V(4).Infof("selector spread Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores)
return nodeScores, nil
}

func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) {
}
57 changes: 57 additions & 0 deletions vendor/k8s.io/component-helpers/node/topology/helpers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9dc9914

Please sign in to comment.