From 82a06686a1938406f62097d60ba7a84830cb2f59 Mon Sep 17 00:00:00 2001
From: renxiangyu_yewu
Date: Fri, 8 Nov 2024 19:39:56 +0800
Subject: [PATCH] fix: fix kosmos-scheduler reschedule pv's pod

Signed-off-by: renxiangyu_yewu
---
 .../leafnode_volume_binding.go                | 343 +++++++++++++++++-
 1 file changed, 327 insertions(+), 16 deletions(-)

diff --git a/pkg/scheduler/lifted/plugins/leafnodevolumebinding/leafnode_volume_binding.go b/pkg/scheduler/lifted/plugins/leafnodevolumebinding/leafnode_volume_binding.go
index 9c7b09223..2b24d1c4a 100644
--- a/pkg/scheduler/lifted/plugins/leafnodevolumebinding/leafnode_volume_binding.go
+++ b/pkg/scheduler/lifted/plugins/leafnodevolumebinding/leafnode_volume_binding.go
@@ -24,20 +24,29 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sort"
 	"sync"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	corelisters "k8s.io/client-go/listers/core/v1"
+	storagelisters "k8s.io/client-go/listers/storage/v1"
+	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
 	"k8s.io/component-helpers/storage/ephemeral"
+	"k8s.io/component-helpers/storage/volume"
 	"k8s.io/klog/v2"
+	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	scheduling "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics"
 
 	"github.com/kosmos.io/kosmos/pkg/apis/config"
 	"github.com/kosmos.io/kosmos/pkg/scheduler/lifted/helpers"
+	"github.com/kosmos.io/kosmos/pkg/utils"
 )
 
 const (
@@ -70,8 +79,11 @@ func (d *stateData) Clone() framework.StateData {
 type VolumeBinding struct {
 	Binder           scheduling.SchedulerVolumeBinder
 	PVCLister        corelisters.PersistentVolumeClaimLister
+	PVLister         corelisters.PersistentVolumeLister
 	NodeLister       corelisters.NodeLister
+	SCLister         storagelisters.StorageClassLister
 	frameworkHandler framework.Handle
+	pvCache          scheduling.PVAssumeCache
 }
 
 var _ framework.PreFilterPlugin = &VolumeBinding{}
@@ -204,22 +216,22 @@ func (pl *VolumeBinding) Filter(_ context.Context, cs *framework.CycleState, pod
 		return framework.AsStatus(err)
 	}
 
-	if helpers.HasLeafNodeTaint(node) {
-		if cluster, ok := node.Annotations[nodeMode]; ok && cluster == "one2cluster" {
-			klog.V(5).InfoS("This is one2cluster ", "pod", klog.KObj(pod), "node", klog.KObj(node))
-			return nil
-		} else {
-			if len(state.boundClaims) <= 0 {
-				return nil
-			}
-		}
-	}
-
 	if state.skip {
 		return nil
 	}
 
-	podVolumes, reasons, err := pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node)
+	podVolumes := &scheduling.PodVolumes{}
+	reasons := scheduling.ConflictReasons{}
+	isKosmosClusterNode := false
+
+	// If the node is a kosmos node, and it is in "one2cluster" mode, it needs to be processed separately.
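+	// For such nodes, PVs and PVCs are matched by their kosmos owner-cluster annotations via
+	// FindPodKosmosVolumes, instead of the default binder's node-affinity based FindPodVolumes.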
+	cluster, ok := node.Annotations[nodeMode]
+	if helpers.HasLeafNodeTaint(node) && ok && cluster == "one2cluster" {
+		isKosmosClusterNode = true
+		_, reasons, err = pl.FindPodKosmosVolumes(pod, state.boundClaims, state.claimsToBind, node)
+	} else {
+		podVolumes, reasons, err = pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node)
+	}
 	if err != nil {
 		return framework.AsStatus(err)
 	}
@@ -233,13 +245,309 @@ func (pl *VolumeBinding) Filter(_ context.Context, cs *framework.CycleState, pod
 		return status
 	}
 
-	// multiple goroutines call `Filter` on different nodes simultaneously and the `CycleState` may be duplicated, so we must use a local lock here
-	state.Lock()
-	state.podVolumesByNode[node.Name] = podVolumes
-	state.Unlock()
+	// This operation is not required for kosmos cluster nodes.
+	if !isKosmosClusterNode {
+		// multiple goroutines call `Filter` on different nodes simultaneously and the `CycleState` may be duplicated, so we must use a local lock here
+		state.Lock()
+		state.podVolumesByNode[node.Name] = podVolumes
+		state.Unlock()
+	}
+
 	return nil
 }
 
+// FindPodKosmosVolumes finds the matching PVs for PVCs and the claims that need dynamic
+// provisioning for the given pod and kosmos node. If the kosmos node does not fit,
+// conflict reasons are returned.
+func (pl *VolumeBinding) FindPodKosmosVolumes(pod *corev1.Pod, boundClaims, claimsToBind []*corev1.PersistentVolumeClaim, node *corev1.Node) (podVolumes *PodVolumes, reasons scheduling.ConflictReasons, err error) {
+	podVolumes = &PodVolumes{}
+
+	// Warning: Below log needs high verbosity as it can be printed several times (#60933).
+	klog.V(5).InfoS("FindPodKosmosVolumes", "pod", klog.KObj(pod), "node", klog.KObj(node))
+
+	// Initialize to true for pods that don't have volumes. These
+	// booleans get translated into reason strings when the function
+	// returns without an error.
+	unboundVolumesSatisfied := true
+	boundVolumesSatisfied := true
+	sufficientStorage := true
+	boundPVsFound := true
+	defer func() {
+		if err != nil {
+			return
+		}
+		if !boundVolumesSatisfied {
+			reasons = append(reasons, scheduling.ErrReasonNodeConflict)
+		}
+		if !unboundVolumesSatisfied {
+			reasons = append(reasons, scheduling.ErrReasonBindConflict)
+		}
+		if !sufficientStorage {
+			reasons = append(reasons, scheduling.ErrReasonNotEnoughSpace)
+		}
+		if !boundPVsFound {
+			reasons = append(reasons, scheduling.ErrReasonPVNotExist)
+		}
+	}()
+
+	defer func() {
+		if err != nil {
+			metrics.VolumeSchedulingStageFailed.WithLabelValues("predicate").Inc()
+		}
+	}()
+
+	var (
+		staticBindings    []*BindingInfo
+		dynamicProvisions []*corev1.PersistentVolumeClaim
+	)
+	defer func() {
+		// Although we do not distinguish nil from empty in this function, for
+		// easier testing, we normalize empty to nil.
+		if len(staticBindings) == 0 {
+			staticBindings = nil
+		}
+		if len(dynamicProvisions) == 0 {
+			dynamicProvisions = nil
+		}
+		podVolumes.StaticBindings = staticBindings
+		podVolumes.DynamicProvisions = dynamicProvisions
+	}()
+
+	// Check PV node affinity on bound volumes
+	if len(boundClaims) > 0 {
+		boundVolumesSatisfied, boundPVsFound, err = pl.checkBoundClaims(boundClaims, node, pod)
+		if err != nil {
+			return
+		}
+	}
+
+	// Find matching volumes and node for unbound claims
+	if len(claimsToBind) > 0 {
+		var (
+			claimsToFindMatching []*corev1.PersistentVolumeClaim
+			claimsToProvision    []*corev1.PersistentVolumeClaim
+		)
+
+		// Filter out claims to provision
+		for _, claim := range claimsToBind {
+			if clusterOwners, ok := claim.Annotations[utils.KosmosResourceOwnersAnnotations]; ok {
+				if clusterOwners != node.Annotations[utils.KosmosNodeOwnedByClusterAnnotations] {
+					// Fast path, skip unmatched node.
+					unboundVolumesSatisfied = false
+					return
+				}
+				claimsToProvision = append(claimsToProvision, claim)
+			} else {
+				claimsToFindMatching = append(claimsToFindMatching, claim)
+			}
+		}
+
+		// Find matching volumes
+		if len(claimsToFindMatching) > 0 {
+			var unboundClaims []*corev1.PersistentVolumeClaim
+			unboundVolumesSatisfied, staticBindings, unboundClaims, err = pl.findMatchingVolumes(pod, claimsToFindMatching, node)
+			if err != nil {
+				return
+			}
+			claimsToProvision = append(claimsToProvision, unboundClaims...)
+		}
+
+		// Check for claims to provision. This is the first time where we potentially
+		// find out that storage is not sufficient for the node.
+		if len(claimsToProvision) > 0 {
+			unboundVolumesSatisfied, sufficientStorage, dynamicProvisions, err = pl.checkVolumeProvisions(pod, claimsToProvision, node)
+			if err != nil {
+				return
+			}
+		}
+	}
+
+	return
+}
+
+// checkVolumeProvisions checks the given unbound claims (claims that have gone through
+// findMatchingVolumes and do not have matching volumes for binding) and returns true
+// if all of the claims are eligible for dynamic provisioning.
+func (pl *VolumeBinding) checkVolumeProvisions(pod *corev1.Pod, claimsToProvision []*corev1.PersistentVolumeClaim, node *corev1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*corev1.PersistentVolumeClaim, err error) {
+	dynamicProvisions = []*corev1.PersistentVolumeClaim{}
+
+	// We return early with dynamicProvisions == nil if a check
+	// fails or we encounter an error.
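+	// Each claim must reference a StorageClass whose provisioner supports dynamic
+	// provisioning and whose AllowedTopologies (if any) match the node's labels.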
+	for _, claim := range claimsToProvision {
+		pvcName := getPVCName(claim)
+		className := volume.GetPersistentVolumeClaimClass(claim)
+		if className == "" {
+			return false, false, nil, fmt.Errorf("no class for claim %q", pvcName)
+		}
+
+		class, err := pl.SCLister.Get(className)
+		if err != nil {
+			return false, false, nil, fmt.Errorf("failed to find storage class %q", className)
+		}
+		provisioner := class.Provisioner
+		if provisioner == "" || provisioner == volume.NotSupportedProvisioner {
+			klog.V(4).InfoS("Storage class of claim does not support dynamic provisioning", "storageClassName", className, "PVC", klog.KObj(claim))
+			return false, true, nil, nil
+		}
+
+		// Check if the node can satisfy the topology requirement in the class
+		if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
+			klog.V(4).InfoS("Node cannot satisfy provisioning topology requirements of claim", "node", klog.KObj(node), "PVC", klog.KObj(claim))
+			return false, true, nil, nil
+		}
+
+		dynamicProvisions = append(dynamicProvisions, claim)
+	}
+
+	klog.V(4).InfoS("Provisioning for claims of pod that has no matching volumes...", "claimCount", len(claimsToProvision), "pod", klog.KObj(pod), "node", klog.KObj(node))
+
+	return true, true, dynamicProvisions, nil
+}
+
+func getPVCName(pvc *corev1.PersistentVolumeClaim) string {
+	return pvc.Namespace + "/" + pvc.Name
+}
+
+// findMatchingVolumes tries to find matching volumes for the given claims and
+// returns the still-unbound claims for further provisioning.
+func (pl *VolumeBinding) findMatchingVolumes(pod *corev1.Pod, claimsToBind []*corev1.PersistentVolumeClaim, node *corev1.Node) (foundMatches bool, bindings []*BindingInfo, unboundClaims []*corev1.PersistentVolumeClaim, err error) {
+	// Sort all the claims by increasing size request to get the smallest fits
+	sort.Sort(byPVCSize(claimsToBind))
+
+	chosenPVs := map[string]*corev1.PersistentVolume{}
+
+	foundMatches = true
+
+	for _, pvc := range claimsToBind {
+		// Get storage class name from each PVC
+		storageClassName := volume.GetPersistentVolumeClaimClass(pvc)
+		allPVs := pl.pvCache.ListPVs(storageClassName)
+
+		// Find a matching PV
+		pv, err := volume.FindMatchingVolume(pvc, allPVs, node, chosenPVs, true)
+		if err != nil {
+			return false, nil, nil, err
+		}
+		if pv == nil {
+			klog.V(4).InfoS("No matching volumes for pod", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc), "node", klog.KObj(node))
+			unboundClaims = append(unboundClaims, pvc)
+			foundMatches = false
+			continue
+		}
+
+		// matching PV needs to be excluded so we don't select it again
+		chosenPVs[pv.Name] = pv
+		bindings = append(bindings, &BindingInfo{pvc: pvc, pv: pv})
+		klog.V(5).InfoS("Found matching PV for PVC for pod", "PV", klog.KObj(pv), "PVC", klog.KObj(pvc), "node", klog.KObj(node), "pod", klog.KObj(pod))
+	}
+
+	if foundMatches {
+		klog.V(4).InfoS("Found matching volumes for pod", "pod", klog.KObj(pod), "node", klog.KObj(node))
+	}
+
+	return
+}
+
+// checkBoundClaims checks whether the kosmos owner-cluster annotation of each bound PV/PVC
+// matches the owner-cluster annotation of the node.
+func (pl *VolumeBinding) checkBoundClaims(claims []*corev1.PersistentVolumeClaim, node *corev1.Node, pod *corev1.Pod) (bool, bool, error) {
+	for _, pvc := range claims {
+		pvName := pvc.Spec.VolumeName
+		pv, err := pl.PVLister.Get(pvName)
+		if err != nil {
+			if apierrors.IsNotFound(err) {
+				err = nil
+			}
+			return true, false, err
+		}
+
+		// TODO: verification of migrated (in-tree to CSI) PVs is not currently supported.
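+		// Upstream's checkBoundClaims translates migratable in-tree PVs to their CSI form
+		// before the affinity check; that step is skipped here: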
+		// translator.IsPVMigratable(pv)
+
+		err = pl.checkKosmosResourceOwner(pv, node)
+		if err != nil {
+			klog.V(4).InfoS("PersistentVolume and node mismatch for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod), "err", err)
+			return false, true, nil
+		}
+		klog.V(5).InfoS("PersistentVolume and node matches for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod))
+	}
+
+	klog.V(4).InfoS("All bound volumes for pod match with node", "pod", klog.KObj(pod), "node", klog.KObj(node))
+	return true, true, nil
+}
+
+// checkKosmosResourceOwner checks whether the PV's kosmos owner-cluster annotation matches the
+// node's owner-cluster annotation; for PVs not managed by kosmos it falls back to the regular
+// node affinity check. This ensures that we don't mount a volume that doesn't belong to this node.
+func (pl *VolumeBinding) checkKosmosResourceOwner(pv *corev1.PersistentVolume, node *corev1.Node) error {
+	clusterOwners, ok := pv.Annotations[utils.KosmosResourceOwnersAnnotations]
+	if !ok {
+		// The PVC is bound to a PV that is not managed by kosmos; fall back to the node affinity check.
+		err := CheckNodeAffinity(pv, node.Labels)
+		return err
+	}
+
+	if clusterOwners != node.Annotations[utils.KosmosNodeOwnedByClusterAnnotations] {
+		klog.V(4).InfoS("This pv does not belong to the kosmos node", "node", klog.KObj(node), "pv", klog.KObj(pv))
+		return fmt.Errorf("the owner cluster %s of the pv does not match the owner cluster %s of this kosmos node", clusterOwners, node.Annotations[utils.KosmosNodeOwnedByClusterAnnotations])
+	}
+
+	return nil
+}
+
+// CheckNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels
+// This ensures that we don't mount a volume that doesn't belong to this node
+func CheckNodeAffinity(pv *corev1.PersistentVolume, nodeLabels map[string]string) error {
+	if pv.Spec.NodeAffinity == nil {
+		return fmt.Errorf("node Affinity not specified")
+	}
+
+	if pv.Spec.NodeAffinity.Required != nil {
+		node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Labels: nodeLabels}}
+		terms := pv.Spec.NodeAffinity.Required
+		if matches, err := corev1helpers.MatchNodeSelectorTerms(node, terms); err != nil {
+			return err
+		} else if !matches {
+			return fmt.Errorf("no matching NodeSelectorTerms")
+		}
+	}
+
+	return nil
+}
+
+type byPVCSize []*corev1.PersistentVolumeClaim
+
+func (a byPVCSize) Len() int {
+	return len(a)
+}
+
+func (a byPVCSize) Swap(i, j int) {
+	a[i], a[j] = a[j], a[i]
+}
+
+func (a byPVCSize) Less(i, j int) bool {
+	iSize := a[i].Spec.Resources.Requests[corev1.ResourceStorage]
+	jSize := a[j].Spec.Resources.Requests[corev1.ResourceStorage]
+	// return true if iSize is less than jSize
+	return iSize.Cmp(jSize) == -1
+}
+
+// BindingInfo holds a binding between PV and PVC.
+type BindingInfo struct {
+	// PVC that needs to be bound
+	pvc *corev1.PersistentVolumeClaim
+
+	// Proposed PV to bind to this PVC
+	pv *corev1.PersistentVolume
+}
+
+// PodVolumes holds pod's volumes information used in volume scheduling.
+type PodVolumes struct {
+	// StaticBindings are binding decisions for PVCs which can be bound to
+	// pre-provisioned static PVs.
+	StaticBindings []*BindingInfo
+	// DynamicProvisions are PVCs that require dynamic provisioning
+	DynamicProvisions []*corev1.PersistentVolumeClaim
+}
+
 // Reserve reserves volumes of pod and saves binding status in cycle state.
 func (pl *VolumeBinding) Reserve(_ context.Context, cs *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
 	node, err := pl.NodeLister.Get(nodeName)
@@ -355,6 +663,9 @@ func New(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
 		Binder:           binder,
 		PVCLister:        pvcInformer.Lister(),
 		NodeLister:       nodeInformer.Lister(),
+		PVLister:         pvInformer.Lister(),
+		SCLister:         storageClassInformer.Lister(),
 		frameworkHandler: fh,
+		pvCache:          scheduling.NewPVAssumeCache(pvInformer.Informer()),
 	}, nil
 }