Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add podTopologySpread plugin #2487

Merged
merged 1 commit into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 119 additions & 44 deletions pkg/scheduler/plugins/nodeorder/nodeorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"

"volcano.sh/volcano/pkg/scheduler/api"
Expand All @@ -58,6 +59,8 @@ const (
TaintTolerationWeight = "tainttoleration.weight"
// ImageLocalityWeight is the key for providing Image Locality Priority Weight in YAML
ImageLocalityWeight = "imagelocality.weight"
// PodTopologySpreadWeight is the key for providing Pod Topology Spread Priority Weight in YAML
PodTopologySpreadWeight = "podtopologyspread.weight"
)

type nodeOrderPlugin struct {
Expand All @@ -75,13 +78,14 @@ func (pp *nodeOrderPlugin) Name() string {
}

type priorityWeight struct {
leastReqWeight int
mostReqWeight int
nodeAffinityWeight int
podAffinityWeight int
balancedResourceWeight int
taintTolerationWeight int
imageLocalityWeight int
leastReqWeight int
mostReqWeight int
nodeAffinityWeight int
podAffinityWeight int
balancedResourceWeight int
taintTolerationWeight int
imageLocalityWeight int
podTopologySpreadWeight int
}

// calculateWeight from the provided arguments.
Expand All @@ -91,37 +95,39 @@ type priorityWeight struct {
//
// User should specify priority weights in the config in this format:
//
// actions: "reclaim, allocate, backfill, preempt"
// tiers:
// - plugins:
// - name: priority
// - name: gang
// - name: conformance
// - plugins:
// - name: drf
// - name: predicates
// - name: proportion
// - name: nodeorder
// arguments:
// leastrequested.weight: 1
// mostrequested.weight: 0
// nodeaffinity.weight: 1
// podaffinity.weight: 1
// balancedresource.weight: 1
// tainttoleration.weight: 1
// imagelocality.weight: 1
// actions: "reclaim, allocate, backfill, preempt"
// tiers:
// - plugins:
// - name: priority
// - name: gang
// - name: conformance
// - plugins:
// - name: drf
// - name: predicates
// - name: proportion
// - name: nodeorder
// arguments:
// leastrequested.weight: 1
// mostrequested.weight: 0
// nodeaffinity.weight: 1
// podaffinity.weight: 1
// balancedresource.weight: 1
// tainttoleration.weight: 1
// imagelocality.weight: 1
// podtopologyspread.weight: 2
func calculateWeight(args framework.Arguments) priorityWeight {
// Initial values for weights.
// By default, for backward compatibility and for reasonable scores,
// least requested priority is enabled and most requested priority is disabled.
weight := priorityWeight{
leastReqWeight: 1,
mostReqWeight: 0,
nodeAffinityWeight: 1,
podAffinityWeight: 1,
balancedResourceWeight: 1,
taintTolerationWeight: 1,
imageLocalityWeight: 1,
leastReqWeight: 1,
mostReqWeight: 0,
nodeAffinityWeight: 1,
podAffinityWeight: 1,
balancedResourceWeight: 1,
taintTolerationWeight: 1,
imageLocalityWeight: 1,
podTopologySpreadWeight: 2, // be consistent with kubernetes default setting.
}

// Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct.
Expand All @@ -145,6 +151,8 @@ func calculateWeight(args framework.Arguments) priorityWeight {
// Checks whether imagelocality.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.imageLocalityWeight, ImageLocalityWeight)

// Checks whether podtopologyspread.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.podTopologySpreadWeight, PodTopologySpreadWeight)
return weight
}

Expand Down Expand Up @@ -315,6 +323,12 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
p, _ = tainttoleration.New(nil, handle)
taintToleration := p.(*tainttoleration.TaintToleration)

ptsArgs := &config.PodTopologySpreadArgs{
DefaultingType: config.SystemDefaulting,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make clear whether config.SystemDefaulting impacts batch workloads that do not configure topologySpread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This default policy has the action ScheduleAnyway when unsatisfiable, so it will not affect the filter process, and the default maxSkew value has only a small effect on the score process.

}
p, _ = podtopologyspread.New(ptsArgs, handle)
podTopologySpread := p.(*podtopologyspread.PodTopologySpread)

batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) {
// InterPodAffinity
state := k8sframework.NewCycleState()
Expand All @@ -334,8 +348,13 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
return nil, err
}

podTopologySpreadScores, err := podTopologySpreadScore(podTopologySpread, state, task.Pod, nodes, weight.podTopologySpreadWeight)
if err != nil {
return nil, err
}

for _, node := range nodes {
nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name]
nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + podTopologySpreadScores[node.Name]
}

klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores)
Expand All @@ -356,7 +375,7 @@ func interPodAffinityScore(
return nil, preScoreStatus.AsError()
}

nodescoreList := make(k8sframework.NodeScoreList, len(nodes))
nodeScoreList := make(k8sframework.NodeScoreList, len(nodes))
// the default parallelization worker number is 16.
// the whole scoring will fail if one of the processes failed.
// so just create a parallelizeContext to control the whole ParallelizeUntil process.
Expand All @@ -380,7 +399,7 @@ func interPodAffinityScore(
errCh <- fmt.Errorf("calculate inter pod affinity priority failed %v", status.Message())
return
}
nodescoreList[index] = k8sframework.NodeScore{
nodeScoreList[index] = k8sframework.NodeScore{
Name: nodeName,
Score: s,
}
Expand All @@ -392,16 +411,16 @@ func interPodAffinityScore(
default:
}

interPodAffinity.NormalizeScore(context.TODO(), state, pod, nodescoreList)
interPodAffinity.NormalizeScore(context.TODO(), state, pod, nodeScoreList)

nodeScores := make(map[string]float64, len(nodes))
for i, nodeScore := range nodescoreList {
for i, nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore {
return nil, fmt.Errorf("inter pod affinity returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name)
}
nodeScore.Score *= int64(podAffinityWeight)
nodescoreList[i] = nodeScore
nodeScoreList[i] = nodeScore
nodeScores[nodeScore.Name] = float64(nodeScore.Score)
}

Expand All @@ -421,7 +440,7 @@ func taintTolerationScore(
return nil, preScoreStatus.AsError()
}

nodescoreList := make(k8sframework.NodeScoreList, len(nodes))
nodeScoreList := make(k8sframework.NodeScoreList, len(nodes))
// size of errCh should be no less than parallelization number, see interPodAffinityScore.
workerNum := 16
errCh := make(chan error, workerNum)
Expand All @@ -438,7 +457,7 @@ func taintTolerationScore(
errCh <- fmt.Errorf("calculate taint toleration priority failed %v", status.Message())
return
}
nodescoreList[index] = k8sframework.NodeScore{
nodeScoreList[index] = k8sframework.NodeScore{
Name: nodeName,
Score: s,
}
Expand All @@ -450,22 +469,78 @@ func taintTolerationScore(
default:
}

taintToleration.NormalizeScore(context.TODO(), cycleState, pod, nodescoreList)
taintToleration.NormalizeScore(context.TODO(), cycleState, pod, nodeScoreList)

nodeScores := make(map[string]float64, len(nodes))
for i, nodeScore := range nodescoreList {
for i, nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore {
return nil, fmt.Errorf("taint toleration returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name)
}
nodeScore.Score *= int64(taintTolerationWeight)
nodescoreList[i] = nodeScore
nodeScoreList[i] = nodeScore
nodeScores[nodeScore.Name] = float64(nodeScore.Score)
}

klog.V(4).Infof("taint toleration Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores)
return nodeScores, nil
}

// podTopologySpreadScore runs the k8s PodTopologySpread score plugin
// (PreScore -> parallel Score -> NormalizeScore) for the given pod over the
// given nodes, and returns a map from node name to the weighted score
// (raw plugin score multiplied by podTopologySpreadWeight).
// It returns an error if PreScore fails, any per-node Score call fails, or a
// normalized score falls outside [MinNodeScore, MaxNodeScore].
func podTopologySpreadScore(
	podTopologySpread *podtopologyspread.PodTopologySpread,
	cycleState *k8sframework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
	podTopologySpreadWeight int,
) (map[string]float64, error) {
	preScoreStatus := podTopologySpread.PreScore(context.TODO(), cycleState, pod, nodes)
	if !preScoreStatus.IsSuccess() {
		return nil, preScoreStatus.AsError()
	}

	nodeScoreList := make(k8sframework.NodeScoreList, len(nodes))
	// size of errCh should be no less than parallelization number, see interPodAffinityScore.
	workerNum := 16
	errCh := make(chan error, workerNum)
	parallelizeContext, parallelizeCancel := context.WithCancel(context.TODO())
	// Always release the parallelize context: without this deferred call the
	// derived context leaks on the success path (cancel was previously invoked
	// only from the error branch inside the worker).
	defer parallelizeCancel()
	workqueue.ParallelizeUntil(parallelizeContext, workerNum, len(nodes), func(index int) {
		nodeName := nodes[index].Name
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		s, status := podTopologySpread.Score(ctx, cycleState, pod, nodeName)
		if !status.IsSuccess() {
			// Stop the remaining workers; the whole scoring fails on the first error.
			parallelizeCancel()
			errCh <- fmt.Errorf("calculate pod topology spread priority failed %v", status.Message())
			return
		}
		nodeScoreList[index] = k8sframework.NodeScore{
			Name:  nodeName,
			Score: s,
		}
	})

	// Non-blocking drain: report the first recorded error, if any.
	select {
	case err := <-errCh:
		return nil, err
	default:
	}

	podTopologySpread.NormalizeScore(context.TODO(), cycleState, pod, nodeScoreList)

	nodeScores := make(map[string]float64, len(nodes))
	for i, nodeScore := range nodeScoreList {
		// return error if score plugin returns invalid score.
		if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore {
			return nil, fmt.Errorf("pod topology spread returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name)
		}
		nodeScore.Score *= int64(podTopologySpreadWeight)
		nodeScoreList[i] = nodeScore
		nodeScores[nodeScore.Name] = float64(nodeScore.Score)
	}

	klog.V(4).Infof("pod topology spread Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores)
	return nodeScores, nil
}

// OnSessionClose is called when a scheduling session ends. The nodeorder
// plugin registers its functions per-session in OnSessionOpen and holds no
// state that outlives the session, so there is nothing to clean up here.
func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) {
}
16 changes: 16 additions & 0 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"

Expand Down Expand Up @@ -319,6 +320,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
// 7. VolumeZone
plugin, _ = volumezone.New(nil, handle)
volumeZoneFilter := plugin.(*volumezone.VolumeZone)
// 8. PodTopologySpread
// Setting cluster level default constraints is not support for now.
ptsArgs := &config.PodTopologySpreadArgs{DefaultingType: config.SystemDefaulting}
plugin, _ = podtopologyspread.New(ptsArgs, handle)
podTopologySpreadFilter := plugin.(*podtopologyspread.PodTopologySpread)

ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo, found := nodeMap[node.Name]
Expand Down Expand Up @@ -406,6 +412,16 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message())
}

// Check PodTopologySpread
status = podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}
status = podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}

if predicate.gpuSharingEnable {
// CheckGPUSharingPredicate
fit, err := checkNodeGPUSharingPredicate(task.Pod, node)
Expand Down
Loading