From 5f93dd4e47547ba6736cb303aa82ccdde79474be Mon Sep 17 00:00:00 2001 From: duanmengkk Date: Tue, 23 Jan 2024 15:04:34 +0800 Subject: [PATCH] optimize the pv and pvc code Signed-off-by: duanmengkk --- .../controllers/pod/leaf_pod_controller.go | 8 +- .../controllers/pod/root_pod_controller.go | 21 +++-- .../controllers/pv/leaf_pv_controller.go | 14 ++-- .../controllers/pv/root_pv_controller.go | 39 ++++++++++ .../controllers/pvc/leaf_pvc_controller.go | 9 +-- .../controllers/pvc/root_pvc_controller.go | 78 +++++++++---------- pkg/utils/constants.go | 11 +-- 7 files changed, 102 insertions(+), 78 deletions(-) diff --git a/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go index 9f90f5595..0a368d63a 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go @@ -2,7 +2,6 @@ package pod import ( "context" - "time" "github.com/google/go-cmp/cmp" "github.com/pkg/errors" @@ -25,7 +24,6 @@ import ( const ( LeafPodControllerName = "leaf-pod-controller" - LeafPodRequeueTime = 10 * time.Second ) type LeafPodReconciler struct { @@ -40,12 +38,12 @@ func (r *LeafPodReconciler) Reconcile(ctx context.Context, request reconcile.Req if apierrors.IsNotFound(err) { // delete pod in root if err := DeletePodInRootCluster(ctx, request.NamespacedName, r.RootClient); err != nil { - return reconcile.Result{RequeueAfter: LeafPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } return reconcile.Result{}, nil } klog.Errorf("get %s error: %v", request.NamespacedName, err) - return reconcile.Result{RequeueAfter: LeafPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } podCopy := pod.DeepCopy() @@ -59,7 +57,7 @@ func (r *LeafPodReconciler) Reconcile(ctx context.Context, request reconcile.Req podCopy.ResourceVersion = "0" if err := r.RootClient.Status().Update(ctx, podCopy); err != nil && !apierrors.IsNotFound(err) { klog.V(4).Info(errors.Wrap(err, "error while updating pod status in kubernetes")) - return reconcile.Result{RequeueAfter: LeafPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } } return reconcile.Result{}, nil diff --git a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go index 11c152baa..124b8f888 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go @@ -37,7 +37,6 @@ import ( const ( RootPodControllerName = "root-pod-controller" - RootPodRequeueTime = 10 * time.Second ) type RootPodReconciler struct { @@ -128,17 +127,17 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req lr, err := r.GlobalLeafManager.GetLeafResourceByNodeName(nodeName) if err != nil { // wait for leaf resource init - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } if err := r.DeletePodInLeafCluster(ctx, lr, request.NamespacedName, false); err != nil { klog.Errorf("delete pod in leaf error[1]: %v, %s", err, request.NamespacedName) - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } } return reconcile.Result{}, nil } klog.Errorf("get %s error: %v", request.NamespacedName, err) - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } rootpod := *(cachepod.DeepCopy()) @@ -154,7 +153,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req targetNode := &corev1.Node{} if err := r.RootClient.Get(ctx, nn, targetNode); err != nil { - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } if targetNode.Annotations == nil { @@ -171,13 +170,13 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req // TODO: GlobalLeafResourceManager may not inited.... // belongs to the current node if !r.GlobalLeafManager.HasNode(rootpod.Spec.NodeName) { - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } lr, err := r.GlobalLeafManager.GetLeafResourceByNodeName(rootpod.Spec.NodeName) if err != nil { // wait for leaf resource init - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } // skip namespace @@ -189,7 +188,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req if !rootpod.GetDeletionTimestamp().IsZero() { if err := r.DeletePodInLeafCluster(ctx, lr, request.NamespacedName, true); err != nil { klog.Errorf("delete pod in leaf error[1]: %v, %s", err, request.NamespacedName) - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } return reconcile.Result{}, nil } @@ -202,20 +201,20 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req if errors.IsNotFound(err) { if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil { klog.Errorf("create pod inleaf error, err: %s", err) - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } else { return reconcile.Result{}, nil } } else { klog.Errorf("get pod in leaf error[3]: %v, %s", err, request.NamespacedName) - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } } // update pod in leaf if podutils.ShouldEnqueue(leafPod, &rootpod) { if err := r.UpdatePodInLeafCluster(ctx, lr, &rootpod, leafPod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil { - return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } } diff --git a/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go index 962de74fa..0723515bc 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go @@ -2,7 +2,6 @@ package pv import ( "context" - "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -26,7 +25,6 @@ import ( const ( LeafPVControllerName = "leaf-pv-controller" - LeafPVRequeueTime = 10 * time.Second ) type LeafPVController struct { @@ -44,7 +42,7 @@ func (l *LeafPVController) Reconcile(ctx context.Context, request reconcile.Requ if err != nil { if !errors.IsNotFound(err) { klog.Errorf("get pv from leaf cluster failed, error: %v", err) - return reconcile.Result{RequeueAfter: LeafPVRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } pvNeedDelete = true } @@ -55,7 +53,7 @@ func (l *LeafPVController) Reconcile(ctx context.Context, request reconcile.Requ if err != nil { if !errors.IsNotFound(err) { klog.Errorf("get root pv failed, error: %v", err) - return reconcile.Result{RequeueAfter: LeafPVRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } if pvNeedDelete || pv.DeletionTimestamp != nil { @@ -72,7 +70,7 @@ func (l *LeafPVController) Reconcile(ctx context.Context, request reconcile.Requ if err != nil { if !errors.IsNotFound(err) { klog.Errorf("get tmp pvc failed, error: %v", err) - return reconcile.Result{RequeueAfter: LeafPVRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } klog.Warningf("tmp pvc not exist, error: %v", err) return reconcile.Result{}, nil @@ -108,7 +106,7 @@ func (l *LeafPVController) Reconcile(ctx context.Context, request reconcile.Requ rootPV, err = l.RootClientSet.CoreV1().PersistentVolumes().Create(ctx, rootPV, metav1.CreateOptions{}) if err != nil || rootPV == nil { klog.Errorf("create pv in root cluster failed, error: %v", err) - return reconcile.Result{RequeueAfter: LeafPVRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } return reconcile.Result{}, nil @@ -123,7 +121,7 @@ func (l *LeafPVController) Reconcile(ctx context.Context, request reconcile.Requ if err = l.RootClientSet.CoreV1().PersistentVolumes().Delete(ctx, request.NamespacedName.Name, metav1.DeleteOptions{}); err != nil { if !errors.IsNotFound(err) { klog.Errorf("delete root pv failed, error: %v", err) - return reconcile.Result{RequeueAfter: LeafPVRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } } klog.V(4).Infof("root pv name: %q deleted", request.NamespacedName.Name) @@ -167,7 +165,7 @@ func (l *LeafPVController) Reconcile(ctx context.Context, request reconcile.Requ if err != nil { klog.Errorf("patch pv namespace: %q, name: %q to root cluster failed, error: %v", request.NamespacedName.Namespace, request.NamespacedName.Name, err) - return reconcile.Result{RequeueAfter: LeafPVRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } return reconcile.Result{}, nil } diff --git a/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go index 6d9489bde..4c64f494e 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go @@ -31,6 +31,22 @@ type RootPVController struct { } func (r *RootPVController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + pv := &v1.PersistentVolume{} + shouldDelete := false + err := r.RootClient.Get(ctx, request.NamespacedName, pv) + if err != nil { + if !errors.IsNotFound(err) { + klog.Warningf("get pv from root cluster failed, error: %v", err) + return reconcile.Result{RequeueAfter: requeueTime}, nil + } + shouldDelete = true + pv.Namespace = request.Namespace + pv.Name = request.Name + } + + if !pv.DeletionTimestamp.IsZero() || shouldDelete { + return r.cleanupPv(pv) + } return reconcile.Result{}, nil } @@ -85,3 +101,26 @@ func (r *RootPVController) SetupWithManager(mgr manager.Manager) error { })). Complete(r) } + +func (r *RootPVController) cleanupPv(pv *v1.PersistentVolume) (reconcile.Result, error) { + clusters := utils.ListResourceClusters(pv.Annotations) + if len(clusters) == 0 { + klog.Warningf("pv leaf %q doesn't existed", pv.GetName()) + return reconcile.Result{}, nil + } + + lr, err := r.GlobalLeafManager.GetLeafResource(clusters[0]) + if err != nil { + klog.Warningf("pv leaf %q doesn't existed in LeafResources", pv.GetName()) + return reconcile.Result{}, nil + } + + if err = lr.Clientset.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.GetName(), + metav1.DeleteOptions{}); err != nil { + if !errors.IsNotFound(err) { + klog.Errorf("delete pv from leaf cluster failed, %q, error: %v", pv.GetName(), err) + return reconcile.Result{RequeueAfter: requeueTime}, nil + } + } + return reconcile.Result{}, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go index 228f076a5..eb7597af2 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go @@ -28,7 +28,6 @@ import ( const ( LeafPVCControllerName = "leaf-pvc-controller" - LeafPVCRequeueTime = 10 * time.Second ) type LeafPVCController struct { @@ -44,7 +43,7 @@ func (l *LeafPVCController) Reconcile(ctx context.Context, request reconcile.Req if err != nil { if !errors.IsNotFound(err) { klog.Errorf("get pvc from leaf cluster failed, error: %v", err) - return reconcile.Result{RequeueAfter: LeafPVCRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } klog.V(4).Infof("leaf pvc namespace: %q, name: %q deleted", request.NamespacedName.Namespace, request.NamespacedName.Name) @@ -55,7 +54,7 @@ func (l *LeafPVCController) Reconcile(ctx context.Context, request reconcile.Req err = l.RootClient.Get(ctx, request.NamespacedName, rootPVC) if err != nil { if !errors.IsNotFound(err) { - return reconcile.Result{RequeueAfter: LeafPVCRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } klog.Warningf("pvc namespace: %q, name: %q has been deleted from root cluster", request.NamespacedName.Namespace, request.NamespacedName.Name) @@ -83,7 +82,7 @@ func (l *LeafPVCController) Reconcile(ctx context.Context, request reconcile.Req }) if err != nil { if !errors.IsNotFound(err) { - return reconcile.Result{RequeueAfter: LeafPVCRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } return reconcile.Result{}, nil } @@ -110,7 +109,7 @@ func (l *LeafPVCController) Reconcile(ctx context.Context, request reconcile.Req if err != nil { klog.Errorf("patch pvc namespace: %q, name: %q to root cluster failed, error: %v", request.NamespacedName.Namespace, request.NamespacedName.Name, err) - return reconcile.Result{RequeueAfter: RootPVCRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } return reconcile.Result{}, nil diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go index ac36d0c46..d065f74d3 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go @@ -3,7 +3,6 @@ package pvc import ( "context" "reflect" - "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -26,7 +25,6 @@ import ( const ( RootPVCControllerName = "root-pvc-controller" - RootPVCRequeueTime = 10 * time.Second ) type RootPVCController struct { @@ -36,25 +34,32 @@ type RootPVCController struct { func (r *RootPVCController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { pvc := &v1.PersistentVolumeClaim{} + shouldDelete := false err := r.RootClient.Get(ctx, request.NamespacedName, pvc) if err != nil { if !errors.IsNotFound(err) { klog.Warningf("get pvc from root cluster failed, error: %v", err) - return reconcile.Result{RequeueAfter: RootPVCRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } - return reconcile.Result{}, nil + shouldDelete = true + pvc.Namespace = request.Namespace + pvc.Name = request.Name + } + + if !pvc.DeletionTimestamp.IsZero() || shouldDelete { + return r.cleanupPvc(pvc) } clusters := utils.ListResourceClusters(pvc.Annotations) if len(clusters) == 0 { klog.V(4).Infof("pvc leaf %q: %q doesn't existed", request.NamespacedName.Namespace, request.NamespacedName.Name) - return reconcile.Result{RequeueAfter: RootPVCRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } lr, err := r.GlobalLeafManager.GetLeafResource(clusters[0]) if err != nil { klog.Warningf("pvc leaf %q: %q doesn't existed in LeafResources", request.NamespacedName.Namespace, request.NamespacedName.Name) - return reconcile.Result{RequeueAfter: RootPVCRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } pvcOld := &v1.PersistentVolumeClaim{} @@ -62,7 +67,7 @@ func (r *RootPVCController) Reconcile(ctx context.Context, request reconcile.Req if err != nil { if !errors.IsNotFound(err) { klog.Warningf("get pvc from leaf cluster failed, error: %v", err) - return reconcile.Result{RequeueAfter: RootPVCRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } // TODO Create? return reconcile.Result{}, nil @@ -92,7 +97,7 @@ func (r *RootPVCController) Reconcile(ctx context.Context, request reconcile.Req if err != nil && !errors.IsNotFound(err) { klog.Errorf("patch pvc namespace: %q, name: %q from root cluster failed, error: %v", request.NamespacedName.Namespace, request.NamespacedName.Name, err) - return reconcile.Result{RequeueAfter: RootPVCRequeueTime}, nil + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } return reconcile.Result{}, nil @@ -112,40 +117,7 @@ func (r *RootPVCController) SetupWithManager(mgr manager.Manager) error { return !podutils.IsOneWayPVC(curr) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - if deleteEvent.DeleteStateUnknown { - //TODO ListAndDelete - klog.Warningf("missing delete pvc root event %q: %q", deleteEvent.Object.GetNamespace(), deleteEvent.Object.GetName()) - return false - } - - pvc := deleteEvent.Object.(*v1.PersistentVolumeClaim) - // skip one way pvc, oneway_pv_controller will handle this PVC - if podutils.IsOneWayPVC(pvc) { - return false - } - - clusters := utils.ListResourceClusters(pvc.Annotations) - if len(clusters) == 0 { - klog.V(4).Infof("pvc leaf %q: %q doesn't existed", deleteEvent.Object.GetNamespace(), deleteEvent.Object.GetName()) - return false - } - - lr, err := r.GlobalLeafManager.GetLeafResource(clusters[0]) - if err != nil { - klog.Warningf("pvc leaf %q: %q doesn't existed in LeafResources", deleteEvent.Object.GetNamespace(), - deleteEvent.Object.GetName()) - return false - } - - if err = lr.Clientset.CoreV1().PersistentVolumeClaims(deleteEvent.Object.GetNamespace()).Delete(context.TODO(), - deleteEvent.Object.GetName(), metav1.DeleteOptions{}); err != nil { - if !errors.IsNotFound(err) { - klog.Errorf("delete pvc from leaf cluster failed, %q: %q, error: %v", deleteEvent.Object.GetNamespace(), - deleteEvent.Object.GetName(), err) - } - } - - return false + return true }, GenericFunc: func(genericEvent event.GenericEvent) bool { return false @@ -153,3 +125,25 @@ func (r *RootPVCController) SetupWithManager(mgr manager.Manager) error { })). Complete(r) } + +func (r *RootPVCController) cleanupPvc(pvc *v1.PersistentVolumeClaim) (reconcile.Result, error) { + clusters := utils.ListResourceClusters(pvc.Annotations) + if len(clusters) == 0 { + klog.V(4).Infof("pvc leaf %q: %q doesn't existed", pvc.GetNamespace(), pvc.GetName()) + return reconcile.Result{}, nil + } + + lr, err := r.GlobalLeafManager.GetLeafResource(clusters[0]) + if err != nil { + klog.Warningf("pvc leaf %q: %q doesn't existed in LeafResources", pvc.GetNamespace(), pvc.GetName()) + return reconcile.Result{}, nil + } + + if err = lr.Clientset.CoreV1().PersistentVolumeClaims(pvc.GetNamespace()).Delete(context.TODO(), pvc.GetName(), metav1.DeleteOptions{}); err != nil { + if !errors.IsNotFound(err) { + klog.Errorf("delete pvc from leaf cluster failed, %q: %q, error: %v", pvc.GetNamespace(), pvc.GetName(), err) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, err + } + } + return reconcile.Result{}, nil +} diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index 145496536..2ff95ea82 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -1,6 +1,8 @@ package utils import ( + "time" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -133,13 +135,6 @@ const ( DefaultK8sArch = "amd64" DefaultInformerResyncPeriod = 0 - DefaultListenPort = 10250 - DefaultPodSyncWorkers = 10 - DefaultWorkers = 5 - DefaultKubeNamespace = corev1.NamespaceAll - - DefaultTaintEffect = string(corev1.TaintEffectNoSchedule) - DefaultTaintKey = "kosmos-node.io/plugin" DefaultLeafKubeQPS = 40.0 DefaultLeafKubeBurst = 60 @@ -152,6 +147,8 @@ const ( // LabelNodeRoleNode specifies that a node hosts node components LabelNodeRoleNode = "node-role.kubernetes.io/node" + + DefaultRequeueTime = 10 * time.Second ) const (