From 9dc99145c5eb552c0bcc80741e324be913c63d09 Mon Sep 17 00:00:00 2001 From: xilinxing Date: Wed, 21 Sep 2022 11:22:38 +0800 Subject: [PATCH] add selector spread plugin - fix plugin use listers listing resource failed Signed-off-by: xilinxing --- .../chart/volcano/templates/scheduler.yaml | 9 +- installer/volcano-development-arm64.yaml | 10 +- installer/volcano-development.yaml | 9 +- pkg/scheduler/cache/cache.go | 9 + pkg/scheduler/plugins/nodeorder/nodeorder.go | 79 +++++- .../node/topology/helpers.go | 57 +++++ .../plugins/selectorspread/selector_spread.go | 234 ++++++++++++++++++ vendor/modules.txt | 2 + 8 files changed, 399 insertions(+), 10 deletions(-) create mode 100644 vendor/k8s.io/component-helpers/node/topology/helpers.go create mode 100644 vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go diff --git a/installer/helm/chart/volcano/templates/scheduler.yaml b/installer/helm/chart/volcano/templates/scheduler.yaml index 650390bff5..6d0a234f70 100644 --- a/installer/helm/chart/volcano/templates/scheduler.yaml +++ b/installer/helm/chart/volcano/templates/scheduler.yaml @@ -43,8 +43,8 @@ rules: resources: ["persistentvolumes"] verbs: ["list", "watch"] - apiGroups: [""] - resources: ["namespaces"] - verbs: ["list", "watch"] + resources: ["namespaces", "services", "replicationcontrollers"] + verbs: ["list", "watch", "get"] - apiGroups: [""] resources: ["resourcequotas"] verbs: ["list", "watch"] @@ -72,6 +72,9 @@ rules: - apiGroups: [""] resources: ["configmaps"] verbs: ["get", "create", "delete", "update"] + - apiGroups: ["apps"] + resources: ["daemonsets", "replicasets", "statefulsets"] + verbs: ["list", "watch", "get"] --- kind: ClusterRoleBinding @@ -138,7 +141,7 @@ metadata: prometheus.io/port: "8080" prometheus.io/scrape: "true" name: {{ .Release.Name }}-scheduler-service - namespace: {{ .Release.Namespace }} + namespace: {{ .Release.Namespace }} spec: ports: - port: 8080 diff --git a/installer/volcano-development-arm64.yaml b/installer/volcano-development-arm64.yaml index 53928962cc..c998843590 100644 --- a/installer/volcano-development-arm64.yaml +++ b/installer/volcano-development-arm64.yaml @@ -134,6 +134,7 @@ spec: priorityClassName: system-cluster-critical containers: - args: + - --enabled-admission=/jobs/mutate,/jobs/validate,/podgroups/mutate,/pods/validate,/pods/mutate,/queues/mutate,/queues/validate - --tls-cert-file=/admission.local.config/certificates/tls.crt - --tls-private-key-file=/admission.local.config/certificates/tls.key - --ca-cert-file=/admission.local.config/certificates/ca.crt @@ -8565,8 +8566,8 @@ rules: resources: ["persistentvolumes"] verbs: ["list", "watch"] - apiGroups: [""] - resources: ["namespaces"] - verbs: ["list", "watch"] + resources: ["namespaces", "services", "replicationcontrollers"] + verbs: ["list", "watch", "get"] - apiGroups: [""] resources: ["resourcequotas"] verbs: ["list", "watch"] @@ -8594,6 +8595,9 @@ rules: - apiGroups: [""] resources: ["configmaps"] verbs: ["get", "create", "delete", "update"] + - apiGroups: ["apps"] + resources: ["daemonsets", "replicasets", "statefulsets"] + verbs: ["list", "watch", "get"] --- # Source: volcano/templates/scheduler.yaml kind: ClusterRoleBinding @@ -8618,7 +8622,7 @@ metadata: prometheus.io/port: "8080" prometheus.io/scrape: "true" name: volcano-scheduler-service - namespace: volcano-system + namespace: volcano-system spec: ports: - port: 8080 diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index 6431557a62..9b8eef6199 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -8566,8 +8566,8 @@ rules: resources: ["persistentvolumes"] verbs: ["list", "watch"] - apiGroups: [""] - resources: ["namespaces"] - verbs: ["list", "watch"] + resources: ["namespaces", "services", "replicationcontrollers"] + verbs: ["list", "watch", "get"] - apiGroups: [""] resources: ["resourcequotas"] verbs: ["list", "watch"] @@ -8595,6 +8595,9 @@ rules: - apiGroups: [""] resources: ["configmaps"] verbs: ["get", "create", "delete", "update"] + - apiGroups: ["apps"] + resources: ["daemonsets", "replicasets", "statefulsets"] + verbs: ["list", "watch", "get"] --- # Source: volcano/templates/scheduler.yaml kind: ClusterRoleBinding @@ -8619,7 +8622,7 @@ metadata: prometheus.io/port: "8080" prometheus.io/scrape: "true" name: volcano-scheduler-service - namespace: volcano-system + namespace: volcano-system spec: ports: - port: 8080 diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 4c44774750..abbe89907a 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -489,6 +489,15 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu sc.informerFactory = informerFactory mySchedulerPodName, c := getMultiSchedulerInfo() + // explictly register informers to the factory, otherwise resources listers cannot get anything + // even with no erorr returned. `Namespace` informer is used by `InterPodAffinity` plugin, + // `SelectorSpread` and `PodTopologySpread` plugins uses the following four so far. + informerFactory.Core().V1().Namespaces().Informer() + informerFactory.Core().V1().Services().Informer() + informerFactory.Core().V1().ReplicationControllers().Informer() + informerFactory.Apps().V1().ReplicaSets().Informer() + informerFactory.Apps().V1().StatefulSets().Informer() + // create informer for node information sc.nodeInformer = informerFactory.Core().V1().Nodes() sc.nodeInformer.Informer().AddEventHandlerWithResyncPeriod( diff --git a/pkg/scheduler/plugins/nodeorder/nodeorder.go b/pkg/scheduler/plugins/nodeorder/nodeorder.go index 1d6db781d4..232e36157f 100644 --- a/pkg/scheduler/plugins/nodeorder/nodeorder.go +++ b/pkg/scheduler/plugins/nodeorder/nodeorder.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "volcano.sh/volcano/pkg/scheduler/api" @@ -58,6 +59,8 @@ const ( TaintTolerationWeight = "tainttoleration.weight" // ImageLocalityWeight is the key for providing Image Locality Priority Weight in YAML ImageLocalityWeight = "imagelocality.weight" + // SelectorSpreadWeight is the key for providing Selector Spread Priority Weight in YAML + selectorSpreadWeight = "selectorspread.weight" ) type nodeOrderPlugin struct { @@ -82,6 +85,7 @@ type priorityWeight struct { balancedResourceWeight int taintTolerationWeight int imageLocalityWeight int + selectorSpreadWeight int } // calculateWeight from the provided arguments. @@ -122,6 +126,7 @@ func calculateWeight(args framework.Arguments) priorityWeight { balancedResourceWeight: 1, taintTolerationWeight: 1, imageLocalityWeight: 1, + selectorSpreadWeight: 0, } // Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct. @@ -145,6 +150,9 @@ func calculateWeight(args framework.Arguments) priorityWeight { // Checks whether imagelocality.weight is provided or not, if given, modifies the value in weight struct. args.GetInt(&weight.imageLocalityWeight, ImageLocalityWeight) + // Checks whether selectorspread.weight is provided or not, if given, modifies the value in weight struct. + args.GetInt(&weight.selectorSpreadWeight, selectorSpreadWeight) + return weight } @@ -252,6 +260,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If imageLocalityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score. nodeScore += float64(score) * float64(weight.imageLocalityWeight) + klog.V(4).Infof("Image Locality score: %f", nodeScore) } // NodeResourcesLeastAllocated @@ -264,6 +273,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If leastReqWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score. nodeScore += float64(score) * float64(weight.leastReqWeight) + klog.V(4).Infof("Least Request score: %f", nodeScore) } // NodeResourcesMostAllocated @@ -276,6 +286,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If mostRequestedWeight is provided, host.Score is multiplied with weight, it's 0 by default nodeScore += float64(score) * float64(weight.mostReqWeight) + klog.V(4).Infof("Most Request score: %f", nodeScore) } // NodeResourcesBalancedAllocation @@ -288,6 +299,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If balancedResourceWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score. nodeScore += float64(score) * float64(weight.balancedResourceWeight) + klog.V(4).Infof("Balanced Request score: %f", nodeScore) } // NodeAffinity @@ -301,6 +313,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // TODO: should we normalize the score // If nodeAffinityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score. nodeScore += float64(score) * float64(weight.nodeAffinityWeight) + klog.V(4).Infof("Node Affinity score: %f", nodeScore) } klog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, nodeScore) @@ -315,6 +328,9 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { p, _ = tainttoleration.New(nil, handle) taintToleration := p.(*tainttoleration.TaintToleration) + p, _ = selectorspread.New(nil, handle) + selectorSpread := p.(*selectorspread.SelectorSpread) + batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) { // InterPodAffinity state := k8sframework.NewCycleState() @@ -334,8 +350,13 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { return nil, err } + selectorSpreadScores, err := selectorSpreadScore(selectorSpread, state, task.Pod, nodes, weight.selectorSpreadWeight) + if err != nil { + return nil, err + } + for _, node := range nodes { - nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + selectorSpreadScores[node.Name] } klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores) @@ -463,5 +484,61 @@ func taintTolerationScore( return nodeScores, nil } +func selectorSpreadScore( + selectorSpread *selectorspread.SelectorSpread, + cycleState *k8sframework.CycleState, + pod *v1.Pod, + nodes []*v1.Node, + selectorSpreadWeight int, +) (map[string]float64, error) { + preScoreStatus := selectorSpread.PreScore(context.TODO(), cycleState, pod, nodes) + if !preScoreStatus.IsSuccess() { + return nil, preScoreStatus.AsError() + } + + nodescoreList := make(k8sframework.NodeScoreList, len(nodes)) + // size of errCh should be no less than parallelization number, see interPodAffinityScore. + workerNum := 16 + errCh := make(chan error, workerNum) + parallelizeContext, parallelizeCancel := context.WithCancel(context.TODO()) + workqueue.ParallelizeUntil(parallelizeContext, workerNum, len(nodes), func(index int) { + nodeName := nodes[index].Name + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, status := selectorSpread.Score(ctx, cycleState, pod, nodeName) + if !status.IsSuccess() { + parallelizeCancel() + errCh <- fmt.Errorf("calculate selector spread priority failed %v", status.Message()) + return + } + nodescoreList[index] = k8sframework.NodeScore{ + Name: nodeName, + Score: s, + } + }) + + select { + case err := <-errCh: + return nil, err + default: + } + + selectorSpread.NormalizeScore(context.TODO(), cycleState, pod, nodescoreList) + + nodeScores := make(map[string]float64, len(nodes)) + for i, nodeScore := range nodescoreList { + // return error if score plugin returns invalid score. + if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore { + return nil, fmt.Errorf("selector spread returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name) + } + nodeScore.Score *= int64(selectorSpreadWeight) + nodescoreList[i] = nodeScore + nodeScores[nodeScore.Name] = float64(nodeScore.Score) + } + + klog.V(4).Infof("selector spread Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores) + return nodeScores, nil +} + func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) { } diff --git a/vendor/k8s.io/component-helpers/node/topology/helpers.go b/vendor/k8s.io/component-helpers/node/topology/helpers.go new file mode 100644 index 0000000000..18c838cca5 --- /dev/null +++ b/vendor/k8s.io/component-helpers/node/topology/helpers.go @@ -0,0 +1,57 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topology + +import ( + "k8s.io/api/core/v1" +) + +// GetZoneKey is a helper function that builds a string identifier that is unique per failure-zone; +// it returns empty-string for no zone. +// Since there are currently two separate zone keys: +// * "failure-domain.beta.kubernetes.io/zone" +// * "topology.kubernetes.io/zone" +// GetZoneKey will first check failure-domain.beta.kubernetes.io/zone and if not exists, will then check +// topology.kubernetes.io/zone +func GetZoneKey(node *v1.Node) string { + labels := node.Labels + if labels == nil { + return "" + } + + // TODO: "failure-domain.beta..." names are deprecated, but will + // stick around a long time due to existing on old extant objects like PVs. + // Maybe one day we can stop considering them (see #88493). + zone, ok := labels[v1.LabelFailureDomainBetaZone] + if !ok { + zone, _ = labels[v1.LabelTopologyZone] + } + + region, ok := labels[v1.LabelFailureDomainBetaRegion] + if !ok { + region, _ = labels[v1.LabelTopologyRegion] + } + + if region == "" && zone == "" { + return "" + } + + // We include the null character just in case region or failureDomain has a colon + // (We do assume there's no null characters in a region or failureDomain) + // As a nice side-benefit, the null character is not printed by fmt.Print or glog + return region + ":\x00:" + zone +} diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go new file mode 100644 index 0000000000..c50e0bfaf8 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go @@ -0,0 +1,234 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package selectorspread + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + appslisters "k8s.io/client-go/listers/apps/v1" + corelisters "k8s.io/client-go/listers/core/v1" + utilnode "k8s.io/component-helpers/node/topology" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" +) + +// SelectorSpread is a plugin that calculates selector spread priority. +type SelectorSpread struct { + sharedLister framework.SharedLister + services corelisters.ServiceLister + replicationControllers corelisters.ReplicationControllerLister + replicaSets appslisters.ReplicaSetLister + statefulSets appslisters.StatefulSetLister +} + +var _ framework.PreScorePlugin = &SelectorSpread{} +var _ framework.ScorePlugin = &SelectorSpread{} + +const ( + // Name is the name of the plugin used in the plugin registry and configurations. + Name = names.SelectorSpread + // preScoreStateKey is the key in CycleState to SelectorSpread pre-computed data for Scoring. + preScoreStateKey = "PreScore" + Name + + // When zone information is present, give 2/3 of the weighting to zone spreading, 1/3 to node spreading + // TODO: Any way to justify this weighting? + zoneWeighting float64 = 2.0 / 3.0 +) + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *SelectorSpread) Name() string { + return Name +} + +// preScoreState computed at PreScore and used at Score. +type preScoreState struct { + selector labels.Selector +} + +// Clone implements the mandatory Clone interface. We don't really copy the data since +// there is no need for that. +func (s *preScoreState) Clone() framework.StateData { + return s +} + +// skipSelectorSpread returns true if the pod's TopologySpreadConstraints are specified. +// Note that this doesn't take into account default constraints defined for +// the PodTopologySpread plugin. +func skipSelectorSpread(pod *v1.Pod) bool { + return len(pod.Spec.TopologySpreadConstraints) != 0 +} + +// Score invoked at the Score extension point. +// The "score" returned in this function is the matching number of pods on the `nodeName`, +// it is normalized later. +func (pl *SelectorSpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + if skipSelectorSpread(pod) { + return 0, nil + } + + c, err := state.Read(preScoreStateKey) + if err != nil { + return 0, framework.AsStatus(fmt.Errorf("reading %q from cycleState: %w", preScoreStateKey, err)) + } + + s, ok := c.(*preScoreState) + if !ok { + return 0, framework.AsStatus(fmt.Errorf("cannot convert saved state to selectorspread.preScoreState")) + } + + nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err)) + } + + count := countMatchingPods(pod.Namespace, s.selector, nodeInfo) + return int64(count), nil +} + +// NormalizeScore invoked after scoring all nodes. +// For this plugin, it calculates the score of each node +// based on the number of existing matching pods on the node +// where zone information is included on the nodes, it favors nodes +// in zones with fewer existing matching pods. +func (pl *SelectorSpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { + if skipSelectorSpread(pod) { + return nil + } + + countsByZone := make(map[string]int64, 10) + maxCountByZone := int64(0) + maxCountByNodeName := int64(0) + + for i := range scores { + if scores[i].Score > maxCountByNodeName { + maxCountByNodeName = scores[i].Score + } + nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name) + if err != nil { + return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err)) + } + zoneID := utilnode.GetZoneKey(nodeInfo.Node()) + if zoneID == "" { + continue + } + countsByZone[zoneID] += scores[i].Score + } + + for zoneID := range countsByZone { + if countsByZone[zoneID] > maxCountByZone { + maxCountByZone = countsByZone[zoneID] + } + } + + haveZones := len(countsByZone) != 0 + + maxCountByNodeNameFloat64 := float64(maxCountByNodeName) + maxCountByZoneFloat64 := float64(maxCountByZone) + MaxNodeScoreFloat64 := float64(framework.MaxNodeScore) + + for i := range scores { + // initializing to the default/max node score of maxPriority + fScore := MaxNodeScoreFloat64 + if maxCountByNodeName > 0 { + fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-scores[i].Score) / maxCountByNodeNameFloat64) + } + // If there is zone information present, incorporate it + if haveZones { + nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name) + if err != nil { + return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err)) + } + + zoneID := utilnode.GetZoneKey(nodeInfo.Node()) + if zoneID != "" { + zoneScore := MaxNodeScoreFloat64 + if maxCountByZone > 0 { + zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64) + } + fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore) + } + } + scores[i].Score = int64(fScore) + } + return nil +} + +// ScoreExtensions of the Score plugin. +func (pl *SelectorSpread) ScoreExtensions() framework.ScoreExtensions { + return pl +} + +// PreScore builds and writes cycle state used by Score and NormalizeScore. +func (pl *SelectorSpread) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status { + if skipSelectorSpread(pod) { + return nil + } + selector := helper.DefaultSelector( + pod, + pl.services, + pl.replicationControllers, + pl.replicaSets, + pl.statefulSets, + ) + state := &preScoreState{ + selector: selector, + } + cycleState.Write(preScoreStateKey, state) + return nil +} + +// New initializes a new plugin and returns it. +func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { + sharedLister := handle.SnapshotSharedLister() + if sharedLister == nil { + return nil, fmt.Errorf("SnapshotSharedLister is nil") + } + sharedInformerFactory := handle.SharedInformerFactory() + if sharedInformerFactory == nil { + return nil, fmt.Errorf("SharedInformerFactory is nil") + } + return &SelectorSpread{ + sharedLister: sharedLister, + services: sharedInformerFactory.Core().V1().Services().Lister(), + replicationControllers: sharedInformerFactory.Core().V1().ReplicationControllers().Lister(), + replicaSets: sharedInformerFactory.Apps().V1().ReplicaSets().Lister(), + statefulSets: sharedInformerFactory.Apps().V1().StatefulSets().Lister(), + }, nil +} + +// countMatchingPods counts pods based on namespace and matching all selectors +func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *framework.NodeInfo) int { + if len(nodeInfo.Pods) == 0 || selector.Empty() { + return 0 + } + count := 0 + for _, p := range nodeInfo.Pods { + // Ignore pods being deleted for spreading purposes + // Similar to how it is done for SelectorSpreadPriority + if namespace == p.Pod.Namespace && p.Pod.DeletionTimestamp == nil { + if selector.Matches(labels.Set(p.Pod.Labels)) { + count++ + } + } + } + return count +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 1fbd1fc966..dbb7b88bda 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -912,6 +912,7 @@ k8s.io/component-base/metrics/testutil k8s.io/component-base/version # k8s.io/component-helpers v0.23.0 => k8s.io/component-helpers v0.23.0 ## explicit; go 1.16 +k8s.io/component-helpers/node/topology k8s.io/component-helpers/node/util/sysctl k8s.io/component-helpers/scheduling/corev1 k8s.io/component-helpers/scheduling/corev1/nodeaffinity @@ -996,6 +997,7 @@ k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits +k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics