add podTopologySpread plugin
Signed-off-by: Xuzheng Chang <[email protected]>
Monokaix committed Sep 2, 2022
1 parent ddf94fc commit 0d0d76c
Showing 7 changed files with 1,011 additions and 44 deletions.
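
For context: the plugin wired in below scores and filters nodes against Kubernetes topology spread constraints. A minimal pod carrying such a constraint might look like this (illustrative manifest; the names, label, and image are invented, while the field names are the standard Kubernetes API):

apiVersion: v1
kind: Pod
metadata:
  name: spread-demo
  labels:
    app: spread-demo
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: topology.kubernetes.io/zone
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        app: spread-demo
  containers:
  - name: main
    image: nginx

Since the commit wires the upstream plugin's Filter into Volcano's predicates phase and its Score into the nodeorder phase, hard constraints (DoNotSchedule) should exclude nodes outright, while soft ones (ScheduleAnyway) should only lower a node's score.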
163 changes: 119 additions & 44 deletions pkg/scheduler/plugins/nodeorder/nodeorder.go
@@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"

"volcano.sh/volcano/pkg/scheduler/api"
@@ -58,6 +59,8 @@ const (
TaintTolerationWeight = "tainttoleration.weight"
// ImageLocalityWeight is the key for providing Image Locality Priority Weight in YAML
ImageLocalityWeight = "imagelocality.weight"
// PodTopologySpreadWeight is the key for providing Pod Topology Spread Priority Weight in YAML
PodTopologySpreadWeight = "podtopologyspread.weight"
)

type nodeOrderPlugin struct {
@@ -75,13 +78,14 @@ func (pp *nodeOrderPlugin) Name() string {
}

type priorityWeight struct {
leastReqWeight int
mostReqWeight int
nodeAffinityWeight int
podAffinityWeight int
balancedResourceWeight int
taintTolerationWeight int
imageLocalityWeight int
+ podTopologySpreadWeight int
}

// calculateWeight from the provided arguments.
@@ -91,37 +95,39 @@ type priorityWeight struct {
//
// User should specify priority weights in the config in this format:
//
// actions: "reclaim, allocate, backfill, preempt"
// tiers:
// - plugins:
//   - name: priority
//   - name: gang
//   - name: conformance
// - plugins:
//   - name: drf
//   - name: predicates
//   - name: proportion
//   - name: nodeorder
//     arguments:
//       leastrequested.weight: 1
//       mostrequested.weight: 0
//       nodeaffinity.weight: 1
//       podaffinity.weight: 1
//       balancedresource.weight: 1
//       tainttoleration.weight: 1
//       imagelocality.weight: 1
+ //       podtopologyspread.weight: 1
func calculateWeight(args framework.Arguments) priorityWeight {
// Initial values for weights.
// By default, for backward compatibility and for reasonable scores,
// least requested priority is enabled and most requested priority is disabled.
weight := priorityWeight{
leastReqWeight: 1,
mostReqWeight: 0,
nodeAffinityWeight: 1,
podAffinityWeight: 1,
balancedResourceWeight: 1,
taintTolerationWeight: 1,
imageLocalityWeight: 1,
+ podTopologySpreadWeight: 1,
}

// Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct.
@@ -145,6 +151,8 @@ func calculateWeight(args framework.Arguments) priorityWeight {
// Checks whether imagelocality.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.imageLocalityWeight, ImageLocalityWeight)

// Checks whether podtopologyspread.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.podTopologySpreadWeight, PodTopologySpreadWeight)
return weight
}
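
For reference, each args.GetInt call above leaves the compiled-in default untouched unless the key is present and parses cleanly. A minimal sketch of that behavior, assuming Arguments is a plain string-keyed map (a hypothetical simplification of the real Volcano type):

import "strconv"

// Arguments is a hypothetical stand-in for Volcano's framework.Arguments.
type Arguments map[string]string

// GetInt overwrites *ptr only when key exists and holds a valid integer;
// otherwise the caller's default survives.
func (a Arguments) GetInt(ptr *int, key string) {
	if ptr == nil {
		return
	}
	raw, ok := a[key]
	if !ok {
		return
	}
	if n, err := strconv.Atoi(raw); err == nil {
		*ptr = n
	}
}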

@@ -315,6 +323,12 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
p, _ = tainttoleration.New(nil, handle)
taintToleration := p.(*tainttoleration.TaintToleration)

ptsArgs := &config.PodTopologySpreadArgs{
DefaultingType: config.SystemDefaulting,
}
p, _ = podtopologyspread.New(ptsArgs, handle)
podTopologySpread := p.(*podtopologyspread.PodTopologySpread)

batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) {
// InterPodAffinity
state := k8sframework.NewCycleState()
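
A note on the arguments passed to podtopologyspread.New above: config.SystemDefaulting tells the upstream plugin to fall back to kube-scheduler's built-in constraints for pods that declare none (see also the TODO in predicates.go below). To the best of my knowledge those system defaults are equivalent to:

defaultConstraints:
- maxSkew: 3
  topologyKey: kubernetes.io/hostname
  whenUnsatisfiable: ScheduleAnyway
- maxSkew: 5
  topologyKey: topology.kubernetes.io/zone
  whenUnsatisfiable: ScheduleAnyway

and, if I read the upstream plugin correctly, they only take effect for pods whose labels are selected by an owning Service, ReplicaSet, ReplicationController, or StatefulSet.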
@@ -334,8 +348,13 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
return nil, err
}

podTopologySpreadScores, err := podTopologySpreadScore(podTopologySpread, state, task.Pod, nodes, weight.podTopologySpreadWeight)
if err != nil {
return nil, err
}

for _, node := range nodes {
- nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name]
+ nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + podTopologySpreadScores[node.Name]
}

klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores)
@@ -356,7 +375,7 @@ func interPodAffinityScore(
return nil, preScoreStatus.AsError()
}

- nodescoreList := make(k8sframework.NodeScoreList, len(nodes))
+ nodeScoreList := make(k8sframework.NodeScoreList, len(nodes))
// the default parallelization worker number is 16.
// the whole scoring will fail if one of the processes failed.
// so just create a parallelizeContext to control the whole ParallelizeUntil process.
@@ -380,7 +399,7 @@ func interPodAffinityScore(
errCh <- fmt.Errorf("calculate inter pod affinity priority failed %v", status.Message())
return
}
- nodescoreList[index] = k8sframework.NodeScore{
+ nodeScoreList[index] = k8sframework.NodeScore{
Name: nodeName,
Score: s,
}
@@ -392,16 +411,16 @@ func interPodAffinityScore(
default:
}

- interPodAffinity.NormalizeScore(context.TODO(), state, pod, nodescoreList)
+ interPodAffinity.NormalizeScore(context.TODO(), state, pod, nodeScoreList)

nodeScores := make(map[string]float64, len(nodes))
- for i, nodeScore := range nodescoreList {
+ for i, nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore {
return nil, fmt.Errorf("inter pod affinity returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name)
}
nodeScore.Score *= int64(podAffinityWeight)
- nodescoreList[i] = nodeScore
+ nodeScoreList[i] = nodeScore
nodeScores[nodeScore.Name] = float64(nodeScore.Score)
}

@@ -421,7 +440,7 @@ func taintTolerationScore(
return nil, preScoreStatus.AsError()
}

- nodescoreList := make(k8sframework.NodeScoreList, len(nodes))
+ nodeScoreList := make(k8sframework.NodeScoreList, len(nodes))
// size of errCh should be no less than parallelization number, see interPodAffinityScore.
workerNum := 16
errCh := make(chan error, workerNum)
@@ -438,7 +457,7 @@ func taintTolerationScore(
errCh <- fmt.Errorf("calculate taint toleration priority failed %v", status.Message())
return
}
- nodescoreList[index] = k8sframework.NodeScore{
+ nodeScoreList[index] = k8sframework.NodeScore{
Name: nodeName,
Score: s,
}
@@ -450,22 +469,78 @@ func taintTolerationScore(
default:
}

- taintToleration.NormalizeScore(context.TODO(), cycleState, pod, nodescoreList)
+ taintToleration.NormalizeScore(context.TODO(), cycleState, pod, nodeScoreList)

nodeScores := make(map[string]float64, len(nodes))
- for i, nodeScore := range nodescoreList {
+ for i, nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore {
return nil, fmt.Errorf("taint toleration returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name)
}
nodeScore.Score *= int64(taintTolerationWeight)
- nodescoreList[i] = nodeScore
+ nodeScoreList[i] = nodeScore
nodeScores[nodeScore.Name] = float64(nodeScore.Score)
}

klog.V(4).Infof("taint toleration Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores)
return nodeScores, nil
}

func podTopologySpreadScore(
podTopologySpread *podtopologyspread.PodTopologySpread,
cycleState *k8sframework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
podTopologySpreadWeight int,
) (map[string]float64, error) {
preScoreStatus := podTopologySpread.PreScore(context.TODO(), cycleState, pod, nodes)
if !preScoreStatus.IsSuccess() {
return nil, preScoreStatus.AsError()
}

nodeScoreList := make(k8sframework.NodeScoreList, len(nodes))
// size of errCh should be no less than parallelization number, see interPodAffinityScore.
workerNum := 16
errCh := make(chan error, workerNum)
parallelizeContext, parallelizeCancel := context.WithCancel(context.TODO())
workqueue.ParallelizeUntil(parallelizeContext, workerNum, len(nodes), func(index int) {
nodeName := nodes[index].Name
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, status := podTopologySpread.Score(ctx, cycleState, pod, nodeName)
if !status.IsSuccess() {
parallelizeCancel()
errCh <- fmt.Errorf("calculate pod topology spread priority failed %v", status.Message())
return
}
nodeScoreList[index] = k8sframework.NodeScore{
Name: nodeName,
Score: s,
}
})

select {
case err := <-errCh:
return nil, err
default:
}

podTopologySpread.NormalizeScore(context.TODO(), cycleState, pod, nodeScoreList)

nodeScores := make(map[string]float64, len(nodes))
for i, nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore {
return nil, fmt.Errorf("pod topology spread returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name)
}
nodeScore.Score *= int64(podTopologySpreadWeight)
nodeScoreList[i] = nodeScore
nodeScores[nodeScore.Name] = float64(nodeScore.Score)
}

klog.V(4).Infof("pod topology spread Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores)
return nodeScores, nil
}

func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) {
}
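
Editorial aside: interPodAffinityScore, taintTolerationScore, and the new podTopologySpreadScore are near line-for-line copies of one pattern, so a follow-up could plausibly collapse them behind the upstream ScorePlugin interface. A sketch of such a helper (not part of this commit; it assumes the caller has already run each plugin's PreScore, since PreScore is not part of ScorePlugin):

func scoreWithPlugin(
	plugin k8sframework.ScorePlugin,
	state *k8sframework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
	weight int,
) (map[string]float64, error) {
	nodeScoreList := make(k8sframework.NodeScoreList, len(nodes))
	// Mirror the originals: buffered error channel sized to the worker count.
	workerNum := 16
	errCh := make(chan error, workerNum)
	parallelizeCtx, cancel := context.WithCancel(context.TODO())
	defer cancel()
	workqueue.ParallelizeUntil(parallelizeCtx, workerNum, len(nodes), func(index int) {
		nodeName := nodes[index].Name
		s, status := plugin.Score(parallelizeCtx, state, pod, nodeName)
		if !status.IsSuccess() {
			cancel() // abort the remaining workers on the first failure
			errCh <- fmt.Errorf("calculate %s priority failed %v", plugin.Name(), status.Message())
			return
		}
		nodeScoreList[index] = k8sframework.NodeScore{Name: nodeName, Score: s}
	})

	// Report only the first error, as the existing helpers do.
	select {
	case err := <-errCh:
		return nil, err
	default:
	}

	if ext := plugin.ScoreExtensions(); ext != nil {
		ext.NormalizeScore(context.TODO(), state, pod, nodeScoreList)
	}

	nodeScores := make(map[string]float64, len(nodes))
	for _, ns := range nodeScoreList {
		if ns.Score > k8sframework.MaxNodeScore || ns.Score < k8sframework.MinNodeScore {
			return nil, fmt.Errorf("%s returned an invalid score %v for node %s", plugin.Name(), ns.Score, ns.Name)
		}
		nodeScores[ns.Name] = float64(ns.Score * int64(weight))
	}
	return nodeScores, nil
}

The select-with-default drain mirrors the originals: only the first worker error is surfaced, and the buffered channel keeps later workers from blocking after cancellation.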
16 changes: 16 additions & 0 deletions pkg/scheduler/plugins/predicates/predicates.go
@@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"

@@ -319,6 +320,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
// 7. VolumeZone
plugin, _ = volumezone.New(nil, handle)
volumeZoneFilter := plugin.(*volumezone.VolumeZone)
// 8. PodTopologySpread
// TODO: make this configurable? ref: https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/#internal-default-constraints
ptsArgs := &config.PodTopologySpreadArgs{DefaultingType: config.SystemDefaulting}
plugin, _ = podtopologyspread.New(ptsArgs, handle)
podTopologySpreadFilter := plugin.(*podtopologyspread.PodTopologySpread)

ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo, found := nodeMap[node.Name]
@@ -406,6 +412,16 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message())
}

// Check PodTopologySpread
status = podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}
status = podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}

if predicate.gpuSharingEnable {
// CheckGPUSharingPredicate
fit, err := checkNodeGPUSharingPredicate(task.Pod, node)

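With both files changed, the new priority can be tuned from the scheduler configuration. A sketch mirroring the format documented in nodeorder.go above (the weight of 2 is illustrative; every key defaults to 1 except mostrequested.weight):

actions: "reclaim, allocate, backfill, preempt"
tiers:
- plugins:
  - name: predicates
  - name: nodeorder
    arguments:
      podtopologyspread.weight: 2

Note also the call order inside the new predicate: PreFilter must run before Filter, because the upstream PodTopologySpread plugin computes its spread state in PreFilter and reads it back from the CycleState in Filter.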