diff --git a/pkg/scheduler/plugins/nodeorder/nodeorder.go b/pkg/scheduler/plugins/nodeorder/nodeorder.go index bfd25ad3a30..1062ced0f59 100644 --- a/pkg/scheduler/plugins/nodeorder/nodeorder.go +++ b/pkg/scheduler/plugins/nodeorder/nodeorder.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "volcano.sh/volcano/pkg/scheduler/api" @@ -58,6 +59,8 @@ const ( TaintTolerationWeight = "tainttoleration.weight" // ImageLocalityWeight is the key for providing Image Locality Priority Weight in YAML ImageLocalityWeight = "imagelocality.weight" + // PodTopologySpreadWeight is the key for providing Pod Topology Spread Priority Weight in YAML + PodTopologySpreadWeight = "podtopologyspread.weight" ) type nodeOrderPlugin struct { @@ -75,13 +78,14 @@ func (pp *nodeOrderPlugin) Name() string { } type priorityWeight struct { - leastReqWeight int - mostReqWeight int - nodeAffinityWeight int - podAffinityWeight int - balancedResourceWeight int - taintTolerationWeight int - imageLocalityWeight int + leastReqWeight int + mostReqWeight int + nodeAffinityWeight int + podAffinityWeight int + balancedResourceWeight int + taintTolerationWeight int + imageLocalityWeight int + podTopologySpreadWeight int } // calculateWeight from the provided arguments. @@ -91,37 +95,39 @@ type priorityWeight struct { // // User should specify priority weights in the config in this format: // -// actions: "reclaim, allocate, backfill, preempt" -// tiers: -// - plugins: -// - name: priority -// - name: gang -// - name: conformance -// - plugins: -// - name: drf -// - name: predicates -// - name: proportion -// - name: nodeorder -// arguments: -// leastrequested.weight: 1 -// mostrequested.weight: 0 -// nodeaffinity.weight: 1 -// podaffinity.weight: 1 -// balancedresource.weight: 1 -// tainttoleration.weight: 1 -// imagelocality.weight: 1 +// actions: "reclaim, allocate, backfill, preempt" +// tiers: +// - plugins: +// - name: priority +// - name: gang +// - name: conformance +// - plugins: +// - name: drf +// - name: predicates +// - name: proportion +// - name: nodeorder +// arguments: +// leastrequested.weight: 1 +// mostrequested.weight: 0 +// nodeaffinity.weight: 1 +// podaffinity.weight: 1 +// balancedresource.weight: 1 +// tainttoleration.weight: 1 +// imagelocality.weight: 1 +// podtopologyspread.weight: 1 func calculateWeight(args framework.Arguments) priorityWeight { // Initial values for weights. // By default, for backward compatibility and for reasonable scores, // least requested priority is enabled and most requested priority is disabled. weight := priorityWeight{ - leastReqWeight: 1, - mostReqWeight: 0, - nodeAffinityWeight: 1, - podAffinityWeight: 1, - balancedResourceWeight: 1, - taintTolerationWeight: 1, - imageLocalityWeight: 1, + leastReqWeight: 1, + mostReqWeight: 0, + nodeAffinityWeight: 1, + podAffinityWeight: 1, + balancedResourceWeight: 1, + taintTolerationWeight: 1, + imageLocalityWeight: 1, + podTopologySpreadWeight: 1, } // Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct. 
@@ -145,6 +151,8 @@ func calculateWeight(args framework.Arguments) priorityWeight { // Checks whether imagelocality.weight is provided or not, if given, modifies the value in weight struct. args.GetInt(&weight.imageLocalityWeight, ImageLocalityWeight) + // Checks whether podtopologyspread.weight is provided or not, if given, modifies the value in weight struct. + args.GetInt(&weight.podTopologySpreadWeight, PodTopologySpreadWeight) return weight } @@ -315,6 +323,12 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { p, _ = tainttoleration.New(nil, handle) taintToleration := p.(*tainttoleration.TaintToleration) + ptsArgs := &config.PodTopologySpreadArgs{ + DefaultingType: config.SystemDefaulting, + } + p, _ = podtopologyspread.New(ptsArgs, handle) + podTopologySpread := p.(*podtopologyspread.PodTopologySpread) + batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) { // InterPodAffinity state := k8sframework.NewCycleState() @@ -334,8 +348,13 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { return nil, err } + podTopologySpreadScores, err := podTopologySpreadScore(podTopologySpread, state, task.Pod, nodes, weight.podTopologySpreadWeight) + if err != nil { + return nil, err + } + for _, node := range nodes { - nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + podTopologySpreadScores[node.Name] } klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores) @@ -356,7 +375,7 @@ func interPodAffinityScore( return nil, preScoreStatus.AsError() } - nodescoreList := make(k8sframework.NodeScoreList, len(nodes)) + nodeScoreList := make(k8sframework.NodeScoreList, len(nodes)) // the default parallelization worker number is 16. // the whole scoring will fail if one of the processes failed. // so just create a parallelizeContext to control the whole ParallelizeUntil process. @@ -380,7 +399,7 @@ func interPodAffinityScore( errCh <- fmt.Errorf("calculate inter pod affinity priority failed %v", status.Message()) return } - nodescoreList[index] = k8sframework.NodeScore{ + nodeScoreList[index] = k8sframework.NodeScore{ Name: nodeName, Score: s, } @@ -392,16 +411,16 @@ func interPodAffinityScore( default: } - interPodAffinity.NormalizeScore(context.TODO(), state, pod, nodescoreList) + interPodAffinity.NormalizeScore(context.TODO(), state, pod, nodeScoreList) nodeScores := make(map[string]float64, len(nodes)) - for i, nodeScore := range nodescoreList { + for i, nodeScore := range nodeScoreList { // return error if score plugin returns invalid score. if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore { return nil, fmt.Errorf("inter pod affinity returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name) } nodeScore.Score *= int64(podAffinityWeight) - nodescoreList[i] = nodeScore + nodeScoreList[i] = nodeScore nodeScores[nodeScore.Name] = float64(nodeScore.Score) } @@ -421,7 +440,7 @@ func taintTolerationScore( return nil, preScoreStatus.AsError() } - nodescoreList := make(k8sframework.NodeScoreList, len(nodes)) + nodeScoreList := make(k8sframework.NodeScoreList, len(nodes)) // size of errCh should be no less than parallelization number, see interPodAffinityScore. 
workerNum := 16 errCh := make(chan error, workerNum) @@ -438,7 +457,7 @@ func taintTolerationScore( errCh <- fmt.Errorf("calculate taint toleration priority failed %v", status.Message()) return } - nodescoreList[index] = k8sframework.NodeScore{ + nodeScoreList[index] = k8sframework.NodeScore{ Name: nodeName, Score: s, } @@ -450,16 +469,16 @@ func taintTolerationScore( default: } - taintToleration.NormalizeScore(context.TODO(), cycleState, pod, nodescoreList) + taintToleration.NormalizeScore(context.TODO(), cycleState, pod, nodeScoreList) nodeScores := make(map[string]float64, len(nodes)) - for i, nodeScore := range nodescoreList { + for i, nodeScore := range nodeScoreList { // return error if score plugin returns invalid score. if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore { return nil, fmt.Errorf("taint toleration returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name) } nodeScore.Score *= int64(taintTolerationWeight) - nodescoreList[i] = nodeScore + nodeScoreList[i] = nodeScore nodeScores[nodeScore.Name] = float64(nodeScore.Score) } @@ -467,5 +486,61 @@ func taintTolerationScore( return nodeScores, nil } +func podTopologySpreadScore( + podTopologySpread *podtopologyspread.PodTopologySpread, + cycleState *k8sframework.CycleState, + pod *v1.Pod, + nodes []*v1.Node, + podTopologySpreadWeight int, +) (map[string]float64, error) { + preScoreStatus := podTopologySpread.PreScore(context.TODO(), cycleState, pod, nodes) + if !preScoreStatus.IsSuccess() { + return nil, preScoreStatus.AsError() + } + + nodeScoreList := make(k8sframework.NodeScoreList, len(nodes)) + // size of errCh should be no less than parallelization number, see interPodAffinityScore. + workerNum := 16 + errCh := make(chan error, workerNum) + parallelizeContext, parallelizeCancel := context.WithCancel(context.TODO()) + workqueue.ParallelizeUntil(parallelizeContext, workerNum, len(nodes), func(index int) { + nodeName := nodes[index].Name + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, status := podTopologySpread.Score(ctx, cycleState, pod, nodeName) + if !status.IsSuccess() { + parallelizeCancel() + errCh <- fmt.Errorf("calculate pod topology spread priority failed %v", status.Message()) + return + } + nodeScoreList[index] = k8sframework.NodeScore{ + Name: nodeName, + Score: s, + } + }) + + select { + case err := <-errCh: + return nil, err + default: + } + + podTopologySpread.NormalizeScore(context.TODO(), cycleState, pod, nodeScoreList) + + nodeScores := make(map[string]float64, len(nodes)) + for i, nodeScore := range nodeScoreList { + // return error if score plugin returns invalid score. 
+ if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore { + return nil, fmt.Errorf("pod topology spread returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name) + } + nodeScore.Score *= int64(podTopologySpreadWeight) + nodeScoreList[i] = nodeScore + nodeScores[nodeScore.Name] = float64(nodeScore.Score) + } + + klog.V(4).Infof("pod topology spread Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores) + return nodeScores, nil +} + func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) { } diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index 6766d83a599..d37669e49d7 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone" @@ -319,6 +320,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // 7. VolumeZone plugin, _ = volumezone.New(nil, handle) volumeZoneFilter := plugin.(*volumezone.VolumeZone) + // 8. PodTopologySpread + // TODO: make this configurable? ref: https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/#internal-default-constraints + ptsArgs := &config.PodTopologySpreadArgs{DefaultingType: config.SystemDefaulting} + plugin, _ = podtopologyspread.New(ptsArgs, handle) + podTopologySpreadFilter := plugin.(*podtopologyspread.PodTopologySpread) ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { nodeInfo, found := nodeMap[node.Name] @@ -406,6 +412,16 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { return fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message()) } + // Check PodTopologySpread + status = podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod) + if !status.IsSuccess() { + return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) + } + status = podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) + if !status.IsSuccess() { + return fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) + } + if predicate.gpuSharingEnable { // CheckGPUSharingPredicate fit, err := checkNodeGPUSharingPredicate(task.Pod, node) diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/common.go b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/common.go new file mode 100644 index 00000000000..2c66b50d1f1 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/common.go @@ -0,0 +1,99 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podtopologyspread + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" +) + +type topologyPair struct { + key string + value string +} + +// topologySpreadConstraint is an internal version for v1.TopologySpreadConstraint +// and where the selector is parsed. +// Fields are exported for comparison during testing. +type topologySpreadConstraint struct { + MaxSkew int32 + TopologyKey string + Selector labels.Selector +} + +// buildDefaultConstraints builds the constraints for a pod using +// .DefaultConstraints and the selectors from the services, replication +// controllers, replica sets and stateful sets that match the pod. +func (pl *PodTopologySpread) buildDefaultConstraints(p *v1.Pod, action v1.UnsatisfiableConstraintAction) ([]topologySpreadConstraint, error) { + constraints, err := filterTopologySpreadConstraints(pl.defaultConstraints, action) + if err != nil || len(constraints) == 0 { + return nil, err + } + selector := helper.DefaultSelector(p, pl.services, pl.replicationCtrls, pl.replicaSets, pl.statefulSets) + if selector.Empty() { + return nil, nil + } + for i := range constraints { + constraints[i].Selector = selector + } + return constraints, nil +} + +// nodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread Constraints are present in node labels. +func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool { + for _, c := range constraints { + if _, ok := nodeLabels[c.TopologyKey]; !ok { + return false + } + } + return true +} + +func filterTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint, action v1.UnsatisfiableConstraintAction) ([]topologySpreadConstraint, error) { + var result []topologySpreadConstraint + for _, c := range constraints { + if c.WhenUnsatisfiable == action { + selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector) + if err != nil { + return nil, err + } + result = append(result, topologySpreadConstraint{ + MaxSkew: c.MaxSkew, + TopologyKey: c.TopologyKey, + Selector: selector, + }) + } + } + return result, nil +} + +func countPodsMatchSelector(podInfos []*framework.PodInfo, selector labels.Selector, ns string) int { + count := 0 + for _, p := range podInfos { + // Bypass terminating Pod (see #87621). + if p.Pod.DeletionTimestamp != nil || p.Pod.Namespace != ns { + continue + } + if selector.Matches(labels.Set(p.Pod.Labels)) { + count++ + } + } + return count +} diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go new file mode 100644 index 00000000000..e8a75b5e186 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go @@ -0,0 +1,339 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podtopologyspread
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sync/atomic"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+const preFilterStateKey = "PreFilter" + Name
+
+// preFilterState computed at PreFilter and used at Filter.
+// It combines TpKeyToCriticalPaths and TpPairToMatchNum to represent:
+// (1) critical paths where the least pods are matched on each spread constraint.
+// (2) number of pods matched on each spread constraint.
+// A nil preFilterState denotes it's not set at all (in PreFilter phase);
+// An empty preFilterState object denotes it's a legit state and is set in PreFilter phase.
+// Fields are exported for comparison during testing.
+type preFilterState struct {
+	Constraints []topologySpreadConstraint
+	// We record 2 critical paths instead of all critical paths here.
+	// criticalPaths[0].MatchNum always holds the minimum matching number.
+	// criticalPaths[1].MatchNum is always greater or equal to criticalPaths[0].MatchNum, but
+	// it's not guaranteed to be the 2nd minimum match number.
+	TpKeyToCriticalPaths map[string]*criticalPaths
+	// TpPairToMatchNum is keyed with topologyPair, and valued with the number of matching pods.
+	TpPairToMatchNum map[topologyPair]*int32
+}
+
+// Clone makes a copy of the given state.
+func (s *preFilterState) Clone() framework.StateData {
+	if s == nil {
+		return nil
+	}
+	copy := preFilterState{
+		// Constraints are shared because they don't change.
+		Constraints:          s.Constraints,
+		TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(s.TpKeyToCriticalPaths)),
+		TpPairToMatchNum:     make(map[topologyPair]*int32, len(s.TpPairToMatchNum)),
+	}
+	for tpKey, paths := range s.TpKeyToCriticalPaths {
+		copy.TpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
+	}
+	for tpPair, matchNum := range s.TpPairToMatchNum {
+		copyPair := topologyPair{key: tpPair.key, value: tpPair.value}
+		copyCount := *matchNum
+		copy.TpPairToMatchNum[copyPair] = &copyCount
+	}
+	return &copy
+}
+
+// CAVEAT: the reason that `[2]criticalPath` can work is based on the implementation of current
+// preemption algorithm, in particular the following 2 facts:
+// Fact 1: we only preempt pods on the same node, instead of pods on multiple nodes.
+// Fact 2: each node is evaluated on a separate copy of the preFilterState during its preemption cycle.
+// If we plan to turn to a more complex algorithm like "arbitrary pods on multiple nodes", this
+// structure needs to be revisited.
+// Fields are exported for comparison during testing.
+type criticalPaths [2]struct {
+	// TopologyValue denotes the topology value mapping to topology key.
+	TopologyValue string
+	// MatchNum denotes the number of matching pods.
+ MatchNum int32 +} + +func newCriticalPaths() *criticalPaths { + return &criticalPaths{{MatchNum: math.MaxInt32}, {MatchNum: math.MaxInt32}} +} + +func (p *criticalPaths) update(tpVal string, num int32) { + // first verify if `tpVal` exists or not + i := -1 + if tpVal == p[0].TopologyValue { + i = 0 + } else if tpVal == p[1].TopologyValue { + i = 1 + } + + if i >= 0 { + // `tpVal` exists + p[i].MatchNum = num + if p[0].MatchNum > p[1].MatchNum { + // swap paths[0] and paths[1] + p[0], p[1] = p[1], p[0] + } + } else { + // `tpVal` doesn't exist + if num < p[0].MatchNum { + // update paths[1] with paths[0] + p[1] = p[0] + // update paths[0] + p[0].TopologyValue, p[0].MatchNum = tpVal, num + } else if num < p[1].MatchNum { + // update paths[1] + p[1].TopologyValue, p[1].MatchNum = tpVal, num + } + } +} + +func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) { + if s == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil { + return + } + if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) { + return + } + + podLabelSet := labels.Set(updatedPod.Labels) + for _, constraint := range s.Constraints { + if !constraint.Selector.Matches(podLabelSet) { + continue + } + + k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey] + pair := topologyPair{key: k, value: v} + *s.TpPairToMatchNum[pair] += delta + + s.TpKeyToCriticalPaths[k].update(v, *s.TpPairToMatchNum[pair]) + } +} + +// PreFilter invoked at the prefilter extension point. +func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { + s, err := pl.calPreFilterState(pod) + if err != nil { + return framework.AsStatus(err) + } + cycleState.Write(preFilterStateKey, s) + return nil +} + +// PreFilterExtensions returns prefilter extensions, pod add and remove. +func (pl *PodTopologySpread) PreFilterExtensions() framework.PreFilterExtensions { + return pl +} + +// AddPod from pre-computed data in cycleState. +func (pl *PodTopologySpread) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { + s, err := getPreFilterState(cycleState) + if err != nil { + return framework.AsStatus(err) + } + + s.updateWithPod(podInfoToAdd.Pod, podToSchedule, nodeInfo.Node(), 1) + return nil +} + +// RemovePod from pre-computed data in cycleState. +func (pl *PodTopologySpread) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { + s, err := getPreFilterState(cycleState) + if err != nil { + return framework.AsStatus(err) + } + + s.updateWithPod(podInfoToRemove.Pod, podToSchedule, nodeInfo.Node(), -1) + return nil +} + +// getPreFilterState fetches a pre-computed preFilterState. +func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) { + c, err := cycleState.Read(preFilterStateKey) + if err != nil { + // preFilterState doesn't exist, likely PreFilter wasn't invoked. + return nil, fmt.Errorf("reading %q from cycleState: %w", preFilterStateKey, err) + } + + s, ok := c.(*preFilterState) + if !ok { + return nil, fmt.Errorf("%+v convert to podtopologyspread.preFilterState error", c) + } + return s, nil +} + +// calPreFilterState computes preFilterState describing how pods are spread on topologies. 
+func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, error) { + allNodes, err := pl.sharedLister.NodeInfos().List() + if err != nil { + return nil, fmt.Errorf("listing NodeInfos: %w", err) + } + var constraints []topologySpreadConstraint + if len(pod.Spec.TopologySpreadConstraints) > 0 { + // We have feature gating in APIServer to strip the spec + // so don't need to re-check feature gate, just check length of Constraints. + constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.DoNotSchedule) + if err != nil { + return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %w", err) + } + } else { + constraints, err = pl.buildDefaultConstraints(pod, v1.DoNotSchedule) + if err != nil { + return nil, fmt.Errorf("setting default hard topology spread constraints: %w", err) + } + } + if len(constraints) == 0 { + return &preFilterState{}, nil + } + + s := preFilterState{ + Constraints: constraints, + TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)), + TpPairToMatchNum: make(map[topologyPair]*int32, sizeHeuristic(len(allNodes), constraints)), + } + requiredSchedulingTerm := nodeaffinity.GetRequiredNodeAffinity(pod) + for _, n := range allNodes { + node := n.Node() + if node == nil { + klog.ErrorS(nil, "Node not found") + continue + } + // In accordance to design, if NodeAffinity or NodeSelector is defined, + // spreading is applied to nodes that pass those filters. + // Ignore parsing errors for backwards compatibility. + match, _ := requiredSchedulingTerm.Match(node) + if !match { + continue + } + // Ensure current node's labels contains all topologyKeys in 'Constraints'. + if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) { + continue + } + for _, c := range constraints { + pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]} + s.TpPairToMatchNum[pair] = new(int32) + } + } + + processNode := func(i int) { + nodeInfo := allNodes[i] + node := nodeInfo.Node() + + for _, constraint := range constraints { + pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]} + tpCount := s.TpPairToMatchNum[pair] + if tpCount == nil { + continue + } + count := countPodsMatchSelector(nodeInfo.Pods, constraint.Selector, pod.Namespace) + atomic.AddInt32(tpCount, int32(count)) + } + } + pl.parallelizer.Until(context.Background(), len(allNodes), processNode) + + // calculate min match for each topology pair + for i := 0; i < len(constraints); i++ { + key := constraints[i].TopologyKey + s.TpKeyToCriticalPaths[key] = newCriticalPaths() + } + for pair, num := range s.TpPairToMatchNum { + s.TpKeyToCriticalPaths[pair.key].update(pair.value, *num) + } + + return &s, nil +} + +// Filter invoked at the filter extension point. +func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + node := nodeInfo.Node() + if node == nil { + return framework.AsStatus(fmt.Errorf("node not found")) + } + + s, err := getPreFilterState(cycleState) + if err != nil { + return framework.AsStatus(err) + } + + // However, "empty" preFilterState is legit which tolerates every toSchedule Pod. 
+ if len(s.Constraints) == 0 { + return nil + } + + podLabelSet := labels.Set(pod.Labels) + for _, c := range s.Constraints { + tpKey := c.TopologyKey + tpVal, ok := node.Labels[c.TopologyKey] + if !ok { + klog.V(5).InfoS("Node doesn't have required label", "node", klog.KObj(node), "label", tpKey) + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNodeLabelNotMatch) + } + + selfMatchNum := int32(0) + if c.Selector.Matches(podLabelSet) { + selfMatchNum = 1 + } + + pair := topologyPair{key: tpKey, value: tpVal} + paths, ok := s.TpKeyToCriticalPaths[tpKey] + if !ok { + // error which should not happen + klog.ErrorS(nil, "Internal error occurred while retrieving paths from topology key", "topologyKey", tpKey, "paths", s.TpKeyToCriticalPaths) + continue + } + // judging criteria: + // 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew' + minMatchNum := paths[0].MatchNum + matchNum := int32(0) + if tpCount := s.TpPairToMatchNum[pair]; tpCount != nil { + matchNum = *tpCount + } + skew := matchNum + selfMatchNum - minMatchNum + if skew > c.MaxSkew { + klog.V(5).InfoS("Node failed spreadConstraint: matchNum + selfMatchNum - minMatchNum > maxSkew", "node", klog.KObj(node), "topologyKey", tpKey, "matchNum", matchNum, "selfMatchNum", selfMatchNum, "minMatchNum", minMatchNum, "maxSkew", c.MaxSkew) + return framework.NewStatus(framework.Unschedulable, ErrReasonConstraintsNotMatch) + } + } + + return nil +} + +func sizeHeuristic(nodes int, constraints []topologySpreadConstraint) int { + for _, c := range constraints { + if c.TopologyKey == v1.LabelHostname { + return nodes + } + } + return 0 +} diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go new file mode 100644 index 00000000000..568c1cc77ea --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go @@ -0,0 +1,143 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podtopologyspread + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + appslisters "k8s.io/client-go/listers/apps/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" +) + +const ( + // ErrReasonConstraintsNotMatch is used for PodTopologySpread filter error. + ErrReasonConstraintsNotMatch = "node(s) didn't match pod topology spread constraints" + // ErrReasonNodeLabelNotMatch is used when the node doesn't hold the required label. 
+ ErrReasonNodeLabelNotMatch = ErrReasonConstraintsNotMatch + " (missing required label)" +) + +var systemDefaultConstraints = []v1.TopologySpreadConstraint{ + { + TopologyKey: v1.LabelHostname, + WhenUnsatisfiable: v1.ScheduleAnyway, + MaxSkew: 3, + }, + { + TopologyKey: v1.LabelTopologyZone, + WhenUnsatisfiable: v1.ScheduleAnyway, + MaxSkew: 5, + }, +} + +// PodTopologySpread is a plugin that ensures pod's topologySpreadConstraints is satisfied. +type PodTopologySpread struct { + systemDefaulted bool + parallelizer parallelize.Parallelizer + defaultConstraints []v1.TopologySpreadConstraint + sharedLister framework.SharedLister + services corelisters.ServiceLister + replicationCtrls corelisters.ReplicationControllerLister + replicaSets appslisters.ReplicaSetLister + statefulSets appslisters.StatefulSetLister +} + +var _ framework.PreFilterPlugin = &PodTopologySpread{} +var _ framework.FilterPlugin = &PodTopologySpread{} +var _ framework.PreScorePlugin = &PodTopologySpread{} +var _ framework.ScorePlugin = &PodTopologySpread{} +var _ framework.EnqueueExtensions = &PodTopologySpread{} + +const ( + // Name is the name of the plugin used in the plugin registry and configurations. + Name = names.PodTopologySpread +) + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *PodTopologySpread) Name() string { + return Name +} + +// New initializes a new plugin and returns it. +func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) { + if h.SnapshotSharedLister() == nil { + return nil, fmt.Errorf("SnapshotSharedlister is nil") + } + args, err := getArgs(plArgs) + if err != nil { + return nil, err + } + if err := validation.ValidatePodTopologySpreadArgs(nil, &args); err != nil { + return nil, err + } + pl := &PodTopologySpread{ + parallelizer: h.Parallelizer(), + sharedLister: h.SnapshotSharedLister(), + defaultConstraints: args.DefaultConstraints, + } + if args.DefaultingType == config.SystemDefaulting { + pl.defaultConstraints = systemDefaultConstraints + pl.systemDefaulted = true + } + if len(pl.defaultConstraints) != 0 { + if h.SharedInformerFactory() == nil { + return nil, fmt.Errorf("SharedInformerFactory is nil") + } + pl.setListers(h.SharedInformerFactory()) + } + return pl, nil +} + +func getArgs(obj runtime.Object) (config.PodTopologySpreadArgs, error) { + ptr, ok := obj.(*config.PodTopologySpreadArgs) + if !ok { + return config.PodTopologySpreadArgs{}, fmt.Errorf("want args to be of type PodTopologySpreadArgs, got %T", obj) + } + return *ptr, nil +} + +func (pl *PodTopologySpread) setListers(factory informers.SharedInformerFactory) { + pl.services = factory.Core().V1().Services().Lister() + pl.replicationCtrls = factory.Core().V1().ReplicationControllers().Lister() + pl.replicaSets = factory.Apps().V1().ReplicaSets().Lister() + pl.statefulSets = factory.Apps().V1().StatefulSets().Lister() +} + +// EventsToRegister returns the possible events that may make a Pod +// failed by this plugin schedulable. +func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEvent { + return []framework.ClusterEvent{ + // All ActionType includes the following events: + // - Add. An unschedulable Pod may fail due to violating topology spread constraints, + // adding an assigned Pod may make it schedulable. + // - Update. Updating on an existing Pod's labels (e.g., removal) may make + // an unschedulable Pod schedulable. + // - Delete. 
An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints, + // deleting an existing Pod may make it schedulable. + {Resource: framework.Pod, ActionType: framework.All}, + // Node add|delete|updateLabel maybe lead an topology key changed, + // and make these pod in scheduling schedulable or unschedulable. + {Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel}, + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go new file mode 100644 index 00000000000..5bc938f028e --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go @@ -0,0 +1,294 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podtopologyspread + +import ( + "context" + "fmt" + "math" + "sync/atomic" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +const preScoreStateKey = "PreScore" + Name +const invalidScore = -1 + +// preScoreState computed at PreScore and used at Score. +// Fields are exported for comparison during testing. +type preScoreState struct { + Constraints []topologySpreadConstraint + // IgnoredNodes is a set of node names which miss some Constraints[*].topologyKey. + IgnoredNodes sets.String + // TopologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods. + TopologyPairToPodCounts map[topologyPair]*int64 + // TopologyNormalizingWeight is the weight we give to the counts per topology. + // This allows the pod counts of smaller topologies to not be watered down by + // bigger ones. + TopologyNormalizingWeight []float64 +} + +// Clone implements the mandatory Clone interface. We don't really copy the data since +// there is no need for that. +func (s *preScoreState) Clone() framework.StateData { + return s +} + +// initPreScoreState iterates "filteredNodes" to filter out the nodes which +// don't have required topologyKey(s), and initialize: +// 1) s.TopologyPairToPodCounts: keyed with both eligible topology pair and node names. +// 2) s.IgnoredNodes: the set of nodes that shouldn't be scored. +// 3) s.TopologyNormalizingWeight: The weight to be given to each constraint based on the number of values in a topology. 
+func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, filteredNodes []*v1.Node, requireAllTopologies bool) error { + var err error + if len(pod.Spec.TopologySpreadConstraints) > 0 { + s.Constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.ScheduleAnyway) + if err != nil { + return fmt.Errorf("obtaining pod's soft topology spread constraints: %w", err) + } + } else { + s.Constraints, err = pl.buildDefaultConstraints(pod, v1.ScheduleAnyway) + if err != nil { + return fmt.Errorf("setting default soft topology spread constraints: %w", err) + } + } + if len(s.Constraints) == 0 { + return nil + } + topoSize := make([]int, len(s.Constraints)) + for _, node := range filteredNodes { + if requireAllTopologies && !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) { + // Nodes which don't have all required topologyKeys present are ignored + // when scoring later. + s.IgnoredNodes.Insert(node.Name) + continue + } + for i, constraint := range s.Constraints { + // per-node counts are calculated during Score. + if constraint.TopologyKey == v1.LabelHostname { + continue + } + pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]} + if s.TopologyPairToPodCounts[pair] == nil { + s.TopologyPairToPodCounts[pair] = new(int64) + topoSize[i]++ + } + } + } + + s.TopologyNormalizingWeight = make([]float64, len(s.Constraints)) + for i, c := range s.Constraints { + sz := topoSize[i] + if c.TopologyKey == v1.LabelHostname { + sz = len(filteredNodes) - len(s.IgnoredNodes) + } + s.TopologyNormalizingWeight[i] = topologyNormalizingWeight(sz) + } + return nil +} + +// PreScore builds and writes cycle state used by Score and NormalizeScore. +func (pl *PodTopologySpread) PreScore( + ctx context.Context, + cycleState *framework.CycleState, + pod *v1.Pod, + filteredNodes []*v1.Node, +) *framework.Status { + allNodes, err := pl.sharedLister.NodeInfos().List() + if err != nil { + return framework.AsStatus(fmt.Errorf("getting all nodes: %w", err)) + } + + if len(filteredNodes) == 0 || len(allNodes) == 0 { + // No nodes to score. + return nil + } + + state := &preScoreState{ + IgnoredNodes: sets.NewString(), + TopologyPairToPodCounts: make(map[topologyPair]*int64), + } + // Only require that nodes have all the topology labels if using + // non-system-default spreading rules. This allows nodes that don't have a + // zone label to still have hostname spreading. + requireAllTopologies := len(pod.Spec.TopologySpreadConstraints) > 0 || !pl.systemDefaulted + err = pl.initPreScoreState(state, pod, filteredNodes, requireAllTopologies) + if err != nil { + return framework.AsStatus(fmt.Errorf("calculating preScoreState: %w", err)) + } + + // return if incoming pod doesn't have soft topology spread Constraints. + if len(state.Constraints) == 0 { + cycleState.Write(preScoreStateKey, state) + return nil + } + + // Ignore parsing errors for backwards compatibility. 
+	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
+	processAllNode := func(i int) {
+		nodeInfo := allNodes[i]
+		node := nodeInfo.Node()
+		if node == nil {
+			return
+		}
+		// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
+		// (2) All topologyKeys need to be present in `node`
+		match, _ := requiredNodeAffinity.Match(node)
+		if !match || (requireAllTopologies && !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints)) {
+			return
+		}
+
+		for _, c := range state.Constraints {
+			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
+			// If current topology pair is not associated with any candidate node,
+			// continue to avoid unnecessary calculation.
+			// Per-node counts are also skipped, as they are done during Score.
+			tpCount := state.TopologyPairToPodCounts[pair]
+			if tpCount == nil {
+				continue
+			}
+			count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
+			atomic.AddInt64(tpCount, int64(count))
+		}
+	}
+	pl.parallelizer.Until(ctx, len(allNodes), processAllNode)
+
+	cycleState.Write(preScoreStateKey, state)
+	return nil
+}
+
+// Score invoked at the Score extension point.
+// The "score" returned in this function is the matching number of pods on the `nodeName`,
+// it is normalized later.
+func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
+	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
+	if err != nil {
+		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
+	}
+
+	node := nodeInfo.Node()
+	s, err := getPreScoreState(cycleState)
+	if err != nil {
+		return 0, framework.AsStatus(err)
+	}
+
+	// Return if the node is not qualified.
+	if s.IgnoredNodes.Has(node.Name) {
+		return 0, nil
+	}
+
+	// For each present <pair>, current node gets a credit of <matchSum>.
+	// And we sum up <matchSum> and return it as this node's score.
+	var score float64
+	for i, c := range s.Constraints {
+		if tpVal, ok := node.Labels[c.TopologyKey]; ok {
+			var cnt int64
+			if c.TopologyKey == v1.LabelHostname {
+				cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace))
+			} else {
+				pair := topologyPair{key: c.TopologyKey, value: tpVal}
+				cnt = *s.TopologyPairToPodCounts[pair]
+			}
+			score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i])
+		}
+	}
+	return int64(score), nil
+}
+
+// NormalizeScore invoked after scoring all nodes.
+func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
+	s, err := getPreScoreState(cycleState)
+	if err != nil {
+		return framework.AsStatus(err)
+	}
+	if s == nil {
+		return nil
+	}
+
+	// Calculate <minScore> and <maxScore>
+	var minScore int64 = math.MaxInt64
+	var maxScore int64
+	for i, score := range scores {
+		// it's mandatory to check if <score.Name> is present in m.IgnoredNodes
+		if s.IgnoredNodes.Has(score.Name) {
+			scores[i].Score = invalidScore
+			continue
+		}
+		if score.Score < minScore {
+			minScore = score.Score
+		}
+		if score.Score > maxScore {
+			maxScore = score.Score
+		}
+	}
+
+	for i := range scores {
+		if scores[i].Score == invalidScore {
+			scores[i].Score = 0
+			continue
+		}
+		if maxScore == 0 {
+			scores[i].Score = framework.MaxNodeScore
+			continue
+		}
+		s := scores[i].Score
+		scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore
+	}
+	return nil
+}
+
+// ScoreExtensions of the Score plugin.
+func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions {
+	return pl
+}
+
+func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
+	c, err := cycleState.Read(preScoreStateKey)
+	if err != nil {
+		return nil, fmt.Errorf("error reading %q from cycleState: %w", preScoreStateKey, err)
+	}
+
+	s, ok := c.(*preScoreState)
+	if !ok {
+		return nil, fmt.Errorf("%+v convert to podtopologyspread.preScoreState error", c)
+	}
+	return s, nil
+}
+
+// topologyNormalizingWeight calculates the weight for the topology, based on
+// the number of values that exist for a topology.
+// Since <size> is at least 1 (all nodes that passed the Filters are in the
+// same topology), and k8s supports 5k nodes, the result is in the interval
+// <1.09, 8.52>.
+//
+// Note: <size> could also be zero when no nodes have the required topologies,
+// however we don't care about topology weight in this case as we return a 0
+// score for all nodes.
+func topologyNormalizingWeight(size int) float64 {
+	return math.Log(float64(size + 2))
+}
+
+// scoreForCount calculates the score based on number of matching pods in a
+// topology domain, the constraint's maxSkew and the topology weight.
+// `maxSkew-1` is added to the score so that differences between topology
+// domains get watered down, controlling the tolerance of the score to skews.
+func scoreForCount(cnt int64, maxSkew int32, tpWeight float64) float64 {
+	return float64(cnt)*tpWeight + float64(maxSkew-1)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index bc3481bca93..9840219e6b0 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -996,6 +996,7 @@ k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports
 k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources
 k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable
 k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits
+k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread
 k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration
 k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding
 k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics
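Note on the hard-constraint path: the Filter code vendored above reduces to the comparison matchNum + selfMatchNum - minMatchNum <= maxSkew per topology key. The following is an illustrative, self-contained sketch of that arithmetic only (not part of the patch; the pod counts are hypothetical):

package main

import "fmt"

// fitsSkew mirrors the judging criteria used in PodTopologySpread.Filter:
// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'.
func fitsSkew(matchNum, selfMatchNum, minMatchNum, maxSkew int32) bool {
	return matchNum+selfMatchNum-minMatchNum <= maxSkew
}

func main() {
	// Hypothetical counts: the candidate node's zone already runs 3 matching pods,
	// the emptiest zone runs 1, the incoming pod matches its own selector, and maxSkew is 1.
	fmt.Println(fitsSkew(3, 1, 1, 1)) // false: placing the pod here would exceed maxSkew
	// A node in the emptiest zone fits.
	fmt.Println(fitsSkew(1, 1, 1, 1)) // true
}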
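Note on the soft-constraint path: Score and NormalizeScore above combine scoreForCount and topologyNormalizingWeight, then invert the raw counts so nodes in less-crowded topology domains end up with higher scores. A standalone sketch of that arithmetic using the same formulas, with hypothetical pod counts:

package main

import (
	"fmt"
	"math"
)

const maxNodeScore = 100 // mirrors k8sframework.MaxNodeScore

// topologyWeight mirrors topologyNormalizingWeight: log(size + 2).
func topologyWeight(size int) float64 { return math.Log(float64(size + 2)) }

// rawScore mirrors scoreForCount: weighted matching-pod count plus maxSkew-1.
func rawScore(cnt int64, maxSkew int32, tpWeight float64) float64 {
	return float64(cnt)*tpWeight + float64(maxSkew-1)
}

func main() {
	w := topologyWeight(3)            // e.g. three zones seen among the candidate nodes
	light := int64(rawScore(1, 1, w)) // node in a lightly loaded zone
	heavy := int64(rawScore(4, 1, w)) // node in a crowded zone
	lo, hi := light, heavy
	// NormalizeScore inverts the raw counts: final = MaxNodeScore*(max+min-raw)/max,
	// so fewer matching pods in the node's domain yields a higher score.
	fmt.Println(maxNodeScore * (hi + lo - light) / hi) // 100
	fmt.Println(maxNodeScore * (hi + lo - heavy) / hi) // 16
}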
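Note on defaults vs. explicit constraints: both call sites in this patch (predicates.go and nodeorder.go) construct the plugin with config.SystemDefaulting, so a pod without its own constraints is spread with the system defaults shown in plugin.go (kubernetes.io/hostname with maxSkew 3 and topology.kubernetes.io/zone with maxSkew 5, both ScheduleAnyway). A pod opts into hard spreading by declaring an explicit constraint; the sketch below builds one in Go to show what the Filter path consumes (illustrative only, and the app label/values are made up):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			TopologySpreadConstraints: []v1.TopologySpreadConstraint{{
				MaxSkew:           1,
				TopologyKey:       v1.LabelTopologyZone,
				WhenUnsatisfiable: v1.DoNotSchedule, // DoNotSchedule is enforced by PreFilter/Filter; ScheduleAnyway is scored by PreScore/Score
				LabelSelector: &metav1.LabelSelector{
					MatchLabels: map[string]string{"app": "web"}, // hypothetical pod label
				},
			}},
		},
	}
	fmt.Println(pod.Spec.TopologySpreadConstraints[0].TopologyKey)
}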